falcon性能优化之judge篇

监控在各个互联网公司的运维体系中都占据着即为重要的一环,不管是在日常的预警、亦或是事故中的定位、还是最终的复盘都贯彻始终,在互联网尚不发达的时代就存在多种监控系统,例如cacti、zabbix等,但随着服务扩展服务器数量成倍增加之后,上一代监控系统就无法满足使用需求了,因为他们几乎全都是单机存储以及存在无法横向扩容的问题。

cacti使用的是snmp协议通过执行PHP代码定期从各个机器采集数据并最终存储在本机的rrd文件中。

zabbix是c编写的二进制文件,从每台机器主动上报到master,最终由master存储到MySQL中。。

在各公司都对监控告警有越来越多的需求的情况下,国内外逐渐涌现出来了更加现代和实用的监控系统,国内比较出名的应该是由小米开源的Open-Falcon,通过拆分组件、增加数据分发服务等一些手段从一定程度上解决了业界的一些常用问题,如果上一代cacti和zabbix成为监控系统1.0的话,那么falcon可以称之为监控系统1.5。后面有机会详细介绍一下我认为的一个优秀监控系统所具备的特性以及为什么falcon不能称之为2.0的理由。这篇文章主要内容主要是介绍一下在使用过程中当监控数量过多时遇到的一些明显的性能问题。

性能问题

在监控数据增大的过程中falcon的一些关键组件:judge(主要用于判定告警)、hbs(主要用于同步给judge高兴规则)、graph(主要用来缓存、存储监控数据)会出现明显的性能问题,本文主要解决judge出现的问题以及改造方案,之后第二篇会来讲一下hbs的问题和改造方案。

以我当前这家公司为例,judge服务器实例数量在40+,单机内存为24G内存占用也达到了20G,这种情况下随时都有oom的风险,如果只是单纯的机器扩容当然可以解决问题,但是机器花费会大幅提升,同时治标不治本,通过阅读judge代码从中还是找到了很多可以优化的点,下面先说方案和关键部分代码,最后会放上优化前后的对比图。

falcon源码逻辑

在官方的代码仓库中,judge每次会从hbs同步全量的监控策略,但是实际上judge是一个集群,在transfer进行hash的过程中其实平均下来只有N分之一的数据量会打到judge并缓存下来,并且这里要注意对于没有配置了告警规则的监控数据,其实是没有必要在judge中存下来的。所以由此我们找到了两个可以优化的点:

  1. 从hbs同步数据时仅同步必要监控策略
  2. transfer上报过来的监控数据仅缓存配置了监控策略的数据

优化逻辑以及方案

以上两个点其实是相互依赖的,第一点以来第二点的上报数据,第二点依赖第一点同步的监控策略。

因此需要增加一个数据结构用以缓存每一个上报到该judge实例的数据:1.是不是有监控策略 2.上报到这个实例的监控数据都有哪些

相关代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
const (
DF_FILTER = iota - 1 //表示某个
DF_INUSE
)

type DataFilter map[string]int

type StrategyFilter struct {
sync.RWMutex
Data DataFilter `json:"data"`
}

func (this *StrategyFilter) Set(key string, value int) error {
this.Lock()
defer this.Unlock()
this.Data[key] = value
return nil
}

func (this *StrategyFilter) Get(key string) (int, bool) {
this.RLock()
defer this.RUnlock()
v, f := this.Data[key]
return v, f
}

func (this *StrategyFilter) LoadOrStore(key string, value int) (int, bool) {
this.Lock()
defer this.Unlock()
i, b := this.Data[key]
if !b {
this.Data[key] = value
i = value
}
return i, b
}

func (this *StrategyFilter) Len() int {
this.RLock()
defer this.RUnlock()
l := len(this.Data)
return l
}

func (this *StrategyFilter) Mset(keys []string, value int) {
this.Lock()
defer this.Unlock()
for _, k := range keys {
this.Data[k] = value
}
}

func (this *StrategyFilter) Copy() model.DataFilter {
this.RLock()
defer this.RUnlock()
m := make(model.DataFilter, len(this.Data))
for k, v := range this.Data {
m[k] = v
}
return m
}

func (this *StrategyFilter) PopAll() (model.DataFilter, error) {
this.Lock()
defer this.Unlock()
d := this.Data
this.Data = model.DataFilter{}
return d, nil
}

在judge当中是以host+metric来检索监控数据的,因此增加了DataFilter结构用来缓存当前上报过来的数据是INUSE还是应该被FILTER掉。

