Categories
Programming

ZooKeeper Election Mechanism



## Preface

ZooKeeper servers handle read-only requests (exists, getData and getChildren) locally. When a server receives a getData request from a client, it reads the state and returns it to the client directly. Because such requests are served locally, ZooKeeper performs very well when the workload is dominated by reads, and adding more servers to the ensemble lets it serve more read requests and greatly increases overall throughput.

Client requests that change ZooKeeper state (create, delete and setData) are forwarded to the leader. The cluster has exactly one leader at any moment; the other servers follow it and are called followers. The leader is the central point through which every state change passes: like a sequencer, it establishes the order of all updates to ZooKeeper state. When the leader receives a client request, it wraps the request into a proposal and attaches a zxid to it (the zxid encodes the execution order), then broadcasts the proposal to all servers in the cluster. The leader waits for follower acknowledgements, and once a majority (>= N/2 + 1) of followers have responded, it broadcasts a Commit message to the followers, committing the earlier proposal.

- zxid: the unique identifier of a transaction request, assigned by the leader to order transactions. It is an 8-byte long made of two parts: the first 4 bytes are the epoch and the last 4 bytes are the counter, i.e. zxid = epoch + counter.
- epoch: can be thought of as the leader's term number. Every time a new leader is elected it is given a new epoch, and the value only ever increases. This prevents an old leader that comes back to life from broadcasting its stale proposals and causing inconsistency: followers only process proposals from the current leader's epoch. While no election is taking place, the epoch does not change.
- counter: incremented by 1 for every change to ZooKeeper state.
- zxid = epoch + counter: since the epoch stays fixed between elections and the counter increases by 1 each time, the zxid is monotonically increasing. If zxid1 is smaller than zxid2, then zxid1 must have happened before zxid2.

This is roughly how the ZAB protocol keeps data consistent. Because requests may depend on one another, ZAB guarantees that the sequence of changes broadcast by the leader is processed in order: by the time a state is processed, every state it depends on has already been processed. ZAB's crash-recovery mode further guarantees that when the leader process crashes, a new leader can be elected and data integrity is preserved.
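To make the zxid layout described above concrete, here is a small self-contained sketch (the class and method names are illustrative, not ZooKeeper's own utilities) that packs an epoch and a counter into a single 64-bit value and takes them apart again:

```java
public class ZxidExample {
    // The epoch lives in the high 32 bits of the zxid, the counter in the low 32 bits.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long zxid1 = makeZxid(1, 5); // epoch 1, 5th change in that epoch
        long zxid2 = makeZxid(2, 1); // epoch 2, 1st change in that epoch
        // A higher epoch always dominates, so zxid2 > zxid1 even though its counter is smaller.
        System.out.println(zxid1 < zxid2);                           // true
        System.out.println(epochOf(zxid2) + " " + counterOf(zxid2)); // 2 1
    }
}
```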
## 1. Election Initialization

The entry point of leader election initialization is QuorumPeer.startLeaderElection():

```java
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    this.electionAlg = createElectionAlgorithm(electionType);
}
```

As the code shows, a node starts out in the LOOKING state and first casts a vote for itself.

Next we step into createElectionAlgorithm:

```java
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
```

Roughly, this does three things: 1. create a QuorumCnxManager instance; 2. start the QuorumCnxManager.Listener thread; 3. build the election algorithm FastLeaderElection. Early versions of ZooKeeper implemented four election algorithms, but three of them were later dropped; the latest versions keep only FastLeaderElection.

During leader election the cluster nodes vote among each other, which involves network IO between them; QuorumCnxManager is the utility class that manages and maintains this network IO for the duration of the election.

Leader election therefore revolves around two core classes, QuorumCnxManager and FastLeaderElection, described in detail below.

## 2. Network IO
The rough flow of how QuorumCnxManager maintains the network IO during the election is as follows.

### Listener

1. QuorumCnxManager has an inner class, Listener, which extends Thread and is started during initialization. Its run method is very simple: it creates a ServerSocket and then calls accept in a while loop to receive client connections (note that "clients" here means the other servers in the cluster):

```java
public void run() {
    while ((!shutdown) && (numRetries < 3)) {
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            addr = ...; // obtain the election address from the configuration
            setName(addr.toString());
            // listen on the election port
            ss.bind(addr);
            while (!shutdown) {
                try {
                    // accept a connection from another server
                    client = ss.accept();
                    // set socket options
                    setSockOpts(client);
                    // start handling the connection
                    receiveConnection(client);
                } catch (SocketTimeoutException e) {
                }
            }
        } catch (IOException e) {
        }
    }
}
```

2. When a client connection comes in, the socket is wrapped into a RecvWorker and a SendWorker. Both are threads, responsible respectively for reading from and writing to that socket. RecvWorker and SendWorker always come in pairs, and each pair maintains the network IO with exactly one other server in the cluster (a minimal sketch of this division of labor follows; the actual connection handling code is shown right after it).
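The sketch below only illustrates the idea: one loop drains a per-peer outgoing queue and writes to the socket, the other reads length-prefixed messages off the socket and hands them to a shared receive queue. The names and framing are assumptions made for illustration; the real SendWorker/RecvWorker classes in QuorumCnxManager carry much more bookkeeping (reconnects, worker replacement, shutdown handling).

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class PeerChannelSketch {
    final BlockingQueue<ByteBuffer> sendQueue = new ArrayBlockingQueue<>(16);  // outbound, one per peer
    final BlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(128); // inbound, shared

    // SendWorker-style loop: drain the per-peer queue and write each message to the socket.
    Runnable sender(Socket sock) {
        return () -> {
            try (DataOutputStream out = new DataOutputStream(sock.getOutputStream())) {
                while (!Thread.currentThread().isInterrupted()) {
                    ByteBuffer msg = sendQueue.poll(1, TimeUnit.SECONDS);
                    if (msg == null) continue;
                    byte[] data = new byte[msg.remaining()];
                    msg.duplicate().get(data);
                    out.writeInt(data.length); // length prefix
                    out.write(data);
                    out.flush();
                }
            } catch (Exception e) {
                // on any error the pair would be torn down and the connection re-established
            }
        };
    }

    // RecvWorker-style loop: read length-prefixed messages and hand them to the receive queue.
    Runnable receiver(Socket sock) {
        return () -> {
            try (DataInputStream in = new DataInputStream(sock.getInputStream())) {
                while (!Thread.currentThread().isInterrupted()) {
                    int len = in.readInt();
                    byte[] payload = new byte[len];
                    in.readFully(payload);
                    recvQueue.offer(ByteBuffer.wrap(payload));
                }
            } catch (Exception e) {
                // on any error the pair would be torn down and the connection re-established
            }
        };
    }
}
```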
The connection is accepted and arbitrated in receiveConnection / handleConnection:

```java
public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        handleConnection(sock, din);
    } catch (IOException e) {
    }
}

private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    ...

    // The idea here: if the connecting server's ServerId is smaller than ours, close the
    // connection and let the current node initiate the connection instead.
    // Implicitly this means that, within the ensemble, connections always go from the
    // larger ServerId to the smaller ServerId.
    if (sid < self.getId()) {
        // if a connection already exists, close it
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }
        closeSocket(sock);
        // the current node connects to the peer instead
        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }
    } else {
        // accept the connection and create the corresponding read/write workers
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        // if a pair has already been created, close the old one
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY));
        // start the read and write workers
        sw.start();
        rw.start();
    }
}
```

The two workers themselves are simple: the SendWorker keeps sending whatever queueSendMap holds for its server id, and the RecvWorker adds whatever it receives to the recvQueue.

Now consider this scenario: the cluster contains two nodes, A and B.

- When A connects to B, node B maintains a RecvWorker/SendWorker pair for communicating with A;
- likewise, if B connects to A, node A maintains a RecvWorker/SendWorker pair for communicating with B;
- this creates two channels between A and B, although a single channel is enough for them to communicate. To avoid wasting resources, ZooKeeper applies the rule that the side with the smaller myid acts as the server; a connection made in the other direction is considered invalid and is closed (see the sketch after this list);
- for example, if A's myid is 1 and B's myid is 2: when A connects to B, B sees that the peer's myid is smaller than its own, judges the connection invalid and closes it; when B connects to A, A sees that the peer's myid is larger than its own, considers the connection valid, and creates and starts a RecvWorker/SendWorker pair for it.
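The arbitration rule in the list above boils down to a single predicate. The helper below is purely illustrative (it is not part of ZooKeeper's API) and just restates the "larger sid dials the smaller sid" convention:

```java
public class ConnectionRuleSketch {
    // Keep an incoming election connection only if the initiator's sid is larger than ours;
    // otherwise drop it and dial the peer ourselves.
    static boolean keepIncomingConnection(long mySid, long remoteSid) {
        return remoteSid > mySid;
    }

    public static void main(String[] args) {
        // A has sid 1, B has sid 2.
        System.out.println(keepIncomingConnection(2, 1)); // on B: false -> close it and connect to A instead
        System.out.println(keepIncomingConnection(1, 2)); // on A: true  -> keep it and start a worker pair
    }
}
```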
### FastLeaderElection

1. FastLeaderElection implements the core rules of the leader election algorithm. Note that FastLeaderElection also contains two inner classes, WorkerSender and WorkerReceiver, which play a similar role to SendWorker and RecvWorker in QuorumCnxManager: they are its sending and receiving threads.

```java
public void start() {
    this.messenger.start();
}

void start() {
    // the WorkerSender thread
    this.wsThread.start();
    // the WorkerReceiver thread
    this.wrThread.start();
}
```

So FastLeaderElection also starts two threads of its own for reading and writing, and this has to be considered together with the Listener logic above. The threads started through the Listener read data off the wire into a queue and push queued data out onto the wire, but who consumes the data that is read, and where does the data to be sent come from? The two threads inside FastLeaderElection are exactly the ones that interact with them.

2. The WorkerSender thread looks like this:

```java
public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) continue;
            // process the outgoing message
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
}

void process(ToSend m) {
    // serialize the message
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                        m.leader,
                                        m.zxid,
                                        m.electionEpoch,
                                        m.peerEpoch,
                                        m.configData);
    // send the data
    manager.toSend(m.sid, requestBuffer);
}

public void toSend(Long sid, ByteBuffer b) {
    // if the data is addressed to ourselves, bypass IO and add it directly to the recv queue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // otherwise add the data to the send queue of the given server id
        ArrayBlockingQueue bq = new ArrayBlockingQueue(SEND_CAPACITY);
        ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq);
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        // connect to the given server id; if the connection already exists this returns immediately
        connectOne(sid);
    }
}
```
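The queueSendMap handling inside toSend above is an instance of a per-peer "outbox" pattern. The sketch below shows the idea in isolation; the names, the queue capacity and the drop-oldest policy are illustrative assumptions rather than ZooKeeper's exact behavior:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OutboxSketch {
    // one bounded queue of outgoing messages per peer sid
    final ConcurrentMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    // Producer side (what toSend does): route a vote either to our own inbox or to the peer's outbox.
    void send(long mySid, long targetSid, ByteBuffer vote, BlockingQueue<ByteBuffer> myRecvQueue) {
        if (targetSid == mySid) {
            myRecvQueue.offer(vote.duplicate()); // no network IO needed for ourselves
            return;
        }
        BlockingQueue<ByteBuffer> q =
                queueSendMap.computeIfAbsent(targetSid, sid -> new ArrayBlockingQueue<>(1));
        if (!q.offer(vote)) { // outbox full: drop the stale vote so the newest one wins
            q.poll();
            q.offer(vote);
        }
        // here the real code calls connectOne(targetSid) to make sure a SendWorker exists for this peer
    }

    // Consumer side (what a SendWorker bound to targetSid does): take the next message for its peer.
    ByteBuffer nextToSend(long targetSid) throws InterruptedException {
        return queueSendMap.computeIfAbsent(targetSid, sid -> new ArrayBlockingQueue<>(1)).take();
    }
}
```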
}n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"从上面可看出,FastLeaderElection中进行选举广播投票信息时,将投票信息写入到对端服务器大致流程如下:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"bulletedlist","content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"将数据封装成ToSend格式放入到sendqueue;"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"WorkerSender线程会一直轮询提取sendqueue中的数据,当提取到ToSend数据后,会获取到集群中所有参与Leader选举节点(除Observer节点外的节点)的sid,如果sid即为本机节点,则转成Notification直接放入到recvqueue中,因为本机不再需要走网络IO;否则放入到queueSendMap中,key是要发送给哪个服务器节点的sid,ByteBuffer即为ToSend的内容,queueSendMap维护的着当前节点要发送的网络数据信息,由于发送到同一个sid服务器可能存在多条数据,所以queueSendMap的value是一个queue类型;"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"QuorumCnxManager中的SendWorkder线程不停轮询queueSendMap中是否存在自己要发送的数据,每个SendWorkder线程都会绑定一个sid用于标记该SendWorkder线程和哪个对端服务器进行通信,因此,queueSendMap.get(sid)即可获取该线程要发送数据的queue,然后通过queue.poll()即可提取该线程要发送的数据内容;"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"然后通过调用SendWorkder内部维护的socket输出流即可将数据写入到对端服务器"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"3、WorkerReceiver线程代码如下:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"java"},"content":[{"type":"text","text":"public void run() {nn Message response;n while (!stop) {n try {n //这里本质上是从recvQueue里取出数据n response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);n //没有数据则继续等待n if(response == null) continue;n nttt...ntttn int rstate = response.buffer.getInt();n long rleader = response.buffer.getLong();n long rzxid = response.buffer.getLong();n long relectionEpoch = response.buffer.getLong();n long rpeerepoch;n QuorumVerifier rqv = null;n //如果不是一个有投票权的节点,例如Observer节点n if(!validVoter(response.sid)) {n //直接把自己的投票信息返回n Vote current = self.getCurrentVote();n QuorumVerifier qv = self.getQuorumVerifier();n ToSend notmsg = new ToSend(ToSend.mType.notification,n current.getId(),n current.getZxid(),n logicalclock.get(),n self.getPeerState(),n response.sid,n current.getPeerEpoch(),n qv.toString().getBytes());n sendqueue.offer(notmsg);n } else {n //获取发消息的节点的状态n QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;n switch (rstate) {n case 0:n ackstate = QuorumPeer.ServerState.LOOKING;n break;n case 1:n ackstate = QuorumPeer.ServerState.FOLLOWING;n break;n case 2:n ackstate = QuorumPeer.ServerState.LEADING;n break;n case 3:n ackstate = QuorumPeer.ServerState.OBSERVING;n break;n default:n continue;n }nn //赋值Notificationn n.leader = rleader;n n.zxid = rzxid;n n.electionEpoch = relectionEpoch;n n.state = ackstate;n n.sid = response.sid;n n.peerEpoch = rpeerepoch;n n.version = version;n n.qv = rqv;n //如果当前节点正在寻找Leadern if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){n //把收到的消息加入队列n recvqueue.offer(n);n //如果对方节点也是LOOKING状态,且周期小于自己,则把自己投票信息发回去n if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch 
The notifications queued in recvqueue are then consumed by the election loop in FastLeaderElection (lookForLeader). The key step, when the node is in the LOOKING state, is comparing each incoming vote with the node's current proposal:

```java
// If the notification carries a newer election epoch than ours, adopt it, re-evaluate
// our proposal against it and broadcast the result
if (n.electionEpoch > logicalclock.get()) {
    logicalclock.set(n.electionEpoch);
    recvset.clear();
    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
    } else {
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
    // the notification belongs to an older epoch: ignore it
    ...
}

...

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                      long curId, long curZxid, long curEpoch) {
    ...
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}
```

The comparison ("PK") logic, in which the winning side is the more promising leader candidate, works as follows: 1. First compare the epochs: the larger epoch wins. As introduced earlier, the epoch represents the leader's term and only ever increases, so a larger epoch implies newer data, and a leader with newer data means less data synchronization afterwards, so it should naturally be preferred. 2. Only then compare the zxids. Since zxid = epoch + counter and the epochs were already compared in the first step, this step effectively compares the counters: a larger counter means newer data, which is preferred. Note: steps 1 and 2 could arguably be merged into a single zxid comparison, because zxid = epoch + counter already embeds the epoch, which makes the separate epoch comparison look somewhat redundant.

3. If the first two criteria cannot decide, the sid is used as a tie-breaker. Equal zxids mean the two servers hold the same data, so it makes no real difference which one becomes leader; the one with the larger sid is simply chosen.
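As a worked example of this ordering, the sketch below (an illustrative record, not ZooKeeper's Vote class) applies the same epoch → zxid → sid comparison to two candidate votes:

```java
public class VotePkExample {
    record Candidate(long sid, long zxid, long peerEpoch) {}

    // Mirrors totalOrderPredicate: a newer epoch wins, then a newer zxid, then a larger sid.
    static boolean wins(Candidate challenger, Candidate current) {
        if (challenger.peerEpoch() != current.peerEpoch()) {
            return challenger.peerEpoch() > current.peerEpoch();
        }
        if (challenger.zxid() != current.zxid()) {
            return challenger.zxid() > current.zxid();
        }
        return challenger.sid() > current.sid();
    }

    public static void main(String[] args) {
        Candidate a = new Candidate(1, 0x200000005L, 2); // sid 1, epoch 2, counter 5
        Candidate b = new Candidate(3, 0x200000005L, 2); // same data as a, but larger sid
        Candidate c = new Candidate(2, 0x300000001L, 3); // newer epoch, smaller counter
        System.out.println(wins(b, a)); // true: equal epoch and zxid, so the larger sid wins
        System.out.println(wins(c, b)); // true: the newer epoch dominates everything else
    }
}
```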