会话连接
Zookeeper客户端和服务端保持长连接,每10s向服务端发送一次心跳,服务端返回客户端响应。 这就是会话连接,它拥有全局唯一的会话id。 通常,Session连接始终有效,如果由于网络而断开连接,客户端将使用相同的session id重新连接。 由于服务器端保留了session的各种状态,特别是各种瞬时节点是否被删除取决于session是否被禁用。
会话失效问题
客户端主动关闭连接通常被视为禁用会话。 此外,还可能存在其他未知原因,如网络超时导致的session禁用问题。 从服务器的角度看,无法区分session禁用的情况,一旦出现session禁用,一定时间后session拥有的所有watcher和瞬时节点将被删除。
另一方面,对于Zookeeper客户端,发生seesion吊销时,Zookeeper客户端认为客户端已死亡,因为一旦吊销,它就不知道是否应该重新连接watcher和瞬时节点问题所有的操作都将无法进行。 请参见howshouldihandlesession _ expired?
解决方案
对于只有简单查询服务的客户端,禁用session后,只需重新建立连接即可。 对于需要处理瞬时节点和各种watcher的服务,APP应用程序必须处理禁用和重新连接session带来的副作用。
以下逻辑可以轻松解决session重新连接问题。 这意味着重新生成新连接。
public static org.Apache.zookeeper.zookeepergetzookeeper ()
if (zookeeper==空) {
同步(zookeeper client.class )。
if (zookeeper==空) {
latch=newcountdownlatch(1;
zookeeper=buildClient (; //如果失败,则long start time=system.current time millis (;
try{
latch.await(30,TimeUnit.SECONDS );
}catch(interruptedexceptione ) {
e .打印任务跟踪(;
}finally{
finalsocketaddressremoteaddres=zookeeper.testableremotesocketaddress (;
system.out.println ([ suc-core ] localhost : ' zookeeper.testablelocalsocketaddress () );
system.out.println ([ suc-core ] remote host : ' remote addres );
system.out.println ([ suc-core ] zookeeper session id : ) zookeeper.getsessionid ();
finalstringremotehost=remote addres!=null? (InetSocketAddress ) remoteAddres ).getAddress ) ) : ';
system.out.println ([ suc-core ] init cost : ) (system.currenttimemillis )-start time ((ms ) ) )远程主机
latch=null;
}
}
}
}
返回zookeeper;
}
上面的代码只是使用Double-Check来维护Zookeeper客户端的单个实例。 确保始终有机会重新生成客户端,并且只有单个实例。 这是因为Zookeeper客户端是线程安全的。
异常的上例继承了org.apache.zookeeper.ZooKeeper类,从而使session能够获得正确的服务器端ip地址和客户端地址,从而便于调试问题。
生成客户端时,必须设置会话超时。 例如,以下代码为30秒。
privatestaticzookeeperbuildclient (
finalstringrootpath=get root path (;
finalstringconnectstring=system config.getinstance ().getstring ) ' zookeeper.IPS ',/(192.168.10.1:2188
:2181,192.168.10.3:2181,192.168.10.4:2181,192.168.10.5:2181");System.out.printf("[SUC-CORE] rootPath: %1sn", rootPath);
System.out.printf("[SUC-CORE] connectString:%1sn", connectString);
try {
return new ZooKeeper(connectString + rootPath, 30000, new SessionWatcher());
} catch (IOException e) {
throw new RuntimeException("init zookeeper fail.", e);
}
}
为了处理连接建立成功以及断开问题,我们需要一个Watcher来处理此问题。
static class SessionWatcherimplements Watcher {
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
if (latch != null) {
latch.countDown();
}
} else if (event.getState() == KeeperState.Expired) {
System.out.println("[SUC-CORE] session expired. now rebuilding
");
//session expired, may be never happending.//close old client and rebuild new client close();
getZooKeeper();
}
}
}
一旦检测到session失效了(Expired),那么久销毁已经建立的客户端实例,重新生成一个客户端实例。
/*** 关闭zookeeper连接,释放资源*/
public static void close() {
System.out.println("[SUC-CORE] close");
if (zookeeper != null) {
try {
zookeeper.close();
zookeeper = null;
} catch (InterruptedException e) {
//ignore exception }
}
}
这是一种简单的处理Session expired问题的方法,显然这不会处理瞬时节点的问题,因此如果有相关的需求,业务系统(应用程序)需要自己修复问题。
测试方案
public static void main(String[] args) throws Exception {
ZooKeeper zk = ZookeeperClient.getZooKeeper();
long sessionId = zk.getSessionId();
// final String rootPath = ZookeeperClient.getRootPath();
final String connectString = SystemConfig.getInstance().getString("zookeeper.ips",// "192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181,192.168.10.4:2181,192.168.10.5:2181");
//close the old connection new ZooKeeper(connectString + rootPath, 30000, null, sessionId, null).close();
Thread.sleep(10000L);
rebuild a new session long newSessionid = ZookeeperClient.getZooKeeper().getSessionId();
//check the new session String status = newSessionid != sessionId ? "OK" : "FAIL";
System.out.println(format("%s --> %s %s", sessionId, newSessionid, status));
//close the client ZookeeperClient.getZooKeeper().close();
}
最后能够自动处理session expired的问题。实验中看到确实执行了session expired的逻辑重新生成了新的session。