首页 > 编程知识 正文

elasticsearch源码分析,elasticsearch删除索引命令

时间:2023-05-06 06:53:58 阅读:264475 作者:4106

TransportDeleteAction 执行索引删除操作 从ES中删除一个文档的过程如下: 先从主分片中执行删除操作,再在所有的副本分片上执行删除操作。 所以核心流程分三步:s1: 从请求节点路由到主分片节点。s2: 在主分片节点执行删除操作。s3: 在所有的副本分片上执行删除操作。

   关于删除,ES提供的是delete by id的思路。 早期ES是支持通过query批量删除的,后来觉得这个功能太过危险,就将delete by query做成了Plugin, 由用户自行安装插件后才能使用。 关于ES批量删除的思路,可以参考delete by query插件的源码,大体思路是通过scroll query 按条件查询出doc id, 然后使用client.bulk()进行删除

 

TransportFlushAction 索引flush,数据从filesystem cache刷新到磁盘,核心逻辑在SyncedFlushService

  当向elasticsearch发送创建document索引请求的时候,document数据会先进入到index buffer之后,与此同时会将操作记录在translog之中,当发生refresh时(数据从index buffer中进入filesystem cache的过程)translog中的操作记录并不会被清除,而是当数据从filesystem cache中被写入磁盘之后才会将translog中清空。而从filesystem cache写入磁盘的过程就是flush

 

总结一下translog的功能:

1.保证在filesystem cache中的数据不会因为elasticsearch重启或是发生意外故障的时候丢失。

2.当系统重启时会从translog中恢复之前记录的操作。

3.当对elasticsearch进行CRUD操作的时候,会先到translog之中进行查找,因为tranlog之中保存的是最新的数据。

4.translog的清除时间时进行flush操作之后(将数据从filesystem cache刷入disk之中)。

 

再总结一下flush操作的时间点:

1.es的各个shard会每个30分钟进行一次flush操作。

2.当translog的数据达到某个上限的时候会进行一次flush操作。

 

有关于translog和flush的一些配置项:

index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush。默认是 unlimited。

index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作。默认是512mb。

index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作。默认是30m。

index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作。es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s。

 

下面开始看下SyncedFlushService的源码

构造函数会加载相关的service,并且注册requestHandler,包括preSyncedFlushAction, syncedFlushAction和inFlightOpsAction

下面看核心逻辑,只在主分片上操作flush,调用函数为attemptSyncedFlush, 对结果的回调函数为onResponse和onFailure

刷新索引会继续调用到核心函数innerAttemptSyncedFlush

1.首先根据shardId获取分片路由信息shardRoutingTable,校验activeShards必须存在

2.方法入口为innerAttemptSyncedFlush

给所有的分片pre-sync flush 请求,也是通过底层netty 的Transport实现

 

3.循环入参shards列表,对于每次循环的shard,首先获取分片所在的node,node如果为空,就继续下一循环

4.根据分片获取的node不为空,就调用transportService.sendRequest来发送请求,并且handleResponse和handleException处理回调结果

 

索引刷新的处理:  会调用到performPreSyncedFlush

5.如下执行pre sync flush

6.  获取request对应的分片shard,底层还是调用lucene 的engine.flush进行索引flush处理

  

7.判断此时engine如果处于没有从translog中恢复,那么flush就不允许

   

7.1 加读锁readLock,然后ensureOpen,确保当前引擎engine是打开的

7.2 判断如果当前没有获取到flushLock锁,那么就block,等着获取flushLock锁

7.3 如果当前indexWriter有没提交的change或者是force强制flush

      7.3.1 先把translog roll到下一个generation,创建一个新的translog,更新检查点checkpoint

      7.3.2 调用commitIndexWriter,把索引数据刷新到磁盘,finally块中解锁flushLock

flush后删除translog

 

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。