golang写入elasticsearch总结

最近一些数据的存储使用了es,并且相对老的写入流程做了较大改动,暴露出来很多问题因此做了比较多的调整,这篇文章做一下简单记录

概览

es归根到底也是一个db,因此使用方式我们可以类比其他数据库,例如MySQL,因此在使用的时候也应该类似。

  1. 建立连接
  2. 通过连接写入数据
  3. 数据支持缓存批量写入
  4. 数据支持多线程写入

注意:本文档基于5.x版本

建立连接

首先当然是建立连接了,这里很简单,遵循官方文档就可以,主要是节点和认证

1
2
3
4
5
EsClient, err = elastic.NewClient(
elastic.SetURL(nodes...),
elastic.SetBasicAuth(username, password),
elastic.SetMaxRetries(max)
)

数据写入

数据写入也分几种方式,首先是最基本最简单的一条一条插入这种方式,由于这种方式在数据量上来之后一定会出现性能瓶颈,因此不建议这样使用因此这里不做展开,我们在生产环境中时遇到类似问题的,特意在前人基础上改为了批量写入的逻辑

1
2
3
BulkRequest := EsClient.Bulk() #生成一个请求连接
BulkRequest.Add(request) //这里我们把doc加入到缓冲区里面,这里注意这个Add是将数据append到了bulkrequest里面的一个队列里面,因此如果这里涉及到多线程的操作记得加锁,不然存在两个线程同时操作的时候内存地址变化导致异常
BulkRequest.Do(context.TODO())

这里看一下Add的定义,数据append之后是被放在requests这个数组里面,我们知道golang的数组在扩充过程中是会涉及到内存重分配的,因此才需要把append之后的返回值重新赋值给数组,所以如果多个线程操作这个数据写入的逻辑,是存在冲突可能的,因此需要加锁

1
2
3
4
5
6
func (s *BulkService) Add(requests ...BulkableRequest) *BulkService {  
for _, r := range requests {
s.requests = append(s.requests, r)
}
return s
}

通过这种方式写入需要自己控制好缓冲区的大小,因为只有调用Do的时候才会将数组里面的数据实际发送给es集群,从上面add我们可以看到如果我们一直调用Add那么缓冲区会逐步增大,之后再掉用Do的话可能会由于数据量过大造成连接断开,因此需要增加长度控制和异常处理,根据当初的情况来看,如果过大存在中断后一直重试的问题。

修正

注意:这里的修正是临时方案不优雅,因为没有太多时间调研

1
2
3
4
5
6
7
8
9
lock.Lock()
n := BulkRequest.NumberOfActions()
if n > 200{
if _, err := BulkRequest.Do(context.TODO());err != nil{
...
}
}
BulkRequest.Add(request)
lock.Unlock()

这种操作方式虽然相对于上一种有了一些性能上的提升,但是依然存在一定的问题,首先是对于缓冲区的维护消耗了过多精力,另一方面这个是单线程的写入,当数据量达到一定程度之后依然会有严重的性能问题

#BulkProcessor多线程写入

当数据量达到一定程度之后,观察到es写入延时增大,比如00:00写入的数据,可能要00:15才能观察到,并且性能问题不解决,写入数据会逐渐堆积在缓冲区,服务异常的时候会全部丢弃,这也是不可接受的,因此经过一系列调研找到了es写入时的多线程解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
w, _ := g.EsClient.BulkProcessor().BulkActions(100).After(GetFailed).FlushInterval(time.Second).Stats(true).Workers(8).Do(context.Background())
w.Start(context.Background())
w.Add(request)//这里不用显示调用Do 因为上面定义了两个写入的触发条件
//GetFailed是发生数据写入失败时获取详情的回调函数
func GetFailed(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if response == nil {//可能存在为空的情况 😳
log.Println("GetNil response return")
return
}
fi := response.Failed()
if len(fi) != 0 {
for _, f := range fi {
log.Printf("DebugFailedEs: index:%s type:%s id:%s version:%d status:%d result:%s ForceRefresh:%v found:%v errorDetail:%v getResult:%v\n", f.Index, f.Type, f.Id, f.Version, f.Status, f.Result, f.ForcedRefresh, f.Found, f.Error, f.GetResult)
}
}
}
/*
...
*/
st := w.Stats() //获取数据写入情况
log.Printf("BulkMonitor state Succeeded:%d Failed:%d Created:%d Updated:%d Deleted:%d Flushed:%d Committed:%d Indexed:%d\n", st.Succeeded, st.Failed, st.Created, st.Updated, st.Deleted, st.Flushed, st.Committed, st.Indexed)

/*
...
*/
for _, s := range st.Workers {
log.Println("BulkMonitor Queue:", s.Queued) //这里打印每个worker的队列里面有多少数据

这里简单说一下bulkactions表示数据打到多少之后进行一次写入,flushInterval控制的则是刷新间隔,stats表示是否获取统计信息,worker表示的是worker数量,建议和cpu保持一致,还有很多其他参数可以控制写入条件,例如缓冲区的数据大小。

这里其实还是有坑存在的,因为这里是有多个worker存在嘛,然后我们在写入数据的时候因为不会指定是哪个worker,如果你的数据是有先后顺序会覆盖写入,这里也是会有问题的,问题就是,两条数据A1 A2 ,后者更新,他们两个写入es正常情况是根据id来进行覆盖,A2会把A1覆盖掉,那么如果这个时候A2写入的worker2比A1写入的worker1先写入成功,那么后面A1写入之后就会导致状态更新异常,之后再取出来的数据就不是最新的了。

解决方案:可以在服务里面增加一层缓冲池,根据id来进行唯一性存储,然后定时从这里面取出来数据写入es,保证数据的唯一性

这里还要注意一点就是processor的Add默认情况下是同步阻塞的,因此在es集群出现性能问题的时候注意不要阻塞服务主流程

1
2
3
4
5
6
7
p.requestsC = make(chan BulkableRequest)//这里可以看到
/*
....
*/
func (p *BulkProcessor) Add(request BulkableRequest) {
p.requestsC <- request
}

同步写入

以上所有的都是异步操作es,某些情况下还会存在同步写入的需求,例如写入后就会立刻请求数据需要拿到最新状态,那么这个是偶就需要用到同步写入了

1
2
3
4
5
6
7
c := EsClient.Bulk()
c = c.Add(doc)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
_, err := c.Refresh("wait_for").Do(ctx)
if err != nil{
...
}

转载请注明来源链接 http://just4fun.im/2020/06/14/golang-write-elasticsearch/ 尊重知识,谢谢:)