最近一些数据的存储使用了es,并且相对老的写入流程做了较大改动,暴露出来很多问题因此做了比较多的调整,这篇文章做一下简单记录
概览
es归根到底也是一个db,因此使用方式我们可以类比其他数据库,例如MySQL,因此在使用的时候也应该类似。
- 建立连接
- 通过连接写入数据
- 数据支持缓存批量写入
- 数据支持多线程写入
注意:本文档基于5.x版本
建立连接
首先当然是建立连接了,这里很简单,遵循官方文档就可以,主要是节点和认证
1 | EsClient, err = elastic.NewClient( |
数据写入
数据写入也分几种方式,首先是最基本最简单的一条一条插入这种方式,由于这种方式在数据量上来之后一定会出现性能瓶颈,因此不建议这样使用因此这里不做展开,我们在生产环境中时遇到类似问题的,特意在前人基础上改为了批量写入的逻辑
1 | BulkRequest := EsClient.Bulk() #生成一个请求连接 |
这里看一下Add的定义,数据append之后是被放在requests这个数组里面,我们知道golang的数组在扩充过程中是会涉及到内存重分配的,因此才需要把append之后的返回值重新赋值给数组,所以如果多个线程操作这个数据写入的逻辑,是存在冲突可能的,因此需要加锁
1 | func (s *BulkService) Add(requests ...BulkableRequest) *BulkService { |
坑
通过这种方式写入需要自己控制好缓冲区的大小,因为只有调用Do的时候才会将数组里面的数据实际发送给es集群,从上面add我们可以看到如果我们一直调用Add那么缓冲区会逐步增大,之后再掉用Do的话可能会由于数据量过大造成连接断开,因此需要增加长度控制和异常处理,根据当初的情况来看,如果过大存在中断后一直重试的问题。
修正
注意:这里的修正是临时方案不优雅,因为没有太多时间调研
1 | lock.Lock() |
这种操作方式虽然相对于上一种有了一些性能上的提升,但是依然存在一定的问题,首先是对于缓冲区的维护消耗了过多精力,另一方面这个是单线程的写入,当数据量达到一定程度之后依然会有严重的性能问题
#BulkProcessor多线程写入
当数据量达到一定程度之后,观察到es写入延时增大,比如00:00写入的数据,可能要00:15才能观察到,并且性能问题不解决,写入数据会逐渐堆积在缓冲区,服务异常的时候会全部丢弃,这也是不可接受的,因此经过一系列调研找到了es写入时的多线程解决方案
1 | w, _ := g.EsClient.BulkProcessor().BulkActions(100).After(GetFailed).FlushInterval(time.Second).Stats(true).Workers(8).Do(context.Background()) |
这里简单说一下bulkactions表示数据打到多少之后进行一次写入,flushInterval控制的则是刷新间隔,stats表示是否获取统计信息,worker表示的是worker数量,建议和cpu保持一致,还有很多其他参数可以控制写入条件,例如缓冲区的数据大小。
坑
这里其实还是有坑存在的,因为这里是有多个worker存在嘛,然后我们在写入数据的时候因为不会指定是哪个worker,如果你的数据是有先后顺序会覆盖写入,这里也是会有问题的,问题就是,两条数据A1 A2 ,后者更新,他们两个写入es正常情况是根据id来进行覆盖,A2会把A1覆盖掉,那么如果这个时候A2写入的worker2比A1写入的worker1先写入成功,那么后面A1写入之后就会导致状态更新异常,之后再取出来的数据就不是最新的了。
解决方案:可以在服务里面增加一层缓冲池,根据id来进行唯一性存储,然后定时从这里面取出来数据写入es,保证数据的唯一性
这里还要注意一点就是processor的Add默认情况下是同步阻塞的,因此在es集群出现性能问题的时候注意不要阻塞服务主流程
1 | p.requestsC = make(chan BulkableRequest)//这里可以看到 |
同步写入
以上所有的都是异步操作es,某些情况下还会存在同步写入的需求,例如写入后就会立刻请求数据需要拿到最新状态,那么这个是偶就需要用到同步写入了
1 | c := EsClient.Bulk() |
转载请注明来源链接 http://just4fun.im/2020/06/14/golang-write-elasticsearch/ 尊重知识,谢谢:)