Prometheus source code study notes: the scrape module

The future of monitoring

At my previous company I surveyed monitoring systems. We started with Cacti, then moved to Zabbix, and later the company wanted to build its own, so I did the research. I looked at Falcon at the time, and honestly, after reading the docs and the usage model, it showed very little engineering elegance, so I skimmed it and moved on. I ended up writing a collection agent myself, and while researching storage and alerting I discovered Prometheus (October 2017). I had been asking myself what a monitoring system should look like, and I kept coming back to the same conclusion: definitely not something like Zabbix. Collection, alerting, and storage should be decoupled and properly layered, until the whole thing behaves like a database: want to be monitored? Fine, to me you are just simple keys and values. Want an alerting rule? That should be as easy as writing a query. Prometheus is exactly that, a model of engineering elegance. As for my current company, after I joined in 2017 we used Falcon, heavily patched in-house, turning code that was not great to begin with into, well... I will stop complaining, it is a waste of time. Some of my thoughts on monitoring can be found in this link.

The future of monitoring will look like Prometheus: the data a client exposes is just simple key-value pairs, and once it is hooked up you configure rules on top of it for alerting.

Starting from the interface

In my view one core of Prometheus is its data storage design, but the even more essential core is PromQL. Both are hard; the latter even involves lexical and syntax analysis (still think compiler theory is useless?). So I will start learning from the data entry point instead: the scrape interface between Prometheus and the agents (exporters).

The interface does not expose its data as JSON; my suspicion is that JSON readability is not great, so the default is Prometheus's own text format, although the project now also provides a tool called prom2json. Download and start node_exporter, then request /metrics on port 9100 of the target machine and you can pull monitoring data like the following:

# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 8.4896e-05
go_gc_duration_seconds{quantile="0.25"} 0.000109896
go_gc_duration_seconds{quantile="0.5"} 0.000120105
go_gc_duration_seconds{quantile="0.75"} 0.000130313
go_gc_duration_seconds{quantile="1"} 0.000664058
go_gc_duration_seconds_sum 0.023961168
go_gc_duration_seconds_count 181
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 10
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.9.2"} 1
.....
.....
.....

The lines starting with # are definitions and annotations for the metrics: HELP is a human-readable comment for the metric, and TYPE declares the metric type. The data lines that follow are the actual samples; the curly braces after the metric name hold the labels that subdivide the metric (this is one of the best parts of Prometheus's design: it is extremely extensible and the format is very clear; I will come back to it when discussing rule configuration), and the last field is the current value. To sum up, a data line has two parts: the first is the series identity, i.e. the metric name plus optional labels, and the second is the value. Converted to JSON with the official tool it looks like the following; honestly, the JSON is not more readable.

[
  {
    "name": "http_request_duration_microseconds",
    "help": "The HTTP request latencies in microseconds.",
    "type": "SUMMARY",
    "metrics": [
      {
        "labels": {
          "method": "get",
          "handler": "prometheus",
          "code": "200"
        },
        "quantiles": {
          "0.99": "67542.292",
          "0.9": "23902.678",
          "0.5": "6865.718"
        },
        "count": "743",
        "sum": "6936936.447000001"
      },
      {
        "labels": {
          "method": "get",
          "handler": "prometheus",
          "code": "400"
        },
        "quantiles": {
          "0.99": "3542.9",
          "0.9": "1202.3",
          "0.5": "1002.8"
        },
        "count": "4",
        "sum": "345.01"
      }
    ]
  },
  {
    "name": "roshi_select_call_count",
    "help": "How many select calls have been made.",
    "type": "COUNTER",
    "metrics": [
      {
        "value": "1063110"
      }
    ]
  }
]
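
Before digging into the scrape code, it may help to see this key-value-plus-labels model from the producer side. Below is a minimal sketch using the official client_golang library (which this post does not otherwise discuss); the metric name and port are hypothetical, purely for illustration. It registers a gauge with one label and serves it on /metrics in exactly the text format shown above.

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    // Each label combination becomes its own series, i.e. one metric{label="..."} value line.
    diskUsage := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "node_disk_usage_ratio", // hypothetical metric name, just for illustration
            Help: "Disk usage ratio per device.",
        },
        []string{"device"},
    )
    prometheus.MustRegister(diskUsage)

    diskUsage.WithLabelValues("sda1").Set(0.42)
    diskUsage.WithLabelValues("sdb1").Set(0.17)

    // GET /metrics now returns lines such as:
    //   # HELP node_disk_usage_ratio Disk usage ratio per device.
    //   # TYPE node_disk_usage_ratio gauge
    //   node_disk_usage_ratio{device="sda1"} 0.42
    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":2112", nil))
}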

Moving on to prometheus/scrape: it contains three main files, manager.go, scrape.go, and target.go, whose names roughly describe what they do.

manager covers the management side of pulling from targets.

It mainly contains a Manager struct, defined as follows:

type Manager struct {
    logger    log.Logger
    append    storage.Appendable
    graceShut chan struct{}

    jitterSeed    uint64     // Global jitterSeed seed is used to spread scrape workload across HA setup.
    mtxScrape     sync.Mutex // Guards the fields below.
    scrapeConfigs map[string]*config.ScrapeConfig
    scrapePools   map[string]*scrapePool
    targetSets    map[string][]*targetgroup.Group

    triggerReload chan struct{}
}
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error
func (m *Manager) reloader()
func (m *Manager) reload()
func (m *Manager) setJitterSeed(labels labels.Labels) error
func (m *Manager) Stop()
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group)
func (m *Manager) ApplyConfig(cfg *config.Config) error
func (m *Manager) TargetsAll() map[string][]*Target
func (m *Manager) TargetsActive() map[string][]*Target
func (m *Manager) TargetsDropped() map[string][]*Target
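
The method list alone does not show how target updates flow through the Manager, so here is a minimal standalone sketch, with simplified stand-in types rather than the real Prometheus structs, of roughly the pattern that Run and reloader implement: discovery updates arrive on a channel, are stored, and a non-blocking send on triggerReload asks a background goroutine to rebuild the scrape pools at most once per tick, so a burst of updates results in a single reload.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    updates := make(chan string)            // stands in for map[string][]*targetgroup.Group
    triggerReload := make(chan struct{}, 1) // capacity 1: at most one pending reload

    var (
        mtx    sync.Mutex
        latest string
    )

    // Reloader side: apply pending updates at most once per tick.
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            select {
            case <-triggerReload:
                mtx.Lock()
                fmt.Println("reloading scrape pools for:", latest)
                mtx.Unlock()
            default:
            }
        }
    }()

    // Run side: store each update and request a reload without blocking.
    go func() {
        for ts := range updates {
            mtx.Lock()
            latest = ts
            mtx.Unlock()
            select {
            case triggerReload <- struct{}{}:
            default: // a reload is already pending
            }
        }
    }()

    // Simulate a burst of service-discovery updates; only one reload fires per tick.
    for i := 0; i < 5; i++ {
        updates <- fmt.Sprintf("discovery-update-%d", i)
    }
    time.Sleep(2 * time.Second)
}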

target mainly contains the operations related to the target machines:

type Target struct {
    // Labels before any processing.
    discoveredLabels labels.Labels
    // Any labels that are added to this target and its metrics.
    labels labels.Labels
    // Additional URL parameters that are part of the target URL.
    params url.Values

    mtx                sync.RWMutex
    lastError          error
    lastScrape         time.Time
    lastScrapeDuration time.Duration
    health             TargetHealth
    metadata           MetricMetadataStore
}
func (t *Target) String() string
func (t *Target) MetadataList() []MetricMetadata
func (t *Target) MetadataSize() int
func (t *Target) MetadataLength() int
func (t *Target) Metadata(metric string) (MetricMetadata, bool)
func (t *Target) SetMetadataStore(s MetricMetadataStore)
func (t *Target) hash() uint64
func (t *Target) offset(interval time.Duration, jitterSeed uint64) time.Duration
func (t *Target) Labels() labels.Labels
func (t *Target) DiscoveredLabels() labels.Labels
func (t *Target) SetDiscoveredLabels(l labels.Labels)
func (t *Target) URL() *url.URL
func (t *Target) Report(start time.Time, dur time.Duration, err error)
func (t *Target) LastError() error
func (t *Target) LastScrape() time.Time
func (t *Target) LastScrapeDuration() time.Duration
func (t *Target) Health() TargetHealth
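
One detail worth calling out is offset: together with the Manager's jitterSeed it decides when a target's first scrape fires, so that many targets sharing the same interval do not all get scraped at the same instant. Below is a minimal sketch of the idea only, with a hypothetical scrapeOffset helper and an FNV hash standing in for Target.hash; it is not the actual Prometheus code.

package main

import (
    "fmt"
    "hash/fnv"
    "time"
)

// scrapeOffset derives a deterministic delay within the scrape interval from the
// target's identity XORed with the global jitter seed, spreading scrapes evenly.
func scrapeOffset(targetID string, interval time.Duration, jitterSeed uint64) time.Duration {
    h := fnv.New64a()
    h.Write([]byte(targetID))
    return time.Duration((h.Sum64() ^ jitterSeed) % uint64(interval))
}

func main() {
    for _, t := range []string{"10.0.0.1:9100", "10.0.0.2:9100", "10.0.0.3:9100"} {
        fmt.Printf("%s starts scraping after %v\n", t, scrapeOffset(t, 15*time.Second, 42))
    }
}

The real implementation hashes the target's labels and URL rather than a plain string, but the effect is the same: a stable, evenly spread start offset per target.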

scrape holds the finest-grained logic that does the actual work; it mainly contains the following structs:

// Something like a scrape pool.
type scrapePool struct {
    appendable storage.Appendable
    logger     log.Logger

    mtx    sync.RWMutex
    config *config.ScrapeConfig
    client *http.Client
    // Targets and loops must always be synchronized to have the same
    // set of hashes.
    activeTargets  map[uint64]*Target
    droppedTargets []*Target
    loops          map[uint64]loop
    cancel         context.CancelFunc

    // Constructor for new scrape loops. This is settable for testing convenience.
    newLoop func(scrapeLoopOptions) loop
} // Its methods mostly fetch scrape targets and handle reload; not that interesting, so I won't go through them here.

// Options struct used when constructing a scrape loop.
type scrapeLoopOptions struct {
    target          *Target
    scraper         scraper
    limit           int
    honorLabels     bool
    honorTimestamps bool
    mrc             []*relabel.Config
    cache           *scrapeCache
}

type targetScraper struct {
    *Target

    client  *http.Client
    req     *http.Request
    timeout time.Duration

    gzipr *gzip.Reader
    buf   *bufio.Reader
} // Plain and simple: it only defines a single scrape function.

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)

type scrapeLoop struct {
    scraper         scraper
    l               log.Logger
    cache           *scrapeCache
    lastScrapeSize  int
    buffers         *pool.Pool
    jitterSeed      uint64
    honorTimestamps bool

    appender            func() storage.Appender
    sampleMutator       labelsMutator
    reportSampleMutator labelsMutator

    parentCtx context.Context
    ctx       context.Context
    cancel    func()
    stopped   chan struct{}

    disabledEndOfRunStalenessMarkers bool
}

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error)
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration)
func (sl *scrapeLoop) stop()
func (sl *scrapeLoop) disableEndOfRunStalenessMarkers()
func (sl *scrapeLoop) getCache() *scrapeCache
func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error)
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error)
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error)
func (sl *scrapeLoop) reportStale(start time.Time) (err error)
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error

// metaEntry holds meta information about a metric.
type metaEntry struct {
    lastIter uint64 // Last scrape iteration the entry was observed at.
    typ      textparse.MetricType
    help     string
    unit     string
}

Their call relationship, briefly: the Manager creates scrapePools, and after a pool is created it creates a scrapeLoop for each target to pull from it in a loop, as follows:

func (m *Manager) reload() {
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            .....
            // Here you can see scrape pools being created per target set.
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            .....
}

func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
    // Here we can see the newLoop function pointer being assigned when the scrapePool is created.
    sp.newLoop = func(opts scrapeLoopOptions) loop {
        // Update the targets retrieval function for metadata to a new scrape cache.
        cache := opts.cache
        if cache == nil {
            cache = newScrapeCache()
        }
        opts.target.SetMetadataStore(cache)

        return newScrapeLoop(
            ctx,
            opts.scraper,
            log.With(logger, "target", opts.target),
            buffers,
            func(l labels.Labels) labels.Labels {
                return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
            },
            func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
            func() storage.Appender { return appender(app.Appender(), opts.limit) },
            cache,
            jitterSeed,
            opts.honorTimestamps,
        )
    }
}

func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
    // One loop is created per target to scrape it periodically.
    for fp, oldLoop := range sp.loops {
        ....
        newLoop = sp.newLoop(scrapeLoopOptions{
            target:          t,
            scraper:         s,
            limit:           limit,
            honorLabels:     honorLabels,
            honorTimestamps: honorTimestamps,
            mrc:             mrc,
            cache:           cache,
        })
        ...
}

In scrape.go you can find targetScraper's scrape method, which is the logic that pulls data from each configured target (essentially an HTTP GET request):

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    if s.req == nil {
        // Build the target URL and pull the data with a GET request.
        req, err := http.NewRequest("GET", s.URL().String(), nil)
        if err != nil {
            return "", err
        }
        req.Header.Add("Accept", acceptHeader)
        req.Header.Add("Accept-Encoding", "gzip")
        req.Header.Set("User-Agent", userAgentHeader)
        req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

        s.req = req
    }

    resp, err := s.client.Do(s.req.WithContext(ctx))
    if err != nil {
        return "", err
    }
    defer func() {
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }()

    if resp.StatusCode != http.StatusOK {
        return "", errors.Errorf("server returned HTTP status %s", resp.Status)
    }

    if resp.Header.Get("Content-Encoding") != "gzip" {
        _, err = io.Copy(w, resp.Body)
        if err != nil {
            return "", err
        }
        return resp.Header.Get("Content-Type"), nil
    }
    // The response is gzip-compressed: create the gzip reader on first use, reuse it afterwards.
    if s.gzipr == nil {
        s.buf = bufio.NewReader(resp.Body)
        s.gzipr, err = gzip.NewReader(s.buf)
        if err != nil {
            return "", err
        }
    } else {
        s.buf.Reset(resp.Body)
        if err = s.gzipr.Reset(s.buf); err != nil {
            return "", err
        }
    }

    _, err = io.Copy(w, s.gzipr)
    s.gzipr.Close()
    if err != nil {
        return "", err
    }
    return resp.Header.Get("Content-Type"), nil
}

Once a scrapeLoop is run it enters a loop that periodically pulls data from its target and parses it:

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    select {
    // Wait for this target's scrape offset before the first scrape; the offset spreads
    // the scrape workload across targets so they don't all fire at the same instant.
    case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)):
        // Continue after a scraping offset.
    case <-sl.ctx.Done():
        close(sl.stopped)
        return
    }

    var last time.Time

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

mainLoop:
    for {
        select {
        case <-sl.parentCtx.Done():
            close(sl.stopped)
            return
        case <-sl.ctx.Done():
            break mainLoop
        default:
        }

        var (
            start             = time.Now()
            scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
        )

        // Only record after the first scrape.
        if !last.IsZero() {
            targetIntervalLength.WithLabelValues(interval.String()).Observe(
                time.Since(last).Seconds(),
            )
        }

        b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
        buf := bytes.NewBuffer(b)
        // The actual scrape happens here: the response body is written into buf and
        // contentType tells the parser which format to expect.
        contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
        cancel()

        if scrapeErr == nil {
            b = buf.Bytes()
            // NOTE: There were issues with misbehaving clients in the past
            // that occasionally returned empty results. We don't want those
            // to falsely reset our buffer size.
            if len(b) > 0 {
                sl.lastScrapeSize = len(b)
            }
        } else {
            level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
            if errc != nil {
                errc <- scrapeErr
            }
        }

        // A failed scrape is the same as an empty scrape,
        // we still call sl.append to trigger stale markers.
        // The scraped samples are parsed and appended to storage here. This is the more
        // involved part, since it has to lex and parse the text exposition format.
        total, added, seriesAdded, appErr := sl.append(b, contentType, start)
        if appErr != nil {
            level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
            // The append failed, probably due to a parse error or sample limit.
            // Call sl.append again with an empty scrape to trigger stale markers.
            if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
                level.Warn(sl.l).Log("msg", "Append failed", "err", err)
            }
        }

        sl.buffers.Put(b)

        if scrapeErr == nil {
            scrapeErr = appErr
        }

        if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
            level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
        }
        last = start

        select {
        case <-sl.parentCtx.Done():
            close(sl.stopped)
            return
        case <-sl.ctx.Done():
            break mainLoop
        case <-ticker.C:
        }
    }

    close(sl.stopped)

    if !sl.disabledEndOfRunStalenessMarkers {
        sl.endOfRunStaleness(last, ticker, interval)
    }
}

At the start of the append function (listed further below) an app and a p are declared and created; these two objects do the real work in this part. appender is a function assigned to the scrapeLoop when it is created, with signature func() storage.Appender, returning an Appender. Tracing it upward, it is set when the manager creates the scrape pool:

newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error)

When the returned content is parsed, textparse.New is called to produce a promlexer like the one below.

As an aside, the code already contains support and adaptation for OpenMetrics, so it looks like the new format is planned for the future; see the official docs, the project site on GitHub, and the Prometheus roadmap for an overview.

type promlexer struct {
    b     []byte
    i     int
    start int
    err   error
    state int
}

// PromParser parses samples from a byte slice of samples in the official
// Prometheus text exposition format.
type PromParser struct {
    l       *promlexer
    series  []byte
    text    []byte
    mtype   MetricType
    val     float64
    ts      int64
    hasTS   bool
    start   int
    offsets []int
}

// When textparse.New is called, the content obtained by the scrape gets a trailing
// newline appended and is assigned to b.
&PromParser{l: &promlexer{b: append(b, '\n')}} // Note that all numeric fields start out at their zero value.
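
To get a feel for the parser in isolation, here is a small sketch that feeds a few exposition-format lines through textparse directly and prints what comes out of Next/Series/Metric. It assumes the package layout of the Prometheus version this post is reading (pkg/textparse, where New returns a single Parser value and an empty contentType falls back to the plain text-format parser); newer releases have moved and changed this API.

package main

import (
    "fmt"
    "io"

    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/pkg/textparse"
)

func main() {
    input := []byte("# HELP go_goroutines Number of goroutines that currently exist.\n" +
        "# TYPE go_goroutines gauge\n" +
        "go_goroutines 10\n")

    // Empty contentType: use the default Prometheus text-format parser (PromParser).
    p := textparse.New(input, "")

    for {
        et, err := p.Next()
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }
        switch et {
        case textparse.EntryHelp:
            name, help := p.Help()
            fmt.Printf("HELP   %s: %s\n", name, help)
        case textparse.EntryType:
            name, typ := p.Type()
            fmt.Printf("TYPE   %s: %s\n", name, typ)
        case textparse.EntrySeries:
            var lset labels.Labels
            p.Metric(&lset)
            _, ts, v := p.Series()
            if ts != nil {
                fmt.Printf("SAMPLE %s value=%g ts=%d\n", lset, v, *ts)
            } else {
                fmt.Printf("SAMPLE %s value=%g (no explicit timestamp)\n", lset, v)
            }
        }
    }
}

With that in mind, scrapeLoop.append below is essentially the same loop, except that every EntrySeries is written into storage through an Appender and cached per series:
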
func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
    var (
        // Tracing this upward, the appender is ultimately defined in storage; it is
        // created in main and threaded all the way through NewManager to here.
        app = sl.appender()
        // textparse is defined in pkg/textparse/promparse.go; New returns a PromParser here.
        p              = textparse.New(b, contentType)
        defTime        = timestamp.FromTime(ts)
        appErrs        = appendErrors{}
        sampleLimitErr error
    )

    defer func() {
        if err != nil {
            app.Rollback()
            return
        }
        if err = app.Commit(); err != nil {
            return
        }
        // Only perform cache cleaning if the scrape was not empty.
        // An empty scrape (usually) is used to indicate a failed scrape.
        sl.cache.iterDone(len(b) > 0)
    }()

loop:
    for {
        var (
            et          textparse.Entry
            sampleAdded bool
        )
        if et, err = p.Next(); err != nil {
            if err == io.EOF {
                err = nil
            }
            break
        }
        switch et {
        case textparse.EntryType:
            sl.cache.setType(p.Type())
            continue
        case textparse.EntryHelp:
            sl.cache.setHelp(p.Help())
            continue
        case textparse.EntryUnit:
            sl.cache.setUnit(p.Unit())
            continue
        case textparse.EntryComment:
            continue
        default:
        }
        total++

        t := defTime
        // Series returns the bytes of the series, the timestamp if set, and the value
        // of the current sample: met is the raw series as []byte, tp is the timestamp
        // as *int64, and v is the sample value as float64.
        met, tp, v := p.Series()
        if !sl.honorTimestamps {
            tp = nil
        }
        if tp != nil {
            t = *tp
        }

        if sl.cache.getDropped(yoloString(met)) {
            continue
        }
        ce, ok := sl.cache.get(yoloString(met))

        if ok {
            // AddFast is defined in storage/fanout.go.
            err = app.AddFast(ce.ref, t, v)
            _, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
            // In theory this should never happen.
            if err == storage.ErrNotFound {
                ok = false
            }
        }
        if !ok {
            var lset labels.Labels

            mets := p.Metric(&lset)
            hash := lset.Hash()

            // Hash label set as it is seen local to the target. Then add target labels
            // and relabeling and store the final label set.
            lset = sl.sampleMutator(lset)

            // The label set may be set to nil to indicate dropping.
            if lset == nil {
                sl.cache.addDropped(mets)
                continue
            }

            if !lset.Has(labels.MetricName) {
                err = errNameLabelMandatory
                break loop
            }

            var ref uint64
            ref, err = app.Add(lset, t, v)
            sampleAdded, err = sl.checkAddError(nil, met, tp, err, &sampleLimitErr, &appErrs)
            if err != nil {
                if err != storage.ErrNotFound {
                    level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
                }
                break loop
            }

            if tp == nil {
                // Bypass staleness logic if there is an explicit timestamp.
                sl.cache.trackStaleness(hash, lset)
            }
            sl.cache.addRef(mets, ref, lset, hash)
            if sampleAdded && sampleLimitErr == nil {
                seriesAdded++
            }
        }

        // Increment added even if there's an error so we correctly report the
        // number of samples remaining after relabelling.
        added++

    }
    if sampleLimitErr != nil {
        if err == nil {
            err = sampleLimitErr
        }
        // We only want to increment this once per scrape, so this is Inc'd outside the loop.
        targetScrapeSampleLimit.Inc()
    }
    if appErrs.numOutOfOrder > 0 {
        level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
    }
    if appErrs.numDuplicates > 0 {
        level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
    }
    if appErrs.numOutOfBounds > 0 {
        level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
    }
    if err == nil {
        sl.cache.forEachStale(func(lset labels.Labels) bool {
            // Series no longer exposed, mark it stale.
            _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
            switch errors.Cause(err) {
            case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
                // Do not count these in logging, as this is expected if a target
                // goes away and comes back again with a new scrape loop.
                err = nil
            }
            return err == nil
        })
    }
    return
}

Quite a while has passed since I last looked at this; the code has even changed scrapeLoop's report function into scrapeAndReport. Since parsing the data pulled from the interface uses lexical analysis from compiler theory, I will skip that part for now, look at the overall flow of the service first, and come back to it later.

Today I came across an article introducing Google's Monarch. It is quite hardcore and looks good, recommended; it appears to solve the distributed problem, and notably it uses a push model.

Storage

From the scrape code in the previous section you can see that append contains a step that adds the data to storage, so that is where to start: we can see what processing the data goes through as it flows into the server's storage core. From sl.cache.get we can see two different paths depending on whether the series is found in the cache: the logic after a cache hit is fairly simple, while the cache-miss path is a bit more involved.
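
To make the two paths concrete, here is a tiny self-contained sketch with toy stand-in types (not the real storage.Appender or scrapeCache): a hit reuses the numeric series reference and goes through AddFast, while a miss registers the label set via Add, gets a reference back, and caches it for the next scrape.

package main

import "fmt"

// Toy stand-ins for storage.Appender and scrapeCache, just to show the two ingestion paths.
type toyAppender struct {
    nextRef uint64
    series  map[uint64]string // ref -> series identity
}

func (a *toyAppender) Add(series string, t int64, v float64) uint64 {
    a.nextRef++
    a.series[a.nextRef] = series
    fmt.Printf("miss: registered %q as ref %d, sample (%d, %g)\n", series, a.nextRef, t, v)
    return a.nextRef
}

func (a *toyAppender) AddFast(ref uint64, t int64, v float64) {
    fmt.Printf("hit:  ref %d (%s), sample (%d, %g)\n", ref, a.series[ref], t, v)
}

func main() {
    app := &toyAppender{series: map[uint64]string{}}
    cache := map[string]uint64{} // series text -> ref, like scrapeCache

    // Two consecutive scrapes of the same series: the first takes the slow path,
    // the second finds the ref in the cache and takes the fast path.
    for i, v := range []float64{10, 12} {
        const series = "go_goroutines"
        if ref, ok := cache[series]; ok {
            app.AddFast(ref, int64(i), v)
        } else {
            cache[series] = app.Add(series, int64(i), v)
        }
    }
}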

Starting from the AddFast call and tracing upward:

func (f *fanoutAppender) AddFast(ref uint64, t int64, v float64) error {
    if err := f.primary.AddFast(ref, t, v); err != nil {
        return err
    }

    for _, appender := range f.secondaries {
        if err := appender.AddFast(ref, t, v); err != nil {
            return err
        }
    }
    return nil
}

// primary is a single Appender, while secondaries is a slice of Appenders;
// the sample is written to all of them.
func (f *fanout) Appender(ctx context.Context) Appender {
    primary := f.primary.Appender(ctx)
    secondaries := make([]Appender, 0, len(f.secondaries))
    for _, storage := range f.secondaries {
        secondaries = append(secondaries, storage.Appender(ctx))
    }
    return &fanoutAppender{
        logger:      f.logger,
        primary:     primary,
        secondaries: secondaries,
    }
}

Tracing further up, we find the definition of fanout and how it is created:

type fanout struct {
    logger log.Logger

    primary     Storage
    secondaries []Storage
}

// As you can see, multiple storage backends can be specified at creation time.
func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage {
    return &fanout{
        logger:      logger,
        primary:     primary,
        secondaries: secondaries,
    }
}

The code above is from storage/fanout.go; the call to NewFanout below is from main.go. Prometheus supports writing to local storage while simultaneously writing to remote storage such as InfluxDB, so two instances are created here, one local and one remote.

var (
    localStorage  = &readyStorage{}
    remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline))
    fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

// The definition of local storage.
type readyStorage struct {
    mtx             sync.RWMutex
    db              *tsdb.DB
    startTimeMargin int64
}

// And the definition of the not-yet-ready appender (used to handle the case where
// storage is not ready yet) -- a pattern worth borrowing.
type notReadyAppender struct{}

func (n notReadyAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
    return 0, tsdb.ErrNotReady
}

func (n notReadyAppender) AddFast(ref uint64, t int64, v float64) error { return tsdb.ErrNotReady }

func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }

func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }

/*
...
*/
// The real DB is created by calling tsdb.Open.
// Here the real DB is assigned to localStorage.
localStorage.Set(db, startTimeMargin)

The following comes from tsdb/db.go. It is worth mentioning that the early design and implementation of the TSDB were done by fabxc; have a look at his blog, and the underlying principles come from the paper referenced there.

func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
    var rngs []int64
    opts, rngs = validateOpts(opts, nil)
    return open(dir, l, r, opts, rngs)
}

// From the function signature you can roughly tell that this returns the DB instance.
func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64) (_ *DB, returnedErr error) {
    // Create the data directory first.
    // Repair bad index data.
    // Migrate the WAL data.
    // Remove garbage and temporary block files.
}

type DB struct {
    dir   string
    lockf fileutil.Releaser

    logger         log.Logger
    metrics        *dbMetrics
    opts           *Options
    chunkPool      chunkenc.Pool
    compactor      Compactor
    blocksToDelete BlocksToDeleteFunc

    // Mutex for that must be held when modifying the general block layout.
    mtx    sync.RWMutex
    blocks []*Block

    head *Head

    compactc chan struct{}
    donec    chan struct{}
    stopc    chan struct{}

    // cmtx ensures that compactions and deletions don't run simultaneously.
    cmtx sync.Mutex

    // autoCompactMtx ensures that no compaction gets triggered while
    // changing the autoCompact var.
    autoCompactMtx sync.Mutex
    autoCompact    bool

    // Cancel a running compaction when a shutdown is initiated.
    compactCancel context.CancelFunc
}

In tsdb/head.go we can find the definition of AddFast:

func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
    if t < a.minValidTime {
        a.head.metrics.outOfBoundSamples.Inc()
        return storage.ErrOutOfBounds
    }

    s := a.head.series.getByID(ref)
    if s == nil {
        return errors.Wrap(storage.ErrNotFound, "unknown series")
    }
    s.Lock()
    if err := s.appendable(t, v); err != nil {
        s.Unlock()
        if err == storage.ErrOutOfOrderSample {
            a.head.metrics.outOfOrderSamples.Inc()
        }
        return err
    }
    s.pendingCommit = true
    s.Unlock()

    if t < a.mint {
        a.mint = t
    }
    if t > a.maxt {
        a.maxt = t
    }

    a.samples = append(a.samples, record.RefSample{
        Ref: ref,
        T:   t,
        V:   v,
    })
    a.sampleSeries = append(a.sampleSeries, s)
    return nil
}

If you republish this, please credit the source: http://just4fun.im/2020/06/27/prometheus-interface/ . Respect knowledge, thanks :)