关于删除,ES提供的是delete by id的思路。 早期ES是支持通过query批量删除的,后来觉得这个功能太过危险,就将delete by query做成了Plugin, 由用户自行安装插件后才能使用。 关于ES批量删除的思路,可以参考delete by query插件的源码,大体思路是通过scroll query 按条件查询出doc id, 然后使用client.bulk()进行删除
TransportFlushAction 索引flush,数据从filesystem cache刷新到磁盘,核心逻辑在SyncedFlushService
当向elasticsearch发送创建document索引请求的时候,document数据会先进入到index buffer之后,与此同时会将操作记录在translog之中,当发生refresh时(数据从index buffer中进入filesystem cache的过程)translog中的操作记录并不会被清除,而是当数据从filesystem cache中被写入磁盘之后才会将translog中清空。而从filesystem cache写入磁盘的过程就是flush
总结一下translog的功能:
1.保证在filesystem cache中的数据不会因为elasticsearch重启或是发生意外故障的时候丢失。
2.当系统重启时会从translog中恢复之前记录的操作。
3.当对elasticsearch进行CRUD操作的时候,会先到translog之中进行查找,因为tranlog之中保存的是最新的数据。
4.translog的清除时间时进行flush操作之后(将数据从filesystem cache刷入disk之中)。
再总结一下flush操作的时间点:
1.es的各个shard会每个30分钟进行一次flush操作。
2.当translog的数据达到某个上限的时候会进行一次flush操作。
有关于translog和flush的一些配置项:
index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush。默认是 unlimited。
index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作。默认是512mb。
index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作。默认是30m。
index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作。es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s。
下面开始看下SyncedFlushService的源码
构造函数会加载相关的service,并且注册requestHandler,包括preSyncedFlushAction, syncedFlushAction和inFlightOpsAction
下面看核心逻辑,只在主分片上操作flush,调用函数为attemptSyncedFlush, 对结果的回调函数为onResponse和onFailure
刷新索引会继续调用到核心函数innerAttemptSyncedFlush
1.首先根据shardId获取分片路由信息shardRoutingTable,校验activeShards必须存在
2.方法入口为innerAttemptSyncedFlush
给所有的分片pre-sync flush 请求,也是通过底层netty 的Transport实现
3.循环入参shards列表,对于每次循环的shard,首先获取分片所在的node,node如果为空,就继续下一循环
4.根据分片获取的node不为空,就调用transportService.sendRequest来发送请求,并且handleResponse和handleException处理回调结果
索引刷新的处理: 会调用到performPreSyncedFlush
5.如下执行pre sync flush
6. 获取request对应的分片shard,底层还是调用lucene 的engine.flush进行索引flush处理
7.判断此时engine如果处于没有从translog中恢复,那么flush就不允许
7.1 加读锁readLock,然后ensureOpen,确保当前引擎engine是打开的
7.2 判断如果当前没有获取到flushLock锁,那么就block,等着获取flushLock锁
7.3 如果当前indexWriter有没提交的change或者是force强制flush
7.3.1 先把translog roll到下一个generation,创建一个新的translog,更新检查点checkpoint
7.3.2 调用commitIndexWriter,把索引数据刷新到磁盘,finally块中解锁flushLock
flush后删除translog