Prometheus 报警实现

原文转载自 「利强的博客」 ( https://liqiang.io/post/830914e0 ) By Liqiang Lau

预计阅读时间 0 分钟(共 0 个字, 0 张图片, 0 个链接)

概述

本来不想看这段代码的,不过最近想解决一个问题,就是 Prometheus 重启之后,报警被会被自动解决,然后再等待一段时间之后,再触发起来。一开始想着自己改代码解决的,思路都想好了,但是,在想的过程中觉得这是个 common case,可以考虑一下社区是否接纳,所以就先看了一些官方的 Issue:

很明显,这个问题已经被解决了,还是吃了没有跟随社区版本的亏(希望这个坑可以填上),所以,这里顺带看看社区怎么做的(最后发现和自己的想法基本一致)。

加载报警规则

group copyState

group run 做什么

defer close(g.terminated)
... ...
for {
... ...
                missed := (time.Since(lastTriggered).Nanoseconds() / g.interval.Nanoseconds()) - 1
                if missed > 0 {
                    iterationsMissed.Add(float64(missed))
                    iterationsScheduled.Add(float64(missed))
                }
                lastTriggered = time.Now()
                iter()
... ...
}

其中 iter 就是执行报警规则:

    iter := func() {
        iterationsScheduled.Inc()

        start := time.Now()
        g.Eval(ctx, start)

        iterationDuration.Observe(time.Since(start).Seconds())
        g.SetEvaluationTime(time.Since(start))
    }

group stop 做什么

就做了这些微小的贡献:

func (g *Group) stop() {
    close(g.done)
    <-g.terminated
}

重载报警规则

同加载报警规则

执行报警规则

执行过程

发送报警

代码在:cmd/prometheus/main.go

func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
    return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
        var res []*notifier.Alert

        for _, alert := range alerts {
            if alert.State == rules.StatePending {
                continue
            }
            a := &notifier.Alert{
                StartsAt:     alert.FiredAt,
                Labels:       alert.Labels,
                Annotations:  alert.Annotations,
                GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
            }
            if !alert.ResolvedAt.IsZero() {
                a.EndsAt = alert.ResolvedAt
            }
            res = append(res, a)
        }

        if len(alerts) > 0 {
            n.Send(res...)
        }
        return nil
    }
}

再看看真正的发送逻辑:

// notifier/notifier.go
    for _, a := range alerts {
        lb := labels.NewBuilder(a.Labels)

        for ln, lv := range n.opts.ExternalLabels {
            if a.Labels.Get(string(ln)) == "" {
                lb.Set(string(ln), string(lv))
            }
        }

        a.Labels = lb.Labels()
    }

    alerts = n.relabelAlerts(alerts)

    if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
        n.queue = n.queue[d:]    // 这里是不是有 bug?

        level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
        n.metrics.dropped.Add(float64(d))
    }
    n.queue = append(n.queue, alerts...)

    // Notify sending goroutine that there are alerts to be processed.
    n.setMore()  // ---->  接着就是各种通知方式了

报警解决

清除解决的报警:

// rules/alerting.go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
... ...
    for fp, a := range r.active {
        if _, ok := resultFPs[fp]; !ok {
            // If the alert was previously firing, keep it around for a given
            // retention time so it is reported as resolved to the AlertManager.
            if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
                delete(r.active, fp)
            }
            if a.State != StateInactive {
                a.State = StateInactive
                a.ResolvedAt = ts
            }
            continue
        }

添加 EndedAt 字段

// cmd/prometheus/main.go
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
    return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
... ...
            if !alert.ResolvedAt.IsZero() {
                a.EndsAt = alert.ResolvedAt
            }

重启后延续报警

保存报警状态

// rules/alerting.go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
... ...
        if r.restored {
            vec = append(vec, r.sample(a, ts))
            vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
        }
... ...

然后被存储进 TSDB