接下来就需要修改transfer请求judge的rpc接口,在收到transfer传来的数据的时候需要增加以下处理:

  1. 该条数据是否在使用当中,如果在使用当中那么需要将数据存储下来
  2. 如果该条数据没有告警规则与之对应,那么我同样需要将这条数据缓存下来,因为最终需要在hbs当中double check一下,否则会存在新增了hbs规则之后不生效的问题

以下为修改后的接口代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
remain := g.Config().Remain
now := time.Now().Unix()
em_lost := make(model.DataFilter)
filter_map := make(map[int]struct{}, len(items))

for index, _ := range items {
key := fmt.Sprintf("%s/%s", items[index].Endpoint, items[index].Metric)

if checkExpression(items[index]) {
filter_map[index] = struct{}{}
}
//这里检测上报过来的key是否存在,如果不存在就先默认设置为丢弃FILTER
if v, flag := g.DataFilter.LoadOrStore(key, g.DF_FILTER); flag {
if v == g.DF_INUSE {
filter_map[index] = struct{}{}
}
} else {
em_lost[key] = index
}
}
//这里发现上报过来的数据有没有告警规则的
if len(em_lost) != 0 {
//这时候拿这些数据去hbs增量同步一下告警规则
cron.SyncFilteredStrategiesIncre(em_lost)
for em, index := range em_lost {
//再次检测同步完成后是否存在告警规则了(其实hbs那里也存在很大的生效延迟)
if v, flag := g.DataFilter.Get(em); flag {
if v == g.DF_INUSE {
filter_map[index] = struct{}{}
}
}
}
}
//这里只将必要的监控数据进行缓存
for index, _ := range filter_map {
pk := items[index].PrimaryKey()
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, items[index], remain, now)
}
return nil
}

在原本的代码版本中,hbs与judge通信的接口是全量获取数据,judge会把hbs的全量数据拖过来,从list构建一个host+metric到告警规则的map出来,当数据量还小的时候区别不大,但是当数据量增加到万级的时候则显得又笨又重了。

下面看一下judge到hbs这一部分同步数据的代码变更,同步分两种,一个是增量同步,当有新的数据过来的时候需要拿着少量数据去进行同步。另外一个是全量同步,主要是为了解决当有规则删除时候的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func SyncFilteredStrategiesIncre(endpoint_metric model.DataFilter) {
if_merge := true
syncFilteredStrategies(endpoint_metric, if_merge)
}

func syncFilteredStrategies(endpoint_metric model.DataFilter, if_merge bool) {
//增量同步超时时间5s就行
timeout_i := 5 * 60
if if_merge {
timeout_i = 5
}
var strategiesBuildResponse model.StrategiesBuildResponse
err := g.HbsClient.Call("HbsNew.GetFilteredStrategies", endpoint_metric, &strategiesBuildResponse, timeout_i)
if err != nil {
log.Println("[ERROR] HbsNew.GetStrategies:", err)
return
}

//这里当删除规则的时候并不会将datafilter对应的标志v配置为-1,考虑到不会有频繁删除规则的情况存在暂>时不予处理
if !if_merge {
g.StrategyMap.ReInit(strategiesBuildResponse.HostStrategies)
keys := g.StrategyMap.Keys()
g.DataFilter.Mset(keys, g.DF_INUSE)

//更新全量更新的时间,防止由于网络问题导致更新失败
CronUpdateTimeStamp = time.Now().Unix()

} else if len(strategiesBuildResponse.HostStrategies) != 0 {
g.StrategyMap.Merge(strategiesBuildResponse.HostStrategies)
var keys_inuse []string
for key, _ := range strategiesBuildResponse.HostStrategies {
keys_inuse = append(keys_inuse, key)
}
g.DataFilter.Mset(keys_inuse, g.DF_INUSE)
}
}

这里注意一下,原版代码的strategymap里面没有merge的方法

1
2
3
4
5
6
7
8
9
func (this *SafeStrategyMap) Merge(strategies_map map[string][]model.Strategy) error {
this.Lock()
defer this.Unlock()

for key, strategies := range strategies_map {
this.M[key] = strategies
}
return nil
}

到这里judge部分的代码改造就基本完成了,但是这其实并不是最完美的优化方案,因为其实在当前这个judge实例上其实还是存在一小部分监控策略永远也用不到的。因为我们是按照host+metric来组成key的,并没有将tag考虑进去,所以还是存在一些无用策略的。另外就是对于Filter的缓存结构来说并没有定期清理的概念,所以此处也是存在一些垃圾数据缓存的。以上的问题是可以解决的但是投入产出比并不够大因此并没有进行相应的处理,以下是优化前后的内存变化图(需要翻墙):

judge-memory

转载请注明来源链接 http://just4fun.im/2020/02/29/falcon-optimize-judge/ 尊重知识,谢谢:)