Skip to content
Snippets Groups Projects
query_alertmanager.py 1.83 KiB
Newer Older
  • Learn to ignore specific revisions
  • Varac's avatar
    Varac committed
    #!/usr/bin/env python3
    """Queries alertmanager for alerts, outputs json.
    
    Can also be used as a CLI application.
    Usage:
    
      ./query_alertmanager.py -h
    """
    
    import json
    import plac
    import requests
    from alertmanager import AlertManager
    
    
    def _label(labels, key, default='unknown'):
        """Return ``labels[key]``, or *default* when that label is absent."""
        try:
            return labels[key]
        except KeyError:
            return default


    def get_alerts(url, password, port=443, user='admin'):
        """Query an alertmanager instance and summarise its active alerts.

        An alert is counted when its ``severity`` label is not ``"none"`` and
        its status state is not ``"suppressed"``.  Labels that an alert does
        not carry are rendered as ``"unknown"`` instead of aborting the whole
        query.

        Args:
            url: Host passed to ``AlertManager`` as ``host``.
            password: Password used for HTTP basic auth.
            port: Port passed to ``AlertManager``.
            user: User name used for HTTP basic auth.

        Returns:
            A dict with the keys ``summary`` (human readable text), ``alerts``
            (list of ``"name: instance, release_name (severity)"`` strings),
            ``state`` (``"ok"``, ``"warning"`` or ``"critical"``) and ``count``
            (number of counted alerts).  If querying or processing fails, the
            summary is ``"Cant connect"`` and the state is ``"warning"``.
        """
        session = requests.Session()
        session.auth = (user, password)
        manager = AlertManager(host=url, port=port, req_obj=session)

        alerts = []
        try:
            for alert in manager.get_alerts():
                labels = alert['labels']
                severity = _label(labels, 'severity')
                if severity == "none" or alert['status']['state'] == "suppressed":
                    continue

                name = _label(labels, 'alertname')
                instance = _label(labels, 'instance')
                release_name = _label(labels, 'release_name')
                alerts.append(f'{name}: {instance}, {release_name} ({severity})')
        except Exception:
            # Deliberate best effort: any failure is reported as a warning
            # rather than raised, but KeyboardInterrupt/SystemExit now
            # propagate.  Alerts collected before the failure are kept.
            return {
                'summary': "Cant connect",
                'alerts': alerts,
                'state': "warning",
                'count': len(alerts)}

        count = len(alerts)
        if count > 2:
            state = "critical"
        elif count > 0:
            state = "warning"
        else:
            state = "ok"

        return {
            'summary': f'{count} alerts',
            'alerts': alerts,
            'state': state,
            'count': count}
    
    
    
    Varac's avatar
    Varac committed
    # Command-line entry point: `url` and `password` are declared as
    # positional arguments, `port` and `user` as options (via plac).
    # NOTE(review): plac may surface the docstring below as the CLI help
    # description, so its text is kept unchanged - confirm before rewording.
    @plac.pos('url', "URL of alertmanager instance", type=str)
    @plac.pos('password', "Password", type=str)
    @plac.opt('port', "Port of alertmanager instance", type=int, abbrev='P')
    @plac.opt('user', "User name", type=str)
    def main(url, password, port=443, user='admin'):
        """Main function."""
        # Fetch the alert summary and print it as a single JSON document.
        result = get_alerts(url=url, password=password, port=port, user=user)
        print(json.dumps(result))
    
    
    # Only start the CLI when this file is executed as a script, not when it
    # is imported (e.g. to reuse get_alerts from other code).
    if __name__ == '__main__':
        plac.call(main)