Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Jun 24 12:04:58 2008 @@ -1,369 +1,369 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.HashMap; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; - -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.InputArchive; -import org.apache.jute.OutputArchive; -import org.apache.jute.Record; -import org.apache.zookeeper.server.Request; -import org.apache.zookeeper.server.ServerCnxn; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ZooTrace; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.txn.TxnHeader; - -/** - * This class has the control logic for the Follower. - */ -public class Follower { - private static final Logger LOG = Logger.getLogger(Follower.class); - - QuorumPeer self; - - FollowerZooKeeperServer zk; - - Follower(QuorumPeer self,FollowerZooKeeperServer zk) { - this.self = self; - this.zk=zk; - } - - private InputArchive leaderIs; - - private OutputArchive leaderOs; - - private BufferedOutputStream bufferedOutput; - - public Socket sock; - - /** - * write a packet to the leader - * - * @param pp - * the proposal packet to be sent to the leader - * @throws IOException - */ - void writePacket(QuorumPacket pp) throws IOException { - long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; - if (pp.getType() == Leader.PING) { - traceMask = ZooTrace.SERVER_PING_TRACE_MASK; - } - ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp); - synchronized (leaderOs) { - leaderOs.writeRecord(pp, "packet"); - bufferedOutput.flush(); - } - } - - /** - * read a packet from the leader - * - * @param pp - * the packet to be instantiated - * @throws IOException - */ - void readPacket(QuorumPacket pp) throws IOException { - synchronized (leaderIs) { - leaderIs.readRecord(pp, "packet"); - } - long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; - if (pp.getType() == Leader.PING) { - traceMask = ZooTrace.SERVER_PING_TRACE_MASK; - } - ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp); - } - - /** - * the main method called by the follower to follow the leader - * - * @throws InterruptedException - */ - void followLeader() throws InterruptedException { - InetSocketAddress addr = null; - // Find the leader by id - for (QuorumServer s : self.quorumPeers) { - if (s.id == self.currentVote.id) { - addr = s.addr; - break; - } - } - if (addr == null) { - LOG.warn("Couldn't find the leader with id = " - + self.currentVote.id); - } - LOG.info("Following " + addr); - sock = new Socket(); - try { - QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); - sock.setSoTimeout(self.tickTime * self.initLimit); - for (int tries = 0; tries < 5; tries++) { - try { - //sock = new Socket(); - //sock.setSoTimeout(self.tickTime * self.initLimit); - sock.connect(addr, self.tickTime * self.syncLimit); - sock.setTcpNoDelay(true); - break; - } catch (ConnectException e) { - if (tries == 4) { - LOG.error("Unexpected exception",e); - throw e; - } else { - LOG.warn("Unexpected exception",e); - sock = new Socket(); - sock.setSoTimeout(self.tickTime * self.initLimit); - } - } - Thread.sleep(1000); - } - leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( - sock.getInputStream())); - bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); - leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); - QuorumPacket qp = new QuorumPacket(); - qp.setType(Leader.LASTZXID); - long sentLastZxid = self.getLastLoggedZxid(); - qp.setZxid(sentLastZxid); - writePacket(qp); - readPacket(qp); - long newLeaderZxid = qp.getZxid(); - - if (qp.getType() != Leader.NEWLEADER) { - LOG.error("First packet should have been NEWLEADER"); - throw new IOException("First packet should have been NEWLEADER"); - } - readPacket(qp); - synchronized (zk) { - if (qp.getType() == Leader.DIFF) { - LOG.info("Getting a diff from the leader!"); - zk.loadData(); - } - else if (qp.getType() == Leader.SNAP) { - LOG.info("Getting a snapshot from leader"); - // The leader is going to dump the database - zk.loadData(leaderIs); - String signature = leaderIs.readString("signature"); - if (!signature.equals("BenWasHere")) { - LOG.error("Missing signature. Got " + signature); - throw new IOException("Missing signature"); - } - } else if (qp.getType() == Leader.TRUNC) { - //we need to truncate the log to the lastzxid of the leader - LOG.warn("Truncating log to get in sync with the leader " - + Long.toHexString(qp.getZxid())); - zk.truncateLog(qp.getZxid()); - zk.loadData(); - } - else { - LOG.error("Got unexpected packet from leader " - + qp.getType() + " exiting ... " ); - System.exit(13); - } - zk.dataTree.lastProcessedZxid = newLeaderZxid; - } - ack.setZxid(newLeaderZxid & ~0xffffffffL); - writePacket(ack); - sock.setSoTimeout(self.tickTime * self.syncLimit); - zk.startup(); - while (self.running) { - readPacket(qp); - switch (qp.getType()) { - case Leader.PING: - // Send back the ping with our session data - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - HashMap<Long, Integer> touchTable = ((FollowerZooKeeperServer) zk) - .getTouchSnapshot(); - for (Entry<Long, Integer> entry : touchTable.entrySet()) { - dos.writeLong(entry.getKey()); - dos.writeInt(entry.getValue()); - } - qp.setData(bos.toByteArray()); - writePacket(qp); - break; - case Leader.PROPOSAL: - TxnHeader hdr = new TxnHeader(); - BinaryInputArchive ia = BinaryInputArchive - .getArchive(new ByteArrayInputStream(qp.getData())); - Record txn = ZooKeeperServer.deserializeTxn(ia, hdr); - if (hdr.getZxid() != lastQueued + 1) { - LOG.warn("Got zxid " - + Long.toHexString(hdr.getZxid()) - + " expected " - + Long.toHexString(lastQueued + 1)); - } - lastQueued = hdr.getZxid(); - zk.logRequest(hdr, txn); - break; - case Leader.COMMIT: - zk.commit(qp.getZxid()); - break; - case Leader.UPTODATE: - zk.snapshot(); - self.cnxnFactory.setZooKeeperServer(zk); - break; - case Leader.REVALIDATE: - ByteArrayInputStream bis = new ByteArrayInputStream(qp - .getData()); - DataInputStream dis = new DataInputStream(bis); - long sessionId = dis.readLong(); - boolean valid = dis.readBoolean(); - synchronized (pendingRevalidations) { - ServerCnxn cnxn = pendingRevalidations - .remove(sessionId); - if (cnxn == null) { - LOG.warn("Missing " - + Long.toHexString(sessionId) - + " for validation"); - } else { - cnxn.finishSessionInit(valid); - } - } - ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, - "Session " + sessionId - + " is valid: " + valid); - break; - case Leader.SYNC: - zk.sync(); - break; - } - } - } catch (IOException e) { - e.printStackTrace(); - try { - sock.close(); - } catch (IOException e1) { - e1.printStackTrace(); - } - - synchronized (pendingRevalidations) { - // clear pending revalitions - pendingRevalidations.clear(); - pendingRevalidations.notifyAll(); - } - } - } - - private long lastQueued; - - ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>(); - - /** - * validate a seesion for a client - * - * @param clientId - * the client to be revailidated - * @param timeout - * the timeout for which the session is valid - * @return - * @throws IOException - * @throws InterruptedException - */ - void validateSession(ServerCnxn cnxn, long clientId, int timeout) - throws IOException, InterruptedException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - dos.writeLong(clientId); - dos.writeInt(timeout); - dos.close(); - QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos - .toByteArray(), null); - pendingRevalidations.put(clientId, cnxn); - ZooTrace.logTraceMessage(LOG, - ZooTrace.SESSION_TRACE_MASK, - "To validate session " - + Long.toHexString(clientId)); - writePacket(qp); - } - - /** - * send a request packet to the leader - * - * @param request - * the request from the client - * @throws IOException - */ - void request(Request request) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream oa = new DataOutputStream(baos); - oa.writeLong(request.sessionId); - oa.writeInt(request.cxid); - oa.writeInt(request.type); - if (request.request != null) { - request.request.rewind(); - int len = request.request.remaining(); - byte b[] = new byte[len]; - request.request.get(b); - request.request.rewind(); - oa.write(b); - } - oa.close(); - QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos - .toByteArray(), request.authInfo); -// QuorumPacket qp; -// if(request.type == OpCode.sync){ -// qp = new QuorumPacket(Leader.SYNC, -1, baos -// .toByteArray(), request.authInfo); -// } -// else{ -// qp = new QuorumPacket(Leader.REQUEST, -1, baos -// .toByteArray(), request.authInfo); -// } - writePacket(qp); - } - - public long getZxid() { - try { - synchronized (zk) { - return zk.getZxid(); - } - } catch (NullPointerException e) { - } - return -1; - } - - public void shutdown() { - // set the zookeeper server to null - self.cnxnFactory.setZooKeeperServer(null); - // clear all the connections - self.cnxnFactory.clear(); - // shutdown previous zookeeper - if (zk != null) { - zk.shutdown(); - - } - LOG.error("FIXMSG",new Exception("shutdown Follower")); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.txn.TxnHeader; + +/** + * This class has the control logic for the Follower. + */ +public class Follower { + private static final Logger LOG = Logger.getLogger(Follower.class); + + QuorumPeer self; + + FollowerZooKeeperServer zk; + + Follower(QuorumPeer self,FollowerZooKeeperServer zk) { + this.self = self; + this.zk=zk; + } + + private InputArchive leaderIs; + + private OutputArchive leaderOs; + + private BufferedOutputStream bufferedOutput; + + public Socket sock; + + /** + * write a packet to the leader + * + * @param pp + * the proposal packet to be sent to the leader + * @throws IOException + */ + void writePacket(QuorumPacket pp) throws IOException { + long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; + if (pp.getType() == Leader.PING) { + traceMask = ZooTrace.SERVER_PING_TRACE_MASK; + } + ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp); + synchronized (leaderOs) { + leaderOs.writeRecord(pp, "packet"); + bufferedOutput.flush(); + } + } + + /** + * read a packet from the leader + * + * @param pp + * the packet to be instantiated + * @throws IOException + */ + void readPacket(QuorumPacket pp) throws IOException { + synchronized (leaderIs) { + leaderIs.readRecord(pp, "packet"); + } + long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; + if (pp.getType() == Leader.PING) { + traceMask = ZooTrace.SERVER_PING_TRACE_MASK; + } + ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp); + } + + /** + * the main method called by the follower to follow the leader + * + * @throws InterruptedException + */ + void followLeader() throws InterruptedException { + InetSocketAddress addr = null; + // Find the leader by id + for (QuorumServer s : self.quorumPeers) { + if (s.id == self.currentVote.id) { + addr = s.addr; + break; + } + } + if (addr == null) { + LOG.warn("Couldn't find the leader with id = " + + self.currentVote.id); + } + LOG.info("Following " + addr); + sock = new Socket(); + try { + QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); + sock.setSoTimeout(self.tickTime * self.initLimit); + for (int tries = 0; tries < 5; tries++) { + try { + //sock = new Socket(); + //sock.setSoTimeout(self.tickTime * self.initLimit); + sock.connect(addr, self.tickTime * self.syncLimit); + sock.setTcpNoDelay(true); + break; + } catch (ConnectException e) { + if (tries == 4) { + LOG.error("Unexpected exception",e); + throw e; + } else { + LOG.warn("Unexpected exception",e); + sock = new Socket(); + sock.setSoTimeout(self.tickTime * self.initLimit); + } + } + Thread.sleep(1000); + } + leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( + sock.getInputStream())); + bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); + leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); + QuorumPacket qp = new QuorumPacket(); + qp.setType(Leader.LASTZXID); + long sentLastZxid = self.getLastLoggedZxid(); + qp.setZxid(sentLastZxid); + writePacket(qp); + readPacket(qp); + long newLeaderZxid = qp.getZxid(); + + if (qp.getType() != Leader.NEWLEADER) { + LOG.error("First packet should have been NEWLEADER"); + throw new IOException("First packet should have been NEWLEADER"); + } + readPacket(qp); + synchronized (zk) { + if (qp.getType() == Leader.DIFF) { + LOG.info("Getting a diff from the leader!"); + zk.loadData(); + } + else if (qp.getType() == Leader.SNAP) { + LOG.info("Getting a snapshot from leader"); + // The leader is going to dump the database + zk.loadData(leaderIs); + String signature = leaderIs.readString("signature"); + if (!signature.equals("BenWasHere")) { + LOG.error("Missing signature. Got " + signature); + throw new IOException("Missing signature"); + } + } else if (qp.getType() == Leader.TRUNC) { + //we need to truncate the log to the lastzxid of the leader + LOG.warn("Truncating log to get in sync with the leader " + + Long.toHexString(qp.getZxid())); + zk.truncateLog(qp.getZxid()); + zk.loadData(); + } + else { + LOG.error("Got unexpected packet from leader " + + qp.getType() + " exiting ... " ); + System.exit(13); + } + zk.dataTree.lastProcessedZxid = newLeaderZxid; + } + ack.setZxid(newLeaderZxid & ~0xffffffffL); + writePacket(ack); + sock.setSoTimeout(self.tickTime * self.syncLimit); + zk.startup(); + while (self.running) { + readPacket(qp); + switch (qp.getType()) { + case Leader.PING: + // Send back the ping with our session data + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + HashMap<Long, Integer> touchTable = ((FollowerZooKeeperServer) zk) + .getTouchSnapshot(); + for (Entry<Long, Integer> entry : touchTable.entrySet()) { + dos.writeLong(entry.getKey()); + dos.writeInt(entry.getValue()); + } + qp.setData(bos.toByteArray()); + writePacket(qp); + break; + case Leader.PROPOSAL: + TxnHeader hdr = new TxnHeader(); + BinaryInputArchive ia = BinaryInputArchive + .getArchive(new ByteArrayInputStream(qp.getData())); + Record txn = ZooKeeperServer.deserializeTxn(ia, hdr); + if (hdr.getZxid() != lastQueued + 1) { + LOG.warn("Got zxid " + + Long.toHexString(hdr.getZxid()) + + " expected " + + Long.toHexString(lastQueued + 1)); + } + lastQueued = hdr.getZxid(); + zk.logRequest(hdr, txn); + break; + case Leader.COMMIT: + zk.commit(qp.getZxid()); + break; + case Leader.UPTODATE: + zk.snapshot(); + self.cnxnFactory.setZooKeeperServer(zk); + break; + case Leader.REVALIDATE: + ByteArrayInputStream bis = new ByteArrayInputStream(qp + .getData()); + DataInputStream dis = new DataInputStream(bis); + long sessionId = dis.readLong(); + boolean valid = dis.readBoolean(); + synchronized (pendingRevalidations) { + ServerCnxn cnxn = pendingRevalidations + .remove(sessionId); + if (cnxn == null) { + LOG.warn("Missing " + + Long.toHexString(sessionId) + + " for validation"); + } else { + cnxn.finishSessionInit(valid); + } + } + ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, + "Session " + sessionId + + " is valid: " + valid); + break; + case Leader.SYNC: + zk.sync(); + break; + } + } + } catch (IOException e) { + e.printStackTrace(); + try { + sock.close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + + synchronized (pendingRevalidations) { + // clear pending revalitions + pendingRevalidations.clear(); + pendingRevalidations.notifyAll(); + } + } + } + + private long lastQueued; + + ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>(); + + /** + * validate a seesion for a client + * + * @param clientId + * the client to be revailidated + * @param timeout + * the timeout for which the session is valid + * @return + * @throws IOException + * @throws InterruptedException + */ + void validateSession(ServerCnxn cnxn, long clientId, int timeout) + throws IOException, InterruptedException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeLong(clientId); + dos.writeInt(timeout); + dos.close(); + QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos + .toByteArray(), null); + pendingRevalidations.put(clientId, cnxn); + ZooTrace.logTraceMessage(LOG, + ZooTrace.SESSION_TRACE_MASK, + "To validate session " + + Long.toHexString(clientId)); + writePacket(qp); + } + + /** + * send a request packet to the leader + * + * @param request + * the request from the client + * @throws IOException + */ + void request(Request request) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream oa = new DataOutputStream(baos); + oa.writeLong(request.sessionId); + oa.writeInt(request.cxid); + oa.writeInt(request.type); + if (request.request != null) { + request.request.rewind(); + int len = request.request.remaining(); + byte b[] = new byte[len]; + request.request.get(b); + request.request.rewind(); + oa.write(b); + } + oa.close(); + QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos + .toByteArray(), request.authInfo); +// QuorumPacket qp; +// if(request.type == OpCode.sync){ +// qp = new QuorumPacket(Leader.SYNC, -1, baos +// .toByteArray(), request.authInfo); +// } +// else{ +// qp = new QuorumPacket(Leader.REQUEST, -1, baos +// .toByteArray(), request.authInfo); +// } + writePacket(qp); + } + + public long getZxid() { + try { + synchronized (zk) { + return zk.getZxid(); + } + } catch (NullPointerException e) { + } + return -1; + } + + public void shutdown() { + // set the zookeeper server to null + self.cnxnFactory.setZooKeeperServer(null); + // clear all the connections + self.cnxnFactory.clear(); + // shutdown previous zookeeper + if (zk != null) { + zk.shutdown(); + + } + LOG.error("FIXMSG",new Exception("shutdown Follower")); + } +}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Tue Jun 24 12:04:58 2008 @@ -1,391 +1,391 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.log4j.Logger; - -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.Record; -import org.apache.zookeeper.ZooDefs.OpCode; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ZooTrace; -import org.apache.zookeeper.server.quorum.Leader.Proposal; -import org.apache.zookeeper.txn.TxnHeader; - -/** - * There will be an instance of this class created by the Leader for each - * follower.All communication for a given Follower will be handled by this - * class. - */ -public class FollowerHandler extends Thread { - private static final Logger LOG = Logger.getLogger(FollowerHandler.class); - - public Socket s; - - Leader leader; - - long tickOfLastAck; - - /** - * The packets to be sent to the follower - */ - LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>(); - - private BinaryInputArchive ia; - - private BinaryOutputArchive oa; - - private BufferedOutputStream bufferedOutput; - - FollowerHandler(Socket s, Leader leader) throws IOException { - super("FollowerHandler-" + s.getRemoteSocketAddress()); - this.s = s; - this.leader = leader; - leader.addFollowerHandler(this); - start(); - } - - /** - * If this packet is queued, the sender thread will exit - */ - QuorumPacket proposalOfDeath = new QuorumPacket(); - - /** - * This method will use the thread to send packets added to the - * queuedPackets list - * - * @throws InterruptedException - */ - private void sendPackets() throws InterruptedException { - long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; - while (true) { - QuorumPacket p; - p = queuedPackets.take(); - - if (p == proposalOfDeath) { - // Packet of death! - break; - } - if (p.getType() == Leader.PING) { - traceMask = ZooTrace.SERVER_PING_TRACE_MASK; - } - ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p); - try { - oa.writeRecord(p, "packet"); - bufferedOutput.flush(); - } catch (IOException e) { - if (!s.isClosed()) { - LOG.warn("Unexpected exception",e); - } - break; - } - } - } - - static public String packetToString(QuorumPacket p) { - if (true) - return null; - String type = null; - String mess = null; - Record txn = null; - switch (p.getType()) { - case Leader.ACK: - type = "ACK"; - break; - case Leader.COMMIT: - type = "COMMIT"; - break; - case Leader.LASTZXID: - type = "LASTZXID"; - break; - case Leader.NEWLEADER: - type = "NEWLEADER"; - break; - case Leader.PING: - type = "PING"; - break; - case Leader.PROPOSAL: - type = "PROPOSAL"; - BinaryInputArchive ia = BinaryInputArchive - .getArchive(new ByteArrayInputStream(p.getData())); - TxnHeader hdr = new TxnHeader(); - try { - txn = ZooKeeperServer.deserializeTxn(ia, hdr); - // mess = "transaction: " + txn.toString(); - } catch (IOException e) { - LOG.warn("Unexpected exception",e); - } - break; - case Leader.REQUEST: - type = "REQUEST"; - break; - case Leader.REVALIDATE: - type = "REVALIDATE"; - ByteArrayInputStream bis = new ByteArrayInputStream(p.getData()); - DataInputStream dis = new DataInputStream(bis); - try { - long id = dis.readLong(); - mess = " sessionid = " + id; - } catch (IOException e) { - LOG.warn("Unexpected exception", e); - } - - break; - case Leader.UPTODATE: - type = "UPTODATE"; - break; - default: - type = "UNKNOWN" + p.getType(); - } - String entry = null; - if (type != null) { - entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess; - } - return entry; - } - - /** - * This thread will receive packets from the follower and process them and - * also listen to new connections from new followers. - */ - public void run() { - try { - - ia = BinaryInputArchive.getArchive(new BufferedInputStream(s - .getInputStream())); - bufferedOutput = new BufferedOutputStream(s.getOutputStream()); - oa = BinaryOutputArchive.getArchive(bufferedOutput); - - QuorumPacket qp = new QuorumPacket(); - ia.readRecord(qp, "packet"); - if (qp.getType() != Leader.LASTZXID) { - LOG.error("First packet " + qp.toString() - + " is not LASTZXID!"); - return; - } - long peerLastZxid = qp.getZxid(); - int packetToSend = Leader.SNAP; - boolean logTxns = true; - - long zxidToSend = 0; - // we are sending the diff - synchronized(leader.zk.committedLog) { - if (leader.zk.committedLog.size() != 0) { - if ((leader.zk.maxCommittedLog >= peerLastZxid) - && (leader.zk.minCommittedLog <= peerLastZxid)) { - packetToSend = Leader.DIFF; - zxidToSend = leader.zk.maxCommittedLog; - for (Proposal propose: leader.zk.committedLog) { - if (propose.packet.getZxid() > peerLastZxid) { - queuePacket(propose.packet); - QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), - null, null); - queuePacket(qcommit); - - } - } - } - } - else { - logTxns = false; - } } - long leaderLastZxid = leader.startForwarding(this, peerLastZxid); - QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, - leaderLastZxid, null, null); - oa.writeRecord(newLeaderQP, "packet"); - bufferedOutput.flush(); - // a special case when both the ids are the same - if (peerLastZxid == leaderLastZxid) { - packetToSend = Leader.DIFF; - zxidToSend = leaderLastZxid; - } - //check if we decided to send a diff or we need to send a truncate - // we avoid using epochs for truncating because epochs make things - // complicated. Two epochs might have the last 32 bits as same. - // only if we know that there is a committed zxid in the queue that - // is less than the one the peer has we send a trunc else to make - // things simple we just send sanpshot. - if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) { - // this is the only case that we are sure that - // we can ask the follower to truncate the log - packetToSend = Leader.TRUNC; - zxidToSend = leader.zk.maxCommittedLog; - - } - oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); - bufferedOutput.flush(); - // only if we are not truncating or fast sycning - if (packetToSend == Leader.SNAP) { - LOG.warn("Sending snapshot last zxid of peer is " - + Long.toHexString(peerLastZxid) + " " + " zxid of leader is " - + Long.toHexString(leaderLastZxid)); - // Dump data to follower - leader.zk.snapshot(oa); - oa.writeString("BenWasHere", "signature"); - } - bufferedOutput.flush(); - // - // Mutation packets will be queued during the serialize, - // so we need to mark when the follower can actually start - // using the data - // - queuedPackets - .add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); - - // Start sending packets - new Thread() { - public void run() { - Thread.currentThread().setName( - "Sender-" + s.getRemoteSocketAddress()); - try { - sendPackets(); - } catch (InterruptedException e) { - LOG.warn("Interrupted",e); - } - } - }.start(); - - while (true) { - qp = new QuorumPacket(); - ia.readRecord(qp, "packet"); - - long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; - if (qp.getType() == Leader.PING) { - traceMask = ZooTrace.SERVER_PING_TRACE_MASK; - } - ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp); - tickOfLastAck = leader.self.tick; - - - ByteBuffer bb; - long sessionId; - int cxid; - int type; - - switch (qp.getType()) { - case Leader.ACK: - leader.processAck(qp.getZxid(), s.getLocalSocketAddress()); - break; - case Leader.PING: - // Process the touches - ByteArrayInputStream bis = new ByteArrayInputStream(qp - .getData()); - DataInputStream dis = new DataInputStream(bis); - while (dis.available() > 0) { - long sess = dis.readLong(); - int to = dis.readInt(); - leader.zk.touch(sess, to); - } - break; - case Leader.REVALIDATE: - bis = new ByteArrayInputStream(qp.getData()); - dis = new DataInputStream(bis); - long id = dis.readLong(); - int to = dis.readInt(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - dos.writeLong(id); - boolean valid = leader.zk.touch(id, to); - ZooTrace.logTraceMessage(LOG, - ZooTrace.SESSION_TRACE_MASK, - "Session " + Long.toHexString(id) - + " is valid: "+ valid); - dos.writeBoolean(valid); - qp.setData(bos.toByteArray()); - queuedPackets.add(qp); - break; - case Leader.REQUEST: - bb = ByteBuffer.wrap(qp.getData()); - sessionId = bb.getLong(); - cxid = bb.getInt(); - type = bb.getInt(); - bb = bb.slice(); - if(type == OpCode.sync){ - leader.setSyncHandler(this, sessionId); - } - leader.zk.submitRequest(null, sessionId, type, cxid, bb, - qp.getAuthinfo()); - break; - default: - } - } - } catch (IOException e) { - if (s != null && !s.isClosed()) { - LOG.error("FIXMSG",e); - } - } catch (InterruptedException e) { - LOG.error("FIXMSG",e); - } finally { - LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress() - + " ********"); - // Send the packet of death - try { - queuedPackets.put(proposalOfDeath); - } catch (InterruptedException e) { - LOG.error("FIXMSG",e); - } - shutdown(); - } - } - - public void shutdown() { - try { - if (s != null && !s.isClosed()) { - s.close(); - } - } catch (IOException e) { - LOG.error("FIXMSG",e); - } - leader.removeFollowerHandler(this); - } - - public long tickOfLastAck() { - return tickOfLastAck; - } - - /** - * ping calls from the leader to the followers - */ - public void ping() { - QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed, - null, null); - queuePacket(ping); - } - - void queuePacket(QuorumPacket p) { - queuedPackets.add(p); - } - - public boolean synced() { - return isAlive() - && tickOfLastAck >= leader.self.tick - leader.self.syncLimit; - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.quorum.Leader.Proposal; +import org.apache.zookeeper.txn.TxnHeader; + +/** + * There will be an instance of this class created by the Leader for each + * follower.All communication for a given Follower will be handled by this + * class. + */ +public class FollowerHandler extends Thread { + private static final Logger LOG = Logger.getLogger(FollowerHandler.class); + + public Socket s; + + Leader leader; + + long tickOfLastAck; + + /** + * The packets to be sent to the follower + */ + LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>(); + + private BinaryInputArchive ia; + + private BinaryOutputArchive oa; + + private BufferedOutputStream bufferedOutput; + + FollowerHandler(Socket s, Leader leader) throws IOException { + super("FollowerHandler-" + s.getRemoteSocketAddress()); + this.s = s; + this.leader = leader; + leader.addFollowerHandler(this); + start(); + } + + /** + * If this packet is queued, the sender thread will exit + */ + QuorumPacket proposalOfDeath = new QuorumPacket(); + + /** + * This method will use the thread to send packets added to the + * queuedPackets list + * + * @throws InterruptedException + */ + private void sendPackets() throws InterruptedException { + long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; + while (true) { + QuorumPacket p; + p = queuedPackets.take(); + + if (p == proposalOfDeath) { + // Packet of death! + break; + } + if (p.getType() == Leader.PING) { + traceMask = ZooTrace.SERVER_PING_TRACE_MASK; + } + ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p); + try { + oa.writeRecord(p, "packet"); + bufferedOutput.flush(); + } catch (IOException e) { + if (!s.isClosed()) { + LOG.warn("Unexpected exception",e); + } + break; + } + } + } + + static public String packetToString(QuorumPacket p) { + if (true) + return null; + String type = null; + String mess = null; + Record txn = null; + switch (p.getType()) { + case Leader.ACK: + type = "ACK"; + break; + case Leader.COMMIT: + type = "COMMIT"; + break; + case Leader.LASTZXID: + type = "LASTZXID"; + break; + case Leader.NEWLEADER: + type = "NEWLEADER"; + break; + case Leader.PING: + type = "PING"; + break; + case Leader.PROPOSAL: + type = "PROPOSAL"; + BinaryInputArchive ia = BinaryInputArchive + .getArchive(new ByteArrayInputStream(p.getData())); + TxnHeader hdr = new TxnHeader(); + try { + txn = ZooKeeperServer.deserializeTxn(ia, hdr); + // mess = "transaction: " + txn.toString(); + } catch (IOException e) { + LOG.warn("Unexpected exception",e); + } + break; + case Leader.REQUEST: + type = "REQUEST"; + break; + case Leader.REVALIDATE: + type = "REVALIDATE"; + ByteArrayInputStream bis = new ByteArrayInputStream(p.getData()); + DataInputStream dis = new DataInputStream(bis); + try { + long id = dis.readLong(); + mess = " sessionid = " + id; + } catch (IOException e) { + LOG.warn("Unexpected exception", e); + } + + break; + case Leader.UPTODATE: + type = "UPTODATE"; + break; + default: + type = "UNKNOWN" + p.getType(); + } + String entry = null; + if (type != null) { + entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess; + } + return entry; + } + + /** + * This thread will receive packets from the follower and process them and + * also listen to new connections from new followers. + */ + public void run() { + try { + + ia = BinaryInputArchive.getArchive(new BufferedInputStream(s + .getInputStream())); + bufferedOutput = new BufferedOutputStream(s.getOutputStream()); + oa = BinaryOutputArchive.getArchive(bufferedOutput); + + QuorumPacket qp = new QuorumPacket(); + ia.readRecord(qp, "packet"); + if (qp.getType() != Leader.LASTZXID) { + LOG.error("First packet " + qp.toString() + + " is not LASTZXID!"); + return; + } + long peerLastZxid = qp.getZxid(); + int packetToSend = Leader.SNAP; + boolean logTxns = true; + + long zxidToSend = 0; + // we are sending the diff + synchronized(leader.zk.committedLog) { + if (leader.zk.committedLog.size() != 0) { + if ((leader.zk.maxCommittedLog >= peerLastZxid) + && (leader.zk.minCommittedLog <= peerLastZxid)) { + packetToSend = Leader.DIFF; + zxidToSend = leader.zk.maxCommittedLog; + for (Proposal propose: leader.zk.committedLog) { + if (propose.packet.getZxid() > peerLastZxid) { + queuePacket(propose.packet); + QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), + null, null); + queuePacket(qcommit); + + } + } + } + } + else { + logTxns = false; + } } + long leaderLastZxid = leader.startForwarding(this, peerLastZxid); + QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, + leaderLastZxid, null, null); + oa.writeRecord(newLeaderQP, "packet"); + bufferedOutput.flush(); + // a special case when both the ids are the same + if (peerLastZxid == leaderLastZxid) { + packetToSend = Leader.DIFF; + zxidToSend = leaderLastZxid; + } + //check if we decided to send a diff or we need to send a truncate + // we avoid using epochs for truncating because epochs make things + // complicated. Two epochs might have the last 32 bits as same. + // only if we know that there is a committed zxid in the queue that + // is less than the one the peer has we send a trunc else to make + // things simple we just send sanpshot. + if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) { + // this is the only case that we are sure that + // we can ask the follower to truncate the log + packetToSend = Leader.TRUNC; + zxidToSend = leader.zk.maxCommittedLog; + + } + oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); + bufferedOutput.flush(); + // only if we are not truncating or fast sycning + if (packetToSend == Leader.SNAP) { + LOG.warn("Sending snapshot last zxid of peer is " + + Long.toHexString(peerLastZxid) + " " + " zxid of leader is " + + Long.toHexString(leaderLastZxid)); + // Dump data to follower + leader.zk.snapshot(oa); + oa.writeString("BenWasHere", "signature"); + } + bufferedOutput.flush(); + // + // Mutation packets will be queued during the serialize, + // so we need to mark when the follower can actually start + // using the data + // + queuedPackets + .add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); + + // Start sending packets + new Thread() { + public void run() { + Thread.currentThread().setName( + "Sender-" + s.getRemoteSocketAddress()); + try { + sendPackets(); + } catch (InterruptedException e) { + LOG.warn("Interrupted",e); + } + } + }.start(); + + while (true) { + qp = new QuorumPacket(); + ia.readRecord(qp, "packet"); + + long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; + if (qp.getType() == Leader.PING) { + traceMask = ZooTrace.SERVER_PING_TRACE_MASK; + } + ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp); + tickOfLastAck = leader.self.tick; + + + ByteBuffer bb; + long sessionId; + int cxid; + int type; + + switch (qp.getType()) { + case Leader.ACK: + leader.processAck(qp.getZxid(), s.getLocalSocketAddress()); + break; + case Leader.PING: + // Process the touches + ByteArrayInputStream bis = new ByteArrayInputStream(qp + .getData()); + DataInputStream dis = new DataInputStream(bis); + while (dis.available() > 0) { + long sess = dis.readLong(); + int to = dis.readInt(); + leader.zk.touch(sess, to); + } + break; + case Leader.REVALIDATE: + bis = new ByteArrayInputStream(qp.getData()); + dis = new DataInputStream(bis); + long id = dis.readLong(); + int to = dis.readInt(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + dos.writeLong(id); + boolean valid = leader.zk.touch(id, to); + ZooTrace.logTraceMessage(LOG, + ZooTrace.SESSION_TRACE_MASK, + "Session " + Long.toHexString(id) + + " is valid: "+ valid); + dos.writeBoolean(valid); + qp.setData(bos.toByteArray()); + queuedPackets.add(qp); + break; + case Leader.REQUEST: + bb = ByteBuffer.wrap(qp.getData()); + sessionId = bb.getLong(); + cxid = bb.getInt(); + type = bb.getInt(); + bb = bb.slice(); + if(type == OpCode.sync){ + leader.setSyncHandler(this, sessionId); + } + leader.zk.submitRequest(null, sessionId, type, cxid, bb, + qp.getAuthinfo()); + break; + default: + } + } + } catch (IOException e) { + if (s != null && !s.isClosed()) { + LOG.error("FIXMSG",e); + } + } catch (InterruptedException e) { + LOG.error("FIXMSG",e); + } finally { + LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress() + + " ********"); + // Send the packet of death + try { + queuedPackets.put(proposalOfDeath); + } catch (InterruptedException e) { + LOG.error("FIXMSG",e); + } + shutdown(); + } + } + + public void shutdown() { + try { + if (s != null && !s.isClosed()) { + s.close(); + } + } catch (IOException e) { + LOG.error("FIXMSG",e); + } + leader.removeFollowerHandler(this); + } + + public long tickOfLastAck() { + return tickOfLastAck; + } + + /** + * ping calls from the leader to the followers + */ + public void ping() { + QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed, + null, null); + queuePacket(ping); + } + + void queuePacket(QuorumPacket p) { + queuedPackets.add(p); + } + + public boolean synced() { + return isAlive() + && tickOfLastAck >= leader.self.tick - leader.self.syncLimit; + } +}