目标是基于官方的Nacos Sync提供任务片和群集高可用性,以支持大规模注册群集迁移,并在节点宕机时允许其他节点快速响应和迁移故障。 技术要点如下,在正文中以源代码的一部分或伪代码表示。
*详细代码请参考以下内容。
3359 github.com/Zhang men-tech/nacos * *
服务一致性散列路由:
根据图1的多集群部署,设定各节点可配置的虚拟节点数,使其能够在Hash环上均匀分布。
//虚拟节点配置
sync.consistent.hash.replicas=1000;
//存储虚拟节点
SortedMap circle=new TreeMap (;
//在容器中添加所有节点,并构建散列环
replicas for loop {
//为每个物理节点设置虚拟节点stringnodestr=node.tostring (.concat (' # # '.concat ) integer.tostring ) )。 //根据算法计算出虚拟节点的Hash值intHashcode=gethash(nodestr )//将虚拟节点放入hash环中进行circle.put(hashcode,node );
}
//异步监听节点的生存状态
etcdmanager.watchetcdkeyasync (register _ worker _ path,true,response - {
watcheventevent : response.get events () ) /删除事件、 此节点和Hash中的虚拟节点if(event.geteventtype ) (.equals ) watchevent.eventtype.delete ) { string key=optional.of nulllablable } 获取Etcd中心跳丢失的节点string[]ks=key.split(slash ); log.info(lostheartbeat ),ks[3]; //本节点不判断的if (! IP utils.getip address ((.equals ignore case ) ks[3] ) /监听心率丧失,更明显的存储节点缓存,散列环上的节点nodecaches.remove try//心跳丢失,etcd上该节点的处理任务manager.deleteetcdvaluebykey (per _ worker _ process _ service.concat ) slash ).CCH catch(interruptedexceptione ) log.error('clear ) (processservicefailed,) },ks[3],e ); }catch(executionexceptione ) log.error('clear ) (processservicefailed,) },ks[3],e ); } }
}
根据业务服务名的FNV1_32_HASH算法计算各业务服务的散列值,顺时针计算该散列值最接近的节点,将任务代行到该节点。
//计算任务的散列值
inthash=gethash(key.tostring () );
if (! circle.containskey(hash ) }
sortedmaptailmap=circle.tail map (hash ); //要找到顺势疗法的最近节点hash=tailMap.isEmpty (? circle.first key (: tail map.first key );
}
得到Hash循环内的节点位置
circle.get(hash;
//判断任务是否为自己的处理节点
同步共享名称(if ) ) 0
//如果任务属于该节点,则心跳同步进程任务(task );
}
//删除心跳同步任务
taskstatusenum.delete.getcode (.equals ) taskupdaterequest.gettaskstatus ) ) }
通过Etcd生存节点一致性哈希算法,获取该任务所在的处理节点nodeprocessnode=syncshardingproxy.fetchprocessnode (task ); 进程节点. ismys
elf()) { // 如果是自己的同步任务,发布删除心跳事件 eventBus.post(new DeleteTaskEvent(taskDO)); } else { // 如果是其他节点,则通过Http代理到此节点处理 httpClientProxy.deleteTask(targetUrl,task); }}
同步节点宕机故障转移:
节点监听。监听其它节点存活状态,配置 Etcd 集群租约 TTL , TTL 内至少发送 5 个续约心跳以保证一旦出现网络波动避免造成节点丢失。
// 心跳TTL配置
sync.etcd.register.ttl = 30;
// 获取租约TTL配置
String ttls = environment.getProperty(ETCD_BEAT_TTL);
long ttl = NumberUtils.toLong(ttls);
// 获取租约ID
long leaseId = client.getLeaseClient().grant(ttl).get().getID();
PutOption option = PutOption.newBuilder().withLeaseId(leaseId).withPrevKV().build();
client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8), option).get();
long delay = ttl / 6;
// 定时续约
scheduledExecutorService.schedule(new BeatTask(leaseId, delay), delay, TimeUnit.SECONDS);
// 续约任务
private class BeatTask implements Runnable {
long leaseId; long delay; public BeatTask(long leaseId, long delay) { this.leaseId = leaseId; this.delay = delay; } public void run() { client.getLeaseClient().keepAliveOnce(leaseId); scheduledExecutorService.schedule(new BeatTask(this.leaseId, this.delay), delay, TimeUnit.SECONDS); }
}
节点宕机。其中某个节点宕机,其任务转移到其它节点,因为有虚拟节点的缘故,所以此节点的任务会均衡 ReSharding 到其它节点,那么,集群在任何时候,任务处理都是分片均衡的,如图2中, B 节点宕机, ##1 、 ##2 虚拟节点的任务会分别转移到 C 和 A 节点,这样避免一个节点承担宕机节点的所有任务造成剩余节点连续雪崩。
节点恢复。如图3,节点的虚拟节点重新添加到 Hash 环中, Sharding 规则变更,恢复的节点会根据新的 Hash 环规则承担其它节点的一部分任务。心跳任务一旦在节点产生都不会自动消失,这时需要清理其它节点的多余任务(即重新分配给复苏节点的任务),给其它节点减负(这一步非常关键,不然也可能会引发集群的连续雪崩),保障集群恢复到最初正常任务同步状态。
// 找到此节点处理的心跳同步任务
Map finishedTaskMap = skyWalkerCacheServices.getFinishedTaskMap();
// 存储非此节点处理任务
Map unBelongTaskMap = Maps.newHashMap();
// 找到集群复苏后,Rehash后不是此节点处理的任务
if (!shardingEtcdProxy.isProcessNode(taskDO.getServiceName()) && TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) {
unBelongTaskMap.put(operationId, entry.getValue());
}
unBelongTaskMap for loop {
// 删除多余的节点同步 specialSyncEventBus.unsubscribe(taskDO); // 删除多余的节点处理任务数 proxy.deleteEtcdValueByKey(PER_WORKER_PROCESS_SERVICE.concat(SLASH).concat(IPUtils.getIpAddress()).concat(SLASH).concat(taskDO.getServiceName()), false); // 根据不同的同步类型,删除多余的节点心跳 if (ClusterTypeEnum.EUREKA.getCode().equalsIgnoreCase(clusterDO.getClusterType())) { syncToNacosService.deleteHeartBeat(taskDO); } if (ClusterTypeEnum.NACOS.getCode().equalsIgnoreCase(clusterDO.getClusterType())) { syncToEurekaService.deleteHeartBeat(taskDO); } // 删除多余的finish任务 finishedTaskMap.remove(val.getKey());
}
节点容灾。如果 Etcd 集群连接不上,则存活节点从配置文件中获取,集群正常运作,但是会失去容灾能力。
// 配置所有处理节点的机器IP,用于构建Hash环
sync.worker.address = ip1, ip2, ip3;
// 从配置文件获取所有处理任务节点IP
List ips = getWorkerIps();
ConsistentHash consistentHash = new ConsistentHash(replicas, ips);
// 如果从Etcd中获取不到当前处理节点,则构建Hash环用配置文件中的IP列表,且列表不会动态变化
if (CollectionUtils.isNotEmpty(nodeCaches)) {
consistentHash = new ConsistentHash(replicas, nodeCaches);
}
return consistentHash;
Nacos Eureka Sync 保障手段
Nacos Eureka Sync 同步界面
从如下界面可以保证,从 Eureka 或者 Nacos 上,新上线或者下线一个业务服务(非实例),都能让 Nacos Eureka Sync 实时感知。但我们做了更进一层的智能化和自动化:
1、新增同步。结合 DevOps 发布平台,当一个业务服务(非实例)新上线的时候,智能判断它是从哪个注册中心上线的,然后回调 Nacos Eureka Sync 接口,自动添加同步接口,例如,A 业务服务注册到 Eureka 上,DevOps 发布平台会自动添加它的 Eureka -> Nacos 的同步任务,反之亦然。当然从如下界面的操作也可实现该功能。
2、删除同步。由于 DevOps 发布平台无法判断一个业务服务(非实例)下线,或者已经迁移到另一个注册中心,已经全部完毕(有同学会反问,可以判断的,即查看那个业务服务的实例数是否是零为标准,但我们应该考虑,实例数为零在网络故障的时候也会发生,即心跳全部丢失,所以这个判断依据是不严谨的),交由业务人员来判断,同时配合钉钉机器人告警提醒,由基础架构部同学从如下界面的操作实现该功能。