`
zhangwei_david
  • 浏览: 468834 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ZooKeeper源码分析(二)

zk 
阅读更多

 

  上一节分析了ZooKeeper的部分代码,下面我们看看客户端网络连接器的部分代码

 

 

/**
   这个类管理客户端的socket I/O。ClientCnxn维护一个可用服务器列表可以根据需要透明地切换服务器
 *
 */
public class ClientCnxn {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);

    private static final String ZK_SASL_CLIENT_USERNAME =
        "zookeeper.sasl.client.username";

    /** 客户端在会话重连接时自动复位监视器,这个操作允许客户端通过设置环境变量zookeeper.disableAutoWatchReset=true来关闭这个行为
	 */
    private static boolean disableAutoWatchReset;
    static {
        disableAutoWatchReset =
            Boolean.getBoolean("zookeeper.disableAutoWatchReset");
        if (LOG.isDebugEnabled()) {
            LOG.debug("zookeeper.disableAutoWatchReset is "
                    + disableAutoWatchReset);
        }
    }

    static class AuthData {
        AuthData(String scheme, byte data[]) {
            this.scheme = scheme;
            this.data = data;
        }

        String scheme;

        byte data[];
    }
	
    private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();

    /**
	 *哪些已经发送出去的目前正在等待响应的包
     */
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

    /**
     * 那些需要发送的包
     */
    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

	// 超时时间
    private int connectTimeout;

    /**
	 *客户端与服务器协商的超时时间,以毫秒为单位。这是真正的超时时间,而不是客户端的超时请求
     */
    private volatile int negotiatedSessionTimeout;
	
	// 读取超时时间
    private int readTimeout;

	// 会话超时时间
    private final int sessionTimeout;

	// ZooKeeper
    private final ZooKeeper zooKeeper;

	//客户端监视器管理器
    private final ClientWatchManager watcher;

	//会话ID
    private long sessionId;

	//会话密钥
    private byte sessionPasswd[] = new byte[16];

    // 是否只读
    private boolean readOnly;

    final String chrootPath;
   // 发送线程
    final SendThread sendThread;
	// 事件回调线程
    final EventThread eventThread;

    /**
     * Set to true when close is called. Latches the connection such that we
     * don't attempt to re-connect to the server if in the middle of closing the
     * connection (client sends session disconnect to server as part of close
     * operation)
     */
    private volatile boolean closing = false;
    
    /**
	  一组客户端可以连接的Zk主机
     */
    private final HostProvider hostProvider;

    /**
	 * 第一次和读写服务器建立连接时设置为true,之后不再改变。
	   这个值用来处理客户端没有sessionId连接只读模式服务器的场景.
	   客户端从只读服务器收到一个假的sessionId,这个sessionId对于其他服务器是无效的。所以
	   当客户端寻找一个读写服务器时,它在连接握手时发送0代替假的sessionId,建立一个新的,有效的会话
	   如果这个属性是false(这就意味着之前没有找到过读写服务器)则表示非0的sessionId是假的否则就是有效的 
     */
    volatile boolean seenRwServerBefore = false;


    public ZooKeeperSaslClient zooKeeperSaslClient;

    public long getSessionId() {
        return sessionId;
    }

    public byte[] getSessionPasswd() {
        return sessionPasswd;
    }

    public int getSessionTimeout() {
        return negotiatedSessionTimeout;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();

        SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
        SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
        sb
            .append("sessionid:0x").append(Long.toHexString(getSessionId()))
            .append(" local:").append(local)
            .append(" remoteserver:").append(remote)
            .append(" lastZxid:").append(lastZxid)
            .append(" xid:").append(xid)
            .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
            .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
            .append(" queuedpkts:").append(outgoingQueue.size())
            .append(" pendingresp:").append(pendingQueue.size())
            .append(" queuedevents:").append(eventThread.waitingEvents.size());

        return sb.toString();
    }

   

    /**
     * 创建一个连接对象。真正的网路连接直到需要的时候才建立。start()方法在执行构造方法后一定要调用
     * 这个构造方法在ZooKeeper的初始化时调用,用于初始化一个客户端网路管理器
     */
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
            throws IOException {
        this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
             clientCnxnSocket, 0, new byte[16], canBeReadOnly);
    }

    
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        //客户端实例
        this.zooKeeper = zooKeeper;
        // 客户端Watcher管理器
        this.watcher = watcher;
        //sessionId
        this.sessionId = sessionId;
        //会话密钥
        this.sessionPasswd = sessionPasswd;
        //会话超时时间
        this.sessionTimeout = sessionTimeout;
        //服务器地址列表管理器
        this.hostProvider = hostProvider;
         //根路径
        this.chrootPath = chrootPath;
       // 连接超时时间是会话超时时间和服务器数量的比值
        connectTimeout = sessionTimeout / hostProvider.size();
       // 读超时时间是会话超时时间的2/3
        readTimeout = sessionTimeout * 2 / 3;
		
        readOnly = canBeReadOnly;
        //创建发送和事件处理线程
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

    }

   
    public static boolean getDisableAutoResetWatch() {
        return disableAutoWatchReset;
    }
   
    public static void setDisableAutoResetWatch(boolean b) {
        disableAutoWatchReset = b;
    }
	//启动发送和事件处理线程
    public void start() {
        sendThread.start();
        eventThread.start();
    }

 

 

 

// 事件处理线程
class EventThread extends Thread {
	    //等待处理的事件
        private final LinkedBlockingQueue<Object> waitingEvents =
            new LinkedBlockingQueue<Object>();

		 /**
		  *这个是真正的排队会话的状态,知道事件处理线程真正处理事件并将其返回给监视器。
		  **/
        private volatile KeeperState sessionState = KeeperState.Disconnected;

       private volatile boolean wasKilled = false;
       private volatile boolean isRunning = false;

        EventThread() {
		   // 构造一个线程名
            super(makeThreadName("-EventThread"));
            setUncaughtExceptionHandler(uncaughtExceptionHandler);
			// 设置为守护线程
            setDaemon(true);
        }
        // 
        public void queueEvent(WatchedEvent event) {
			// 如果WatchedEvent的类型是None状态是sessionStat的值则不处理
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
			// 获取事件的状态
            sessionState = event.getState();

            // 构建一个基于事件的监视器
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // 排队pair,稍后处理
            waitingEvents.add(pair);
        }
		
		// 排队Packet
       public void queuePacket(Packet packet) {
          if (wasKilled) {
             synchronized (waitingEvents) {
                if (isRunning) waitingEvents.add(packet);
                else processEvent(packet);
             }
          } else {
             waitingEvents.add(packet);
          }
       }

        public void queueEventOfDeath() {
            waitingEvents.add(eventOfDeath);
        }

        @Override
        public void run() {
           try {
              isRunning = true;
              while (true) {
			     //从等待处理的事件队列中获取事件
                 Object event = waitingEvents.take();
				 
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down");
        }

		// 真正处理事件的入口,主要是回调处理
       private void processEvent(Object event) {
          try {
			// 如果事件是WatcherSetEventPair
              if (event instanceof WatcherSetEventPair) {
                  //每个监视器都会处理这个事件
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              } else {
                  Packet p = (Packet) event;
                  int rc = 0;
				  // 获取客户端路径
                  String clientPath = p.clientPath;
                  if (p.replyHeader.getErr() != 0) {
                      rc = p.replyHeader.getErr();
                  }
                  if (p.cb == null) {
                      LOG.warn("Somehow a null cb got to EventThread!");
                  } else if (p.response instanceof ExistsResponse
                          || p.response instanceof SetDataResponse
                          || p.response instanceof SetACLResponse) {
						 // 获取回调对象
                      StatCallback cb = (StatCallback) p.cb;
					  // 如果处理成功回调方法会传入响应状态,否则响应状态为null
                      if (rc == 0) {
                          if (p.response instanceof ExistsResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((ExistsResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetDataResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetDataResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetACLResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetACLResponse) p.response)
                                              .getStat());
                          }
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetDataResponse) {
                      DataCallback cb = (DataCallback) p.cb;
                      GetDataResponse rsp = (GetDataResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getData(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetACLResponse) {
                      ACLCallback cb = (ACLCallback) p.cb;
                      GetACLResponse rsp = (GetACLResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getAcl(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetChildrenResponse) {
                      ChildrenCallback cb = (ChildrenCallback) p.cb;
                      GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetChildren2Response) {
                      Children2Callback cb = (Children2Callback) p.cb;
                      GetChildren2Response rsp = (GetChildren2Response) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null, null);
                      }
                  } else if (p.response instanceof CreateResponse) {
                      StringCallback cb = (StringCallback) p.cb;
                      CreateResponse rsp = (CreateResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx,
                                  (chrootPath == null
                                          ? rsp.getPath()
                                          : rsp.getPath()
                                    .substring(chrootPath.length())));
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.cb instanceof VoidCallback) {
                      VoidCallback cb = (VoidCallback) p.cb;
                      cb.processResult(rc, clientPath, p.ctx);
                  }
              }
          } catch (Throwable t) {
              LOG.error("Caught unexpected throwable", t);
          }
       }
    }

 

5
3
分享到:
评论

相关推荐

    Zookeeper源码分析.epub

    Zookeeper源码分析.epub

    Zookeeper源码分析

    zookeeper源码分析(一)工作原理概述 zookeeper源码分析(二)FastLeader选举算法 Zookeeper源码分析之Paxos算法之旅

    zookeeper源码分析

    第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...

    zookeeper源码阅读

    ZooKeeper源码阅读,庖丁解牛的带你进入zk的世界。ZooKeeper的类初始化 ZooKeeper在初始化的时候, 会调用类初始化方法, 初始化日志环境(使用SLF4J), 并且记录相关环境变量. 环境变量被存放在Environment的类中, 使用...

    08_尚硅谷技术之Zookeeper(源码解析)V3.3.pdf

    Zookeeper3.5.1(源码解析)

    Zookeeper源码剖析:深入理解Leader选举机制

    本文详细分析了Zookeeper的源码,特别是Leader选举过程的实现。首先,介绍了阅读源码的意义,包括技术提升、框架掌握、问题定位、面试准备、深入理解技术以及参与开源社区。接着,提供了一系列高效阅读源码的方法,...

    ZooKeeper-:ZooKeeper源码剖析

    ZooKeeper源码分析 优秀时间学习了一下ZooKeeper:分布式过程协调这本书的内容,对ZooKeeper实现的细节很好奇,所以顺便把ZooKeeper源码看了一遍。看完之后想写点内容做个笔记,确实发现不好开始。由于ZooKeeper一个...

    zookeeper第四节课原理源码分析资料1

    zookeeper第四节课原理源码分析资料1

    03-05-08-zookeeper源码之watcher原理分析1

    //注册监听//修改节点的值触发监ZooKeeper API 的初始化过程转载请注明《咕泡学院》,建议自己分析一遍在创建一个 ZooKeeper 客户端对象实例

    zookeeper分析流程.txt

    zookeeper源码执行流程。 Zookeeper是一个开源的分布式应用程序协调服务器,其为分布式系统提供一致性服务。其一致性是通过基于Paxos算法的ZAB协议完成的。其主要功能包括:配置维护、域名服务、分布式同步、集群...

    第四课:zookeeper ZAB协议实现源码分析1

    课程概要:启动流程源码分析快照与事物日志的存储结构一、启动流程知识点:工程结构介绍启动流程宏观图集群启动详细流程netty 服务工作机制1.工程结构介绍项目地址

    zookeeper-3.4.14.rar

    zookeeper-3.4.14...............................................................

    03-05-07-zookeeper原理之Leader选举源码分析1

    在第一节课,我们讲到了 zookeeper 的来源,是来自于 google chubby。为了解决在分布式环境下,如何从多个 server 中选举出 maste

    Zookeeper集群架构全面实战

    非常强的一套Zookeeper集群实战,包含了全套的学习代码,学习笔记还有...内容从Zookeeper入门教学,本地安装,Zookeeper集群实战,项目需求,Zookeeper企业面试实战,Zookeeper算法实战,Zookeeper核心源码分析等内容。

    java高级软件工程师教程快速入门Zookeeper+dubbo视频教程

    本套课程中,第一阶段深入Zookeeper原理和源码,分析Zookeeper的核心实现,并通过多个应用场景说明,体现出其重要性和广泛的应用性。第二阶段深入Dubbo RPC、SPI等核心功能点,列举多个示例加以应用说明,期望能够...

    深入探索Zookeeper的ZAB协议:分布式系统的核心解析深入探索Zookeeper的ZAB协议:分布式系统的核心解析

    此外,文章还提供了Zookeeper源码中关于ZAB协议实现的细节分析,为理解分布式系统的底层逻辑提供了宝贵的视角。本文适合对分布式系统感兴趣的开发者和研究者,帮助他们深入理解Zookeeper的核心机制

    zookeeperdemo.zip

    由于zookeeper源代码在网上比较不好整理,这里给大家整理好了,方便调试和理解,分布式系统就像...将会从源码层次研究分析Zookeeper,通过源码帮助我们深入理解Zookeeper实现思路,并提高我们对分布式一致性问题的认识。

    zookeeper理论原理

    zookeeper理论原理 ZAB机制 客户端流程 客户端源码分析 fastelection选举

    zookeeper:zookeeper原始码分析

    zookeeper原始码分析 代码下载 原始码编译 zookeeper是用ant重建,需要安装ant环境 生成eclipse源码 cd E:\github\zookeeper ant eclipse 引进项目 导入eclipse 错误处理 错误1:编译完成后,用Eclipse打开编译...

Global site tag (gtag.js) - Google Analytics