Newer
Older
#!/usr/bin/env python3
"""Query Alertmanager for alerts and output them as JSON.
Can also be used as a CLI application.
Usage:
./query_alertmananger.py -h
"""
import json
def get_alerts(url: str, password: str, user: str = "admin") -> dict:
    """Query Alertmanager for non-silenced alerts and summarise them.

    Sends a basic-auth GET request to ``{url}/api/v2/alerts`` with
    ``silenced=false``. An alert is counted unless its ``severity`` label is
    ``"none"`` or its status state is ``"suppressed"``.

    API reference:
    https://koumoul.com/openapi-viewer/?url=https://raw.githubusercontent.com/prometheus/alertmanager/master/api/v2/openapi.yaml&proxy=false

    Args:
        url: Base URL of the Alertmanager instance; a trailing slash is
            tolerated.
        password: Password for HTTP basic authentication.
        user: User name for HTTP basic authentication.

    Returns:
        A dict with the keys:

        * ``summary`` - ``"<count> alerts"``, or ``"Cant connect"`` when the
          request failed.
        * ``alerts`` - one ``"name: instance, release_name (severity)"``
          string per counted alert.
        * ``state`` - ``"critical"`` if any counted alert has severity
          ``"critical"``, ``"warning"`` if there are other counted alerts
          or the request failed, ``"ok"`` if nothing was counted.
        * ``count`` - number of counted alerts.
    """
    api_url = f"{url.rstrip('/')}/api/v2/alerts"
    query = {"silenced": "false"}
    alerts = []

    # Only the HTTP round trip is guarded; errors in the payload handling
    # below are real bugs and should not be reported as "Cant connect".
    try:
        response = requests.get(
            api_url, auth=(user, password), params=query, timeout=10
        )
    except (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    ) as err:
        import sys

        # Diagnostics go to stderr so stdout stays parseable JSON for the CLI.
        print("Exception catched: ", err, file=sys.stderr)
        return {
            "summary": "Cant connect",
            "alerts": alerts,
            "state": "warning",
            "count": 0,
        }

    has_critical = False
    for alert in response.json():
        labels = alert["labels"]
        severity = labels.get("severity", "unknown")
        if severity == "none" or alert["status"]["state"] == "suppressed":
            continue
        name = labels["alertname"]
        instance = labels.get("instance", "")
        release_name = labels.get("release_name", "")
        alerts.append(f"{name}: {instance}, {release_name} ({severity})")
        has_critical = has_critical or severity == "critical"

    # The state is derived once from the whole result set so that it does not
    # depend on the order in which Alertmanager returns the alerts, and an
    # empty result reports "ok".
    count = len(alerts)
    if has_critical:
        state = "critical"
    elif count > 0:
        state = "warning"
    else:
        state = "ok"

    return {
        "summary": f"{count} alerts",
        "alerts": alerts,
        "state": state,
        "count": count,
    }
def _with_port(url: str, port: int) -> str:
    """Return *url* with *port* added, unless *url* already names a port."""
    from urllib.parse import urlsplit, urlunsplit

    parts = urlsplit(url)
    try:
        explicit_port = parts.port
    except ValueError:
        # Malformed port inside the URL: leave it untouched so the HTTP
        # client reports the problem instead of this helper masking it.
        return url
    if not parts.netloc or explicit_port is not None:
        return url
    return urlunsplit(parts._replace(netloc=f"{parts.netloc}:{port}"))


@plac.pos("url", "URL of alertmanager instance", type=str)
@plac.pos("password", "Password", type=str)
@plac.opt("port", "Port of alertmanager instance", type=int, abbrev="P")
@plac.opt("user", "User name", type=str)
def main(url: str, password: str, port: int = 443, user: str = "admin") -> None:
    """Query Alertmanager and print the result as JSON on stdout.

    Args:
        url: URL of the Alertmanager instance.
        password: Password for HTTP basic authentication.
        port: Port of the Alertmanager instance. It is applied only when
            ``url`` does not already contain an explicit port.
        user: User name for HTTP basic authentication.
    """
    # get_alerts() has no ``port`` parameter, so passing ``port=port`` raised
    # TypeError on every run; the port is folded into the URL instead.
    alerts = get_alerts(url=_with_port(url, port), password=password, user=user)
    print(json.dumps(alerts))