上一节分析了ZooKeeper的部分代码,下面我们看看客户端网络连接器的部分代码
/** 这个类管理客户端的socket I/O。ClientCnxn维护一个可用服务器列表可以根据需要透明地切换服务器 * */ public class ClientCnxn { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class); private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username"; /** 客户端在会话重连接时自动复位监视器,这个操作允许客户端通过设置环境变量zookeeper.disableAutoWatchReset=true来关闭这个行为 */ private static boolean disableAutoWatchReset; static { disableAutoWatchReset = Boolean.getBoolean("zookeeper.disableAutoWatchReset"); if (LOG.isDebugEnabled()) { LOG.debug("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset); } } static class AuthData { AuthData(String scheme, byte data[]) { this.scheme = scheme; this.data = data; } String scheme; byte data[]; } private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>(); /** *哪些已经发送出去的目前正在等待响应的包 */ private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); /** * 那些需要发送的包 */ private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>(); // 超时时间 private int connectTimeout; /** *客户端与服务器协商的超时时间,以毫秒为单位。这是真正的超时时间,而不是客户端的超时请求 */ private volatile int negotiatedSessionTimeout; // 读取超时时间 private int readTimeout; // 会话超时时间 private final int sessionTimeout; // ZooKeeper private final ZooKeeper zooKeeper; //客户端监视器管理器 private final ClientWatchManager watcher; //会话ID private long sessionId; //会话密钥 private byte sessionPasswd[] = new byte[16]; // 是否只读 private boolean readOnly; final String chrootPath; // 发送线程 final SendThread sendThread; // 事件回调线程 final EventThread eventThread; /** * Set to true when close is called. Latches the connection such that we * don't attempt to re-connect to the server if in the middle of closing the * connection (client sends session disconnect to server as part of close * operation) */ private volatile boolean closing = false; /** 一组客户端可以连接的Zk主机 */ private final HostProvider hostProvider; /** * 第一次和读写服务器建立连接时设置为true,之后不再改变。 这个值用来处理客户端没有sessionId连接只读模式服务器的场景. 客户端从只读服务器收到一个假的sessionId,这个sessionId对于其他服务器是无效的。所以 当客户端寻找一个读写服务器时,它在连接握手时发送0代替假的sessionId,建立一个新的,有效的会话 如果这个属性是false(这就意味着之前没有找到过读写服务器)则表示非0的sessionId是假的否则就是有效的 */ volatile boolean seenRwServerBefore = false; public ZooKeeperSaslClient zooKeeperSaslClient; public long getSessionId() { return sessionId; } public byte[] getSessionPasswd() { return sessionPasswd; } public int getSessionTimeout() { return negotiatedSessionTimeout; } @Override public String toString() { StringBuilder sb = new StringBuilder(); SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress(); SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress(); sb .append("sessionid:0x").append(Long.toHexString(getSessionId())) .append(" local:").append(local) .append(" remoteserver:").append(remote) .append(" lastZxid:").append(lastZxid) .append(" xid:").append(xid) .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount()) .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount()) .append(" queuedpkts:").append(outgoingQueue.size()) .append(" pendingresp:").append(pendingQueue.size()) .append(" queuedevents:").append(eventThread.waitingEvents.size()); return sb.toString(); } /** * 创建一个连接对象。真正的网路连接直到需要的时候才建立。start()方法在执行构造方法后一定要调用 * 这个构造方法在ZooKeeper的初始化时调用,用于初始化一个客户端网路管理器 */ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException { this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16], canBeReadOnly); } public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { //客户端实例 this.zooKeeper = zooKeeper; // 客户端Watcher管理器 this.watcher = watcher; //sessionId this.sessionId = sessionId; //会话密钥 this.sessionPasswd = sessionPasswd; //会话超时时间 this.sessionTimeout = sessionTimeout; //服务器地址列表管理器 this.hostProvider = hostProvider; //根路径 this.chrootPath = chrootPath; // 连接超时时间是会话超时时间和服务器数量的比值 connectTimeout = sessionTimeout / hostProvider.size(); // 读超时时间是会话超时时间的2/3 readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; //创建发送和事件处理线程 sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); } public static boolean getDisableAutoResetWatch() { return disableAutoWatchReset; } public static void setDisableAutoResetWatch(boolean b) { disableAutoWatchReset = b; } //启动发送和事件处理线程 public void start() { sendThread.start(); eventThread.start(); }
// 事件处理线程 class EventThread extends Thread { //等待处理的事件 private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>(); /** *这个是真正的排队会话的状态,知道事件处理线程真正处理事件并将其返回给监视器。 **/ private volatile KeeperState sessionState = KeeperState.Disconnected; private volatile boolean wasKilled = false; private volatile boolean isRunning = false; EventThread() { // 构造一个线程名 super(makeThreadName("-EventThread")); setUncaughtExceptionHandler(uncaughtExceptionHandler); // 设置为守护线程 setDaemon(true); } // public void queueEvent(WatchedEvent event) { // 如果WatchedEvent的类型是None状态是sessionStat的值则不处理 if (event.getType() == EventType.None && sessionState == event.getState()) { return; } // 获取事件的状态 sessionState = event.getState(); // 构建一个基于事件的监视器 WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // 排队pair,稍后处理 waitingEvents.add(pair); } // 排队Packet public void queuePacket(Packet packet) { if (wasKilled) { synchronized (waitingEvents) { if (isRunning) waitingEvents.add(packet); else processEvent(packet); } } else { waitingEvents.add(packet); } } public void queueEventOfDeath() { waitingEvents.add(eventOfDeath); } @Override public void run() { try { isRunning = true; while (true) { //从等待处理的事件队列中获取事件 Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down"); } // 真正处理事件的入口,主要是回调处理 private void processEvent(Object event) { try { // 如果事件是WatcherSetEventPair if (event instanceof WatcherSetEventPair) { //每个监视器都会处理这个事件 WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else { Packet p = (Packet) event; int rc = 0; // 获取客户端路径 String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.cb == null) { LOG.warn("Somehow a null cb got to EventThread!"); } else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { // 获取回调对象 StatCallback cb = (StatCallback) p.cb; // 如果处理成功回调方法会传入响应状态,否则响应状态为null if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetDataResponse) { DataCallback cb = (DataCallback) p.cb; GetDataResponse rsp = (GetDataResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getData(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetACLResponse) { ACLCallback cb = (ACLCallback) p.cb; GetACLResponse rsp = (GetACLResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getAcl(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetChildrenResponse) { ChildrenCallback cb = (ChildrenCallback) p.cb; GetChildrenResponse rsp = (GetChildrenResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren()); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetChildren2Response) { Children2Callback cb = (Children2Callback) p.cb; GetChildren2Response rsp = (GetChildren2Response) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof CreateResponse) { StringCallback cb = (StringCallback) p.cb; CreateResponse rsp = (CreateResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath() .substring(chrootPath.length()))); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.cb instanceof VoidCallback) { VoidCallback cb = (VoidCallback) p.cb; cb.processResult(rc, clientPath, p.ctx); } } } catch (Throwable t) { LOG.error("Caught unexpected throwable", t); } } }
相关推荐
Zookeeper源码分析.epub
zookeeper源码分析(一)工作原理概述 zookeeper源码分析(二)FastLeader选举算法 Zookeeper源码分析之Paxos算法之旅
第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...
ZooKeeper源码阅读,庖丁解牛的带你进入zk的世界。ZooKeeper的类初始化 ZooKeeper在初始化的时候, 会调用类初始化方法, 初始化日志环境(使用SLF4J), 并且记录相关环境变量. 环境变量被存放在Environment的类中, 使用...
Zookeeper3.5.1(源码解析)
本文详细分析了Zookeeper的源码,特别是Leader选举过程的实现。首先,介绍了阅读源码的意义,包括技术提升、框架掌握、问题定位、面试准备、深入理解技术以及参与开源社区。接着,提供了一系列高效阅读源码的方法,...
ZooKeeper源码分析 优秀时间学习了一下ZooKeeper:分布式过程协调这本书的内容,对ZooKeeper实现的细节很好奇,所以顺便把ZooKeeper源码看了一遍。看完之后想写点内容做个笔记,确实发现不好开始。由于ZooKeeper一个...
zookeeper第四节课原理源码分析资料1
//注册监听//修改节点的值触发监ZooKeeper API 的初始化过程转载请注明《咕泡学院》,建议自己分析一遍在创建一个 ZooKeeper 客户端对象实例
zookeeper源码执行流程。 Zookeeper是一个开源的分布式应用程序协调服务器,其为分布式系统提供一致性服务。其一致性是通过基于Paxos算法的ZAB协议完成的。其主要功能包括:配置维护、域名服务、分布式同步、集群...
课程概要:启动流程源码分析快照与事物日志的存储结构一、启动流程知识点:工程结构介绍启动流程宏观图集群启动详细流程netty 服务工作机制1.工程结构介绍项目地址
zookeeper-3.4.14...............................................................
在第一节课,我们讲到了 zookeeper 的来源,是来自于 google chubby。为了解决在分布式环境下,如何从多个 server 中选举出 maste
非常强的一套Zookeeper集群实战,包含了全套的学习代码,学习笔记还有...内容从Zookeeper入门教学,本地安装,Zookeeper集群实战,项目需求,Zookeeper企业面试实战,Zookeeper算法实战,Zookeeper核心源码分析等内容。
本套课程中,第一阶段深入Zookeeper原理和源码,分析Zookeeper的核心实现,并通过多个应用场景说明,体现出其重要性和广泛的应用性。第二阶段深入Dubbo RPC、SPI等核心功能点,列举多个示例加以应用说明,期望能够...
此外,文章还提供了Zookeeper源码中关于ZAB协议实现的细节分析,为理解分布式系统的底层逻辑提供了宝贵的视角。本文适合对分布式系统感兴趣的开发者和研究者,帮助他们深入理解Zookeeper的核心机制
由于zookeeper源代码在网上比较不好整理,这里给大家整理好了,方便调试和理解,分布式系统就像...将会从源码层次研究分析Zookeeper,通过源码帮助我们深入理解Zookeeper实现思路,并提高我们对分布式一致性问题的认识。
zookeeper理论原理 ZAB机制 客户端流程 客户端源码分析 fastelection选举
zookeeper原始码分析 代码下载 原始码编译 zookeeper是用ant重建,需要安装ant环境 生成eclipse源码 cd E:\github\zookeeper ant eclipse 引进项目 导入eclipse 错误处理 错误1:编译完成后,用Eclipse打开编译...