// rules/manager.go
            for _, s := range vector {
                if _, err := app.Add(s.Metric, s.T, s.V); err != nil {

加载报警状态

// rules/manager.go
func (g *Group) run(ctx context.Context) {
... ...
    if g.shouldRestore {
        case <-g.done:
            return
        case <-tick.C:
            missed := (time.Since(evalTimestamp) / g.interval) - 1
            if missed > 0 {
                g.metrics.iterationsMissed.Add(float64(missed))
                g.metrics.iterationsScheduled.Add(float64(missed))
            }
            evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
            iter()
        }

        g.RestoreForState(time.Now())
        g.shouldRestore = false
    }

这里有个有意思的点就在于,在重加载报警之前,会先执行一遍报警(relabel)规则(iter()),原因在注释里面也写了:

这背后的原因是,在第一次执行期间(或之前),我们可能没有采集到足够的数据,relabel 规则也不会更新到最新的值,而这些值可能被一些警报依赖。

然后再来看下具体的加载过程,首先一开始就是先定义要回溯的时间,这个就是我们在 prometheus 的启动参数 --rules.alert.for-outage-tolerance 中指定的值:

    maxtMS := int64(model.TimeFromUnixNano(ts.UnixNano()))
    // We allow restoration only if alerts were active before after certain time.
    mint := ts.Add(-g.opts.OutageTolerance)
    mintMS := int64(model.TimeFromUnixNano(mint.UnixNano()))
    q, err := g.opts.Queryable.Querier(g.opts.Context, mintMS, maxtMS)

然后就是针对每个报警规则(这里是以报警规则为组织单位),然后一一确认报警规则对应的报警的历史状态:

for _, rule := range g.Rules() {
        alertHoldDuration := alertRule.HoldDuration()
        if alertHoldDuration < g.opts.ForGracePeriod {
            alertRule.SetRestored(true)
            continue
        }

这里体现的就是 prometheus 启动参数中的 --rules.alert.for-grace-period 这个参数,如果报警规则的 for 小于这个时间的话,那么就会忽略该报警规则的重载,直接跳过;

        alertRule.ForEachActiveAlert(func(a *Alert) {

这里有意思的是,组织的形式居然是以现在内存中的 active 的报警为组织单位,这也是为什么重载报警规则之前也先执行一次的原因,这样,那些带 for 的报警就会被放入 active 这个内存中,状态虽然是 pending,但是不要担心,很快他们就会变成 firing 了。

这里通过 Alert 构建 label 系列,然后查询对应的持久化报警:

            smpl := alertRule.forStateSample(a, time.Now(), 0)
            ... ...
            sset := q.Select(false, nil, matchers...)
            ... ...
            var t int64
            var v float64
            it := s.Iterator()
            for it.Next() {
                t, v = it.At()
            }

然后就将报警解析出来,然后就计算此时的报警应该是什么状态(Pending/Firing):

            downAt := time.Unix(t/1000, 0).UTC()
            restoredActiveAt := time.Unix(int64(v), 0).UTC()
            timeSpentPending := downAt.Sub(restoredActiveAt)
            timeRemainingPending := alertHoldDuration - timeSpentPending

            if timeRemainingPending <= 0 {
                // 这里就可以直接触发报警了,那么报警的触发时间就是重启之前的触发时间了
            } else if timeRemainingPending < g.opts.ForGracePeriod {
                // 这里的逻辑比较奇怪,但是代码里面给了一个运算过程,结论就是这个
                // 这里我的理解是新的 restoredActiveAt 就是新的 Alert 会成为 pending 状态的时间,如果再加上 alertHoldDuration 就等于 g.opts.ForGracePeriod 了
                // 也就是说在启动 g.opts.ForGracePeriod 时间之后刚好处于 firing
                restoredActiveAt = ts.Add(g.opts.ForGracePeriod).Add(-alertHoldDuration)
            } else {
                // 这里其实就是不认为重启期间的时间属于 for 的访问
                // 例如 for 是 5m,重启前已经 for 了 2m,重启花了 2m,那么现在重启之后,还是认为 for 了 2m,而不是 2+2 = 4m
                downDuration := ts.Sub(downAt)
                restoredActiveAt = restoredActiveAt.Add(downDuration)
            }
            a.ActiveAt = restoredActiveAt

从代码中可以看出,如果是重新加载之后,并不会马上就通知一次 Alert manager,还是需要再等待一个周期,也就是说在重启之后的第二个周期的那次才会再触发一次。

more_vert