Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
 Tue Jun 24 12:04:58 2008
@@ -1,369 +1,369 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.OutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.txn.TxnHeader;
-
-/**
- * This class has the control logic for the Follower.
- */
-public class Follower {
-    private static final Logger LOG = Logger.getLogger(Follower.class);
-
-    QuorumPeer self;
-
-    FollowerZooKeeperServer zk;
-
-    Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
-        this.self = self;
-        this.zk=zk;
-    }
-
-    private InputArchive leaderIs;
-
-    private OutputArchive leaderOs;
-
-    private BufferedOutputStream bufferedOutput;
-
-    public Socket sock;
-
-    /**
-     * write a packet to the leader
-     *
-     * @param pp
-     *                the proposal packet to be sent to the leader
-     * @throws IOException
-     */
-    void writePacket(QuorumPacket pp) throws IOException {
-        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
-        if (pp.getType() == Leader.PING) {
-            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-        }
-        ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp);
-        synchronized (leaderOs) {
-            leaderOs.writeRecord(pp, "packet");
-            bufferedOutput.flush();
-        }
-    }
-
-    /**
-     * read a packet from the leader
-     *
-     * @param pp
-     *                the packet to be instantiated
-     * @throws IOException
-     */
-    void readPacket(QuorumPacket pp) throws IOException {
-        synchronized (leaderIs) {
-            leaderIs.readRecord(pp, "packet");
-        }
-        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
-        if (pp.getType() == Leader.PING) {
-            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-        }
-        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
-    }
-
-    /**
-     * the main method called by the follower to follow the leader
-     *
-     * @throws InterruptedException
-     */
-    void followLeader() throws InterruptedException {
-        InetSocketAddress addr = null;
-        // Find the leader by id
-        for (QuorumServer s : self.quorumPeers) {
-            if (s.id == self.currentVote.id) {
-                addr = s.addr;
-                break;
-            }
-        }
-        if (addr == null) {
-            LOG.warn("Couldn't find the leader with id = "
-                    + self.currentVote.id);
-        }
-        LOG.info("Following " + addr);
-        sock = new Socket();
-        try {
-            QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
-            sock.setSoTimeout(self.tickTime * self.initLimit);
-            for (int tries = 0; tries < 5; tries++) {
-                try {
-                    //sock = new Socket();
-                    //sock.setSoTimeout(self.tickTime * self.initLimit);
-                    sock.connect(addr, self.tickTime * self.syncLimit);
-                    sock.setTcpNoDelay(true);
-                    break;
-                } catch (ConnectException e) {
-                    if (tries == 4) {
-                        LOG.error("Unexpected exception",e);
-                        throw e;
-                    } else {
-                        LOG.warn("Unexpected exception",e);
-                        sock = new Socket();
-                        sock.setSoTimeout(self.tickTime * self.initLimit);
-                    }
-                }
-                Thread.sleep(1000);
-            }
-            leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
-                    sock.getInputStream()));
-            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
-            leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
-            QuorumPacket qp = new QuorumPacket();
-            qp.setType(Leader.LASTZXID);
-            long sentLastZxid = self.getLastLoggedZxid();
-            qp.setZxid(sentLastZxid);
-            writePacket(qp);
-            readPacket(qp);
-            long newLeaderZxid = qp.getZxid();
-
-            if (qp.getType() != Leader.NEWLEADER) {
-                LOG.error("First packet should have been NEWLEADER");
-                throw new IOException("First packet should have been 
NEWLEADER");
-            }
-            readPacket(qp);
-            synchronized (zk) {
-                if (qp.getType() == Leader.DIFF) {
-                    LOG.info("Getting a diff from the leader!");
-                    zk.loadData();
-                }
-                else if (qp.getType() == Leader.SNAP) {
-                    LOG.info("Getting a snapshot from leader");
-                    // The leader is going to dump the database
-                    zk.loadData(leaderIs);
-                    String signature = leaderIs.readString("signature");
-                    if (!signature.equals("BenWasHere")) {
-                        LOG.error("Missing signature. Got " + signature);
-                        throw new IOException("Missing signature");
-                    }
-                } else if (qp.getType() == Leader.TRUNC) {
-                    //we need to truncate the log to the lastzxid of the leader
-                    LOG.warn("Truncating log to get in sync with the leader "
-                            + Long.toHexString(qp.getZxid()));
-                    zk.truncateLog(qp.getZxid());
-                    zk.loadData();
-                }
-                else {
-                    LOG.error("Got unexpected packet from leader "
-                            + qp.getType() + " exiting ... " );
-                    System.exit(13);
-                }
-                zk.dataTree.lastProcessedZxid = newLeaderZxid;
-            }
-            ack.setZxid(newLeaderZxid & ~0xffffffffL);
-            writePacket(ack);
-            sock.setSoTimeout(self.tickTime * self.syncLimit);
-            zk.startup();
-            while (self.running) {
-                readPacket(qp);
-                switch (qp.getType()) {
-                case Leader.PING:
-                    // Send back the ping with our session data
-                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                    DataOutputStream dos = new DataOutputStream(bos);
-                    HashMap<Long, Integer> touchTable = 
((FollowerZooKeeperServer) zk)
-                            .getTouchSnapshot();
-                    for (Entry<Long, Integer> entry : touchTable.entrySet()) {
-                        dos.writeLong(entry.getKey());
-                        dos.writeInt(entry.getValue());
-                    }
-                    qp.setData(bos.toByteArray());
-                    writePacket(qp);
-                    break;
-                case Leader.PROPOSAL:
-                    TxnHeader hdr = new TxnHeader();
-                    BinaryInputArchive ia = BinaryInputArchive
-                            .getArchive(new 
ByteArrayInputStream(qp.getData()));
-                    Record txn = ZooKeeperServer.deserializeTxn(ia, hdr);
-                    if (hdr.getZxid() != lastQueued + 1) {
-                        LOG.warn("Got zxid "
-                                + Long.toHexString(hdr.getZxid())
-                                + " expected "
-                                + Long.toHexString(lastQueued + 1));
-                    }
-                    lastQueued = hdr.getZxid();
-                    zk.logRequest(hdr, txn);
-                    break;
-                case Leader.COMMIT:
-                    zk.commit(qp.getZxid());
-                    break;
-                case Leader.UPTODATE:
-                    zk.snapshot();
-                    self.cnxnFactory.setZooKeeperServer(zk);
-                    break;
-                case Leader.REVALIDATE:
-                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
-                            .getData());
-                    DataInputStream dis = new DataInputStream(bis);
-                    long sessionId = dis.readLong();
-                    boolean valid = dis.readBoolean();
-                    synchronized (pendingRevalidations) {
-                        ServerCnxn cnxn = pendingRevalidations
-                                .remove(sessionId);
-                        if (cnxn == null) {
-                            LOG.warn("Missing "
-                                    + Long.toHexString(sessionId)
-                                    + " for validation");
-                        } else {
-                            cnxn.finishSessionInit(valid);
-                        }
-                    }
-                    ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                                             "Session " + sessionId
-                                             + " is valid: " + valid);
-                    break;
-                case Leader.SYNC:
-                    zk.sync();
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-            try {
-                sock.close();
-            } catch (IOException e1) {
-                e1.printStackTrace();
-            }
-
-            synchronized (pendingRevalidations) {
-                // clear pending revalitions
-                pendingRevalidations.clear();
-                pendingRevalidations.notifyAll();
-            }
-        }
-    }
-
-    private long lastQueued;
-
-    ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new 
ConcurrentHashMap<Long, ServerCnxn>();
-
-    /**
-     * validate a seesion for a client
-     *
-     * @param clientId
-     *                the client to be revailidated
-     * @param timeout
-     *                the timeout for which the session is valid
-     * @return
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
-            throws IOException, InterruptedException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        dos.writeLong(clientId);
-        dos.writeInt(timeout);
-        dos.close();
-        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
-                .toByteArray(), null);
-        pendingRevalidations.put(clientId, cnxn);
-        ZooTrace.logTraceMessage(LOG,
-                                 ZooTrace.SESSION_TRACE_MASK,
-                                 "To validate session "
-                                 + Long.toHexString(clientId));
-        writePacket(qp);
-    }
-
-    /**
-     * send a request packet to the leader
-     *
-     * @param request
-     *                the request from the client
-     * @throws IOException
-     */
-    void request(Request request) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream oa = new DataOutputStream(baos);
-        oa.writeLong(request.sessionId);
-        oa.writeInt(request.cxid);
-        oa.writeInt(request.type);
-        if (request.request != null) {
-            request.request.rewind();
-            int len = request.request.remaining();
-            byte b[] = new byte[len];
-            request.request.get(b);
-            request.request.rewind();
-            oa.write(b);
-        }
-        oa.close();
-        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
-                .toByteArray(), request.authInfo);
-//        QuorumPacket qp;
-//        if(request.type == OpCode.sync){
-//            qp = new QuorumPacket(Leader.SYNC, -1, baos
-//                    .toByteArray(), request.authInfo);
-//        }
-//        else{
-//        qp = new QuorumPacket(Leader.REQUEST, -1, baos
-//                .toByteArray(), request.authInfo);
-//        }
-        writePacket(qp);
-    }
-
-    public long getZxid() {
-        try {
-            synchronized (zk) {
-                return zk.getZxid();
-            }
-        } catch (NullPointerException e) {
-        }
-        return -1;
-    }
-
-    public void shutdown() {
-        // set the zookeeper server to null
-        self.cnxnFactory.setZooKeeperServer(null);
-        // clear all the connections
-        self.cnxnFactory.clear();
-        // shutdown previous zookeeper
-        if (zk != null) {
-            zk.shutdown();
-
-        }
-        LOG.error("FIXMSG",new Exception("shutdown Follower"));
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * This class has the control logic for the Follower.
+ */
+public class Follower {
+    private static final Logger LOG = Logger.getLogger(Follower.class);
+
+    QuorumPeer self;
+
+    FollowerZooKeeperServer zk;
+
+    Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
+        this.self = self;
+        this.zk=zk;
+    }
+
+    private InputArchive leaderIs;
+
+    private OutputArchive leaderOs;
+
+    private BufferedOutputStream bufferedOutput;
+
+    public Socket sock;
+
+    /**
+     * write a packet to the leader
+     *
+     * @param pp
+     *                the proposal packet to be sent to the leader
+     * @throws IOException
+     */
+    void writePacket(QuorumPacket pp) throws IOException {
+        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+        if (pp.getType() == Leader.PING) {
+            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+        }
+        ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp);
+        synchronized (leaderOs) {
+            leaderOs.writeRecord(pp, "packet");
+            bufferedOutput.flush();
+        }
+    }
+
+    /**
+     * read a packet from the leader
+     *
+     * @param pp
+     *                the packet to be instantiated
+     * @throws IOException
+     */
+    void readPacket(QuorumPacket pp) throws IOException {
+        synchronized (leaderIs) {
+            leaderIs.readRecord(pp, "packet");
+        }
+        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+        if (pp.getType() == Leader.PING) {
+            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+        }
+        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
+    }
+
+    /**
+     * the main method called by the follower to follow the leader
+     *
+     * @throws InterruptedException
+     */
+    void followLeader() throws InterruptedException {
+        InetSocketAddress addr = null;
+        // Find the leader by id
+        for (QuorumServer s : self.quorumPeers) {
+            if (s.id == self.currentVote.id) {
+                addr = s.addr;
+                break;
+            }
+        }
+        if (addr == null) {
+            LOG.warn("Couldn't find the leader with id = "
+                    + self.currentVote.id);
+        }
+        LOG.info("Following " + addr);
+        sock = new Socket();
+        try {
+            QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+            sock.setSoTimeout(self.tickTime * self.initLimit);
+            for (int tries = 0; tries < 5; tries++) {
+                try {
+                    //sock = new Socket();
+                    //sock.setSoTimeout(self.tickTime * self.initLimit);
+                    sock.connect(addr, self.tickTime * self.syncLimit);
+                    sock.setTcpNoDelay(true);
+                    break;
+                } catch (ConnectException e) {
+                    if (tries == 4) {
+                        LOG.error("Unexpected exception",e);
+                        throw e;
+                    } else {
+                        LOG.warn("Unexpected exception",e);
+                        sock = new Socket();
+                        sock.setSoTimeout(self.tickTime * self.initLimit);
+                    }
+                }
+                Thread.sleep(1000);
+            }
+            leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
+                    sock.getInputStream()));
+            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+            leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+            QuorumPacket qp = new QuorumPacket();
+            qp.setType(Leader.LASTZXID);
+            long sentLastZxid = self.getLastLoggedZxid();
+            qp.setZxid(sentLastZxid);
+            writePacket(qp);
+            readPacket(qp);
+            long newLeaderZxid = qp.getZxid();
+
+            if (qp.getType() != Leader.NEWLEADER) {
+                LOG.error("First packet should have been NEWLEADER");
+                throw new IOException("First packet should have been 
NEWLEADER");
+            }
+            readPacket(qp);
+            synchronized (zk) {
+                if (qp.getType() == Leader.DIFF) {
+                    LOG.info("Getting a diff from the leader!");
+                    zk.loadData();
+                }
+                else if (qp.getType() == Leader.SNAP) {
+                    LOG.info("Getting a snapshot from leader");
+                    // The leader is going to dump the database
+                    zk.loadData(leaderIs);
+                    String signature = leaderIs.readString("signature");
+                    if (!signature.equals("BenWasHere")) {
+                        LOG.error("Missing signature. Got " + signature);
+                        throw new IOException("Missing signature");
+                    }
+                } else if (qp.getType() == Leader.TRUNC) {
+                    //we need to truncate the log to the lastzxid of the leader
+                    LOG.warn("Truncating log to get in sync with the leader "
+                            + Long.toHexString(qp.getZxid()));
+                    zk.truncateLog(qp.getZxid());
+                    zk.loadData();
+                }
+                else {
+                    LOG.error("Got unexpected packet from leader "
+                            + qp.getType() + " exiting ... " );
+                    System.exit(13);
+                }
+                zk.dataTree.lastProcessedZxid = newLeaderZxid;
+            }
+            ack.setZxid(newLeaderZxid & ~0xffffffffL);
+            writePacket(ack);
+            sock.setSoTimeout(self.tickTime * self.syncLimit);
+            zk.startup();
+            while (self.running) {
+                readPacket(qp);
+                switch (qp.getType()) {
+                case Leader.PING:
+                    // Send back the ping with our session data
+                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                    DataOutputStream dos = new DataOutputStream(bos);
+                    HashMap<Long, Integer> touchTable = 
((FollowerZooKeeperServer) zk)
+                            .getTouchSnapshot();
+                    for (Entry<Long, Integer> entry : touchTable.entrySet()) {
+                        dos.writeLong(entry.getKey());
+                        dos.writeInt(entry.getValue());
+                    }
+                    qp.setData(bos.toByteArray());
+                    writePacket(qp);
+                    break;
+                case Leader.PROPOSAL:
+                    TxnHeader hdr = new TxnHeader();
+                    BinaryInputArchive ia = BinaryInputArchive
+                            .getArchive(new 
ByteArrayInputStream(qp.getData()));
+                    Record txn = ZooKeeperServer.deserializeTxn(ia, hdr);
+                    if (hdr.getZxid() != lastQueued + 1) {
+                        LOG.warn("Got zxid "
+                                + Long.toHexString(hdr.getZxid())
+                                + " expected "
+                                + Long.toHexString(lastQueued + 1));
+                    }
+                    lastQueued = hdr.getZxid();
+                    zk.logRequest(hdr, txn);
+                    break;
+                case Leader.COMMIT:
+                    zk.commit(qp.getZxid());
+                    break;
+                case Leader.UPTODATE:
+                    zk.snapshot();
+                    self.cnxnFactory.setZooKeeperServer(zk);
+                    break;
+                case Leader.REVALIDATE:
+                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
+                            .getData());
+                    DataInputStream dis = new DataInputStream(bis);
+                    long sessionId = dis.readLong();
+                    boolean valid = dis.readBoolean();
+                    synchronized (pendingRevalidations) {
+                        ServerCnxn cnxn = pendingRevalidations
+                                .remove(sessionId);
+                        if (cnxn == null) {
+                            LOG.warn("Missing "
+                                    + Long.toHexString(sessionId)
+                                    + " for validation");
+                        } else {
+                            cnxn.finishSessionInit(valid);
+                        }
+                    }
+                    ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+                                             "Session " + sessionId
+                                             + " is valid: " + valid);
+                    break;
+                case Leader.SYNC:
+                    zk.sync();
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            try {
+                sock.close();
+            } catch (IOException e1) {
+                e1.printStackTrace();
+            }
+
+            synchronized (pendingRevalidations) {
+                // clear pending revalitions
+                pendingRevalidations.clear();
+                pendingRevalidations.notifyAll();
+            }
+        }
+    }
+
+    private long lastQueued;
+
+    ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new 
ConcurrentHashMap<Long, ServerCnxn>();
+
+    /**
+     * validate a seesion for a client
+     *
+     * @param clientId
+     *                the client to be revailidated
+     * @param timeout
+     *                the timeout for which the session is valid
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
+            throws IOException, InterruptedException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.writeLong(clientId);
+        dos.writeInt(timeout);
+        dos.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
+                .toByteArray(), null);
+        pendingRevalidations.put(clientId, cnxn);
+        ZooTrace.logTraceMessage(LOG,
+                                 ZooTrace.SESSION_TRACE_MASK,
+                                 "To validate session "
+                                 + Long.toHexString(clientId));
+        writePacket(qp);
+    }
+
+    /**
+     * send a request packet to the leader
+     *
+     * @param request
+     *                the request from the client
+     * @throws IOException
+     */
+    void request(Request request) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream oa = new DataOutputStream(baos);
+        oa.writeLong(request.sessionId);
+        oa.writeInt(request.cxid);
+        oa.writeInt(request.type);
+        if (request.request != null) {
+            request.request.rewind();
+            int len = request.request.remaining();
+            byte b[] = new byte[len];
+            request.request.get(b);
+            request.request.rewind();
+            oa.write(b);
+        }
+        oa.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
+                .toByteArray(), request.authInfo);
+//        QuorumPacket qp;
+//        if(request.type == OpCode.sync){
+//            qp = new QuorumPacket(Leader.SYNC, -1, baos
+//                    .toByteArray(), request.authInfo);
+//        }
+//        else{
+//        qp = new QuorumPacket(Leader.REQUEST, -1, baos
+//                .toByteArray(), request.authInfo);
+//        }
+        writePacket(qp);
+    }
+
+    public long getZxid() {
+        try {
+            synchronized (zk) {
+                return zk.getZxid();
+            }
+        } catch (NullPointerException e) {
+        }
+        return -1;
+    }
+
+    public void shutdown() {
+        // set the zookeeper server to null
+        self.cnxnFactory.setZooKeeperServer(null);
+        // clear all the connections
+        self.cnxnFactory.clear();
+        // shutdown previous zookeeper
+        if (zk != null) {
+            zk.shutdown();
+
+        }
+        LOG.error("FIXMSG",new Exception("shutdown Follower"));
+    }
+}

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
 Tue Jun 24 12:04:58 2008
@@ -1,391 +1,391 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.txn.TxnHeader;
-
-/**
- * There will be an instance of this class created by the Leader for each
- * follower.All communication for a given Follower will be handled by this
- * class.
- */
-public class FollowerHandler extends Thread {
-    private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
-
-    public Socket s;
-
-    Leader leader;
-
-    long tickOfLastAck;
-
-    /**
-     * The packets to be sent to the follower
-     */
-    LinkedBlockingQueue<QuorumPacket> queuedPackets = new 
LinkedBlockingQueue<QuorumPacket>();
-
-    private BinaryInputArchive ia;
-
-    private BinaryOutputArchive oa;
-
-    private BufferedOutputStream bufferedOutput;
-
-    FollowerHandler(Socket s, Leader leader) throws IOException {
-        super("FollowerHandler-" + s.getRemoteSocketAddress());
-        this.s = s;
-        this.leader = leader;
-        leader.addFollowerHandler(this);
-        start();
-    }
-
-    /**
-     * If this packet is queued, the sender thread will exit
-     */
-    QuorumPacket proposalOfDeath = new QuorumPacket();
-
-    /**
-     * This method will use the thread to send packets added to the
-     * queuedPackets list
-     *
-     * @throws InterruptedException
-     */
-    private void sendPackets() throws InterruptedException {
-        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
-        while (true) {
-            QuorumPacket p;
-            p = queuedPackets.take();
-
-            if (p == proposalOfDeath) {
-                // Packet of death!
-                break;
-            }
-            if (p.getType() == Leader.PING) {
-                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-            }
-            ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
-            try {
-                oa.writeRecord(p, "packet");
-                bufferedOutput.flush();
-            } catch (IOException e) {
-                if (!s.isClosed()) {
-                    LOG.warn("Unexpected exception",e);
-                }
-                break;
-            }
-        }
-    }
-
-    static public String packetToString(QuorumPacket p) {
-        if (true)
-            return null;
-        String type = null;
-        String mess = null;
-        Record txn = null;
-        switch (p.getType()) {
-        case Leader.ACK:
-            type = "ACK";
-            break;
-        case Leader.COMMIT:
-            type = "COMMIT";
-            break;
-        case Leader.LASTZXID:
-            type = "LASTZXID";
-            break;
-        case Leader.NEWLEADER:
-            type = "NEWLEADER";
-            break;
-        case Leader.PING:
-            type = "PING";
-            break;
-        case Leader.PROPOSAL:
-            type = "PROPOSAL";
-            BinaryInputArchive ia = BinaryInputArchive
-                    .getArchive(new ByteArrayInputStream(p.getData()));
-            TxnHeader hdr = new TxnHeader();
-            try {
-                txn = ZooKeeperServer.deserializeTxn(ia, hdr);
-                // mess = "transaction: " + txn.toString();
-            } catch (IOException e) {
-                LOG.warn("Unexpected exception",e);
-            }
-            break;
-        case Leader.REQUEST:
-            type = "REQUEST";
-            break;
-        case Leader.REVALIDATE:
-            type = "REVALIDATE";
-            ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
-            DataInputStream dis = new DataInputStream(bis);
-            try {
-                long id = dis.readLong();
-                mess = " sessionid = " + id;
-            } catch (IOException e) {
-                LOG.warn("Unexpected exception", e);
-            }
-
-            break;
-        case Leader.UPTODATE:
-            type = "UPTODATE";
-            break;
-        default:
-            type = "UNKNOWN" + p.getType();
-        }
-        String entry = null;
-        if (type != null) {
-            entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
-        }
-        return entry;
-    }
-
-    /**
-     * This thread will receive packets from the follower and process them and
-     * also listen to new connections from new followers.
-     */
-    public void run() {
-        try {
-
-            ia = BinaryInputArchive.getArchive(new BufferedInputStream(s
-                    .getInputStream()));
-            bufferedOutput = new BufferedOutputStream(s.getOutputStream());
-            oa = BinaryOutputArchive.getArchive(bufferedOutput);
-
-            QuorumPacket qp = new QuorumPacket();
-            ia.readRecord(qp, "packet");
-            if (qp.getType() != Leader.LASTZXID) {
-                LOG.error("First packet " + qp.toString()
-                        + " is not LASTZXID!");
-                return;
-            }
-            long peerLastZxid = qp.getZxid();
-            int packetToSend = Leader.SNAP;
-            boolean logTxns = true;
-
-            long zxidToSend = 0;
-            // we are sending the diff
-            synchronized(leader.zk.committedLog) {
-                if (leader.zk.committedLog.size() != 0) {
-                    if ((leader.zk.maxCommittedLog >= peerLastZxid)
-                            && (leader.zk.minCommittedLog <= peerLastZxid)) {
-                        packetToSend = Leader.DIFF;
-                        zxidToSend = leader.zk.maxCommittedLog;
-                        for (Proposal propose: leader.zk.committedLog) {
-                            if (propose.packet.getZxid() > peerLastZxid) {
-                                queuePacket(propose.packet);
-                                QuorumPacket qcommit = new 
QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
-                                        null, null);
-                                queuePacket(qcommit);
-
-                            }
-                        }
-                    }
-                }
-                else {
-                    logTxns = false;
-                }            }
-            long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
-            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                    leaderLastZxid, null, null);
-            oa.writeRecord(newLeaderQP, "packet");
-            bufferedOutput.flush();
-            // a special case when both the ids are the same
-            if (peerLastZxid == leaderLastZxid) {
-                packetToSend = Leader.DIFF;
-                zxidToSend = leaderLastZxid;
-            }
-            //check if we decided to send a diff or we need to send a truncate
-            // we avoid using epochs for truncating because epochs make things
-            // complicated. Two epochs might have the last 32 bits as same.
-            // only if we know that there is a committed zxid in the queue that
-            // is less than the one the peer has we send a trunc else to make
-            // things simple we just send sanpshot.
-            if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
-                // this is the only case that we are sure that
-                // we can ask the follower to truncate the log
-                packetToSend = Leader.TRUNC;
-                zxidToSend = leader.zk.maxCommittedLog;
-
-            }
-            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, 
null), "packet");
-            bufferedOutput.flush();
-            // only if we are not truncating or fast sycning
-            if (packetToSend == Leader.SNAP) {
-                LOG.warn("Sending snapshot last zxid of peer is "
-                        + Long.toHexString(peerLastZxid) + " " + " zxid of 
leader is "
-                        + Long.toHexString(leaderLastZxid));
-                // Dump data to follower
-                leader.zk.snapshot(oa);
-                oa.writeString("BenWasHere", "signature");
-            }
-            bufferedOutput.flush();
-            //
-            // Mutation packets will be queued during the serialize,
-            // so we need to mark when the follower can actually start
-            // using the data
-            //
-            queuedPackets
-                    .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
-
-            // Start sending packets
-            new Thread() {
-                public void run() {
-                    Thread.currentThread().setName(
-                            "Sender-" + s.getRemoteSocketAddress());
-                    try {
-                        sendPackets();
-                    } catch (InterruptedException e) {
-                        LOG.warn("Interrupted",e);
-                    }
-                }
-            }.start();
-
-            while (true) {
-                qp = new QuorumPacket();
-                ia.readRecord(qp, "packet");
-
-                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
-                if (qp.getType() == Leader.PING) {
-                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-                }
-                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
-                tickOfLastAck = leader.self.tick;
-
-
-                ByteBuffer bb;
-                long sessionId;
-                int cxid;
-                int type;
-
-                switch (qp.getType()) {
-                case Leader.ACK:
-                    leader.processAck(qp.getZxid(), s.getLocalSocketAddress());
-                    break;
-                case Leader.PING:
-                    // Process the touches
-                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
-                            .getData());
-                    DataInputStream dis = new DataInputStream(bis);
-                    while (dis.available() > 0) {
-                        long sess = dis.readLong();
-                        int to = dis.readInt();
-                        leader.zk.touch(sess, to);
-                    }
-                    break;
-                case Leader.REVALIDATE:
-                    bis = new ByteArrayInputStream(qp.getData());
-                    dis = new DataInputStream(bis);
-                    long id = dis.readLong();
-                    int to = dis.readInt();
-                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                    DataOutputStream dos = new DataOutputStream(bos);
-                    dos.writeLong(id);
-                    boolean valid = leader.zk.touch(id, to);
-                    ZooTrace.logTraceMessage(LOG,
-                                             ZooTrace.SESSION_TRACE_MASK,
-                                             "Session " + Long.toHexString(id)
-                                             + " is valid: "+ valid);
-                    dos.writeBoolean(valid);
-                    qp.setData(bos.toByteArray());
-                    queuedPackets.add(qp);
-                    break;
-                case Leader.REQUEST:
-                    bb = ByteBuffer.wrap(qp.getData());
-                    sessionId = bb.getLong();
-                    cxid = bb.getInt();
-                    type = bb.getInt();
-                    bb = bb.slice();
-                    if(type == OpCode.sync){
-                        leader.setSyncHandler(this, sessionId);
-                    }
-                    leader.zk.submitRequest(null, sessionId, type, cxid, bb,
-                            qp.getAuthinfo());
-                    break;
-                default:
-                }
-            }
-        } catch (IOException e) {
-            if (s != null && !s.isClosed()) {
-                LOG.error("FIXMSG",e);
-            }
-        } catch (InterruptedException e) {
-            LOG.error("FIXMSG",e);
-        } finally {
-            LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress()
-                    + " ********");
-            // Send the packet of death
-            try {
-                queuedPackets.put(proposalOfDeath);
-            } catch (InterruptedException e) {
-                LOG.error("FIXMSG",e);
-            }
-            shutdown();
-        }
-    }
-
-    public void shutdown() {
-        try {
-            if (s != null && !s.isClosed()) {
-                s.close();
-            }
-        } catch (IOException e) {
-            LOG.error("FIXMSG",e);
-        }
-        leader.removeFollowerHandler(this);
-    }
-
-    public long tickOfLastAck() {
-        return tickOfLastAck;
-    }
-
-    /**
-     * ping calls from the leader to the followers
-     */
-    public void ping() {
-        QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed,
-                null, null);
-        queuePacket(ping);
-    }
-
-    void queuePacket(QuorumPacket p) {
-        queuedPackets.add(p);
-    }
-
-    public boolean synced() {
-        return isAlive()
-                && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * There will be an instance of this class created by the Leader for each
+ * follower.All communication for a given Follower will be handled by this
+ * class.
+ */
+public class FollowerHandler extends Thread {
+    private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
+
+    public Socket s;
+
+    Leader leader;
+
+    long tickOfLastAck;
+
+    /**
+     * The packets to be sent to the follower
+     */
+    LinkedBlockingQueue<QuorumPacket> queuedPackets = new 
LinkedBlockingQueue<QuorumPacket>();
+
+    private BinaryInputArchive ia;
+
+    private BinaryOutputArchive oa;
+
+    private BufferedOutputStream bufferedOutput;
+
+    FollowerHandler(Socket s, Leader leader) throws IOException {
+        super("FollowerHandler-" + s.getRemoteSocketAddress());
+        this.s = s;
+        this.leader = leader;
+        leader.addFollowerHandler(this);
+        start();
+    }
+
+    /**
+     * If this packet is queued, the sender thread will exit
+     */
+    QuorumPacket proposalOfDeath = new QuorumPacket();
+
+    /**
+     * This method will use the thread to send packets added to the
+     * queuedPackets list
+     *
+     * @throws InterruptedException
+     */
+    private void sendPackets() throws InterruptedException {
+        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+        while (true) {
+            QuorumPacket p;
+            p = queuedPackets.take();
+
+            if (p == proposalOfDeath) {
+                // Packet of death!
+                break;
+            }
+            if (p.getType() == Leader.PING) {
+                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+            }
+            ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
+            try {
+                oa.writeRecord(p, "packet");
+                bufferedOutput.flush();
+            } catch (IOException e) {
+                if (!s.isClosed()) {
+                    LOG.warn("Unexpected exception",e);
+                }
+                break;
+            }
+        }
+    }
+
+    static public String packetToString(QuorumPacket p) {
+        if (true)
+            return null;
+        String type = null;
+        String mess = null;
+        Record txn = null;
+        switch (p.getType()) {
+        case Leader.ACK:
+            type = "ACK";
+            break;
+        case Leader.COMMIT:
+            type = "COMMIT";
+            break;
+        case Leader.LASTZXID:
+            type = "LASTZXID";
+            break;
+        case Leader.NEWLEADER:
+            type = "NEWLEADER";
+            break;
+        case Leader.PING:
+            type = "PING";
+            break;
+        case Leader.PROPOSAL:
+            type = "PROPOSAL";
+            BinaryInputArchive ia = BinaryInputArchive
+                    .getArchive(new ByteArrayInputStream(p.getData()));
+            TxnHeader hdr = new TxnHeader();
+            try {
+                txn = ZooKeeperServer.deserializeTxn(ia, hdr);
+                // mess = "transaction: " + txn.toString();
+            } catch (IOException e) {
+                LOG.warn("Unexpected exception",e);
+            }
+            break;
+        case Leader.REQUEST:
+            type = "REQUEST";
+            break;
+        case Leader.REVALIDATE:
+            type = "REVALIDATE";
+            ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
+            DataInputStream dis = new DataInputStream(bis);
+            try {
+                long id = dis.readLong();
+                mess = " sessionid = " + id;
+            } catch (IOException e) {
+                LOG.warn("Unexpected exception", e);
+            }
+
+            break;
+        case Leader.UPTODATE:
+            type = "UPTODATE";
+            break;
+        default:
+            type = "UNKNOWN" + p.getType();
+        }
+        String entry = null;
+        if (type != null) {
+            entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
+        }
+        return entry;
+    }
+
+    /**
+     * This thread will receive packets from the follower and process them and
+     * also listen to new connections from new followers.
+     */
+    public void run() {
+        try {
+
+            ia = BinaryInputArchive.getArchive(new BufferedInputStream(s
+                    .getInputStream()));
+            bufferedOutput = new BufferedOutputStream(s.getOutputStream());
+            oa = BinaryOutputArchive.getArchive(bufferedOutput);
+
+            QuorumPacket qp = new QuorumPacket();
+            ia.readRecord(qp, "packet");
+            if (qp.getType() != Leader.LASTZXID) {
+                LOG.error("First packet " + qp.toString()
+                        + " is not LASTZXID!");
+                return;
+            }
+            long peerLastZxid = qp.getZxid();
+            int packetToSend = Leader.SNAP;
+            boolean logTxns = true;
+
+            long zxidToSend = 0;
+            // we are sending the diff
+            synchronized(leader.zk.committedLog) {
+                if (leader.zk.committedLog.size() != 0) {
+                    if ((leader.zk.maxCommittedLog >= peerLastZxid)
+                            && (leader.zk.minCommittedLog <= peerLastZxid)) {
+                        packetToSend = Leader.DIFF;
+                        zxidToSend = leader.zk.maxCommittedLog;
+                        for (Proposal propose: leader.zk.committedLog) {
+                            if (propose.packet.getZxid() > peerLastZxid) {
+                                queuePacket(propose.packet);
+                                QuorumPacket qcommit = new 
QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
+                                        null, null);
+                                queuePacket(qcommit);
+
+                            }
+                        }
+                    }
+                }
+                else {
+                    logTxns = false;
+                }            }
+            long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
+            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                    leaderLastZxid, null, null);
+            oa.writeRecord(newLeaderQP, "packet");
+            bufferedOutput.flush();
+            // a special case when both the ids are the same
+            if (peerLastZxid == leaderLastZxid) {
+                packetToSend = Leader.DIFF;
+                zxidToSend = leaderLastZxid;
+            }
+            //check if we decided to send a diff or we need to send a truncate
+            // we avoid using epochs for truncating because epochs make things
+            // complicated. Two epochs might have the last 32 bits as same.
+            // only if we know that there is a committed zxid in the queue that
+            // is less than the one the peer has we send a trunc else to make
+            // things simple we just send sanpshot.
+            if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
+                // this is the only case that we are sure that
+                // we can ask the follower to truncate the log
+                packetToSend = Leader.TRUNC;
+                zxidToSend = leader.zk.maxCommittedLog;
+
+            }
+            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, 
null), "packet");
+            bufferedOutput.flush();
+            // only if we are not truncating or fast sycning
+            if (packetToSend == Leader.SNAP) {
+                LOG.warn("Sending snapshot last zxid of peer is "
+                        + Long.toHexString(peerLastZxid) + " " + " zxid of 
leader is "
+                        + Long.toHexString(leaderLastZxid));
+                // Dump data to follower
+                leader.zk.snapshot(oa);
+                oa.writeString("BenWasHere", "signature");
+            }
+            bufferedOutput.flush();
+            //
+            // Mutation packets will be queued during the serialize,
+            // so we need to mark when the follower can actually start
+            // using the data
+            //
+            queuedPackets
+                    .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
+
+            // Start sending packets
+            new Thread() {
+                public void run() {
+                    Thread.currentThread().setName(
+                            "Sender-" + s.getRemoteSocketAddress());
+                    try {
+                        sendPackets();
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interrupted",e);
+                    }
+                }
+            }.start();
+
+            while (true) {
+                qp = new QuorumPacket();
+                ia.readRecord(qp, "packet");
+
+                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+                if (qp.getType() == Leader.PING) {
+                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+                }
+                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
+                tickOfLastAck = leader.self.tick;
+
+
+                ByteBuffer bb;
+                long sessionId;
+                int cxid;
+                int type;
+
+                switch (qp.getType()) {
+                case Leader.ACK:
+                    leader.processAck(qp.getZxid(), s.getLocalSocketAddress());
+                    break;
+                case Leader.PING:
+                    // Process the touches
+                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
+                            .getData());
+                    DataInputStream dis = new DataInputStream(bis);
+                    while (dis.available() > 0) {
+                        long sess = dis.readLong();
+                        int to = dis.readInt();
+                        leader.zk.touch(sess, to);
+                    }
+                    break;
+                case Leader.REVALIDATE:
+                    bis = new ByteArrayInputStream(qp.getData());
+                    dis = new DataInputStream(bis);
+                    long id = dis.readLong();
+                    int to = dis.readInt();
+                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                    DataOutputStream dos = new DataOutputStream(bos);
+                    dos.writeLong(id);
+                    boolean valid = leader.zk.touch(id, to);
+                    ZooTrace.logTraceMessage(LOG,
+                                             ZooTrace.SESSION_TRACE_MASK,
+                                             "Session " + Long.toHexString(id)
+                                             + " is valid: "+ valid);
+                    dos.writeBoolean(valid);
+                    qp.setData(bos.toByteArray());
+                    queuedPackets.add(qp);
+                    break;
+                case Leader.REQUEST:
+                    bb = ByteBuffer.wrap(qp.getData());
+                    sessionId = bb.getLong();
+                    cxid = bb.getInt();
+                    type = bb.getInt();
+                    bb = bb.slice();
+                    if(type == OpCode.sync){
+                        leader.setSyncHandler(this, sessionId);
+                    }
+                    leader.zk.submitRequest(null, sessionId, type, cxid, bb,
+                            qp.getAuthinfo());
+                    break;
+                default:
+                }
+            }
+        } catch (IOException e) {
+            if (s != null && !s.isClosed()) {
+                LOG.error("FIXMSG",e);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("FIXMSG",e);
+        } finally {
+            LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress()
+                    + " ********");
+            // Send the packet of death
+            try {
+                queuedPackets.put(proposalOfDeath);
+            } catch (InterruptedException e) {
+                LOG.error("FIXMSG",e);
+            }
+            shutdown();
+        }
+    }
+
+    public void shutdown() {
+        try {
+            if (s != null && !s.isClosed()) {
+                s.close();
+            }
+        } catch (IOException e) {
+            LOG.error("FIXMSG",e);
+        }
+        leader.removeFollowerHandler(this);
+    }
+
+    public long tickOfLastAck() {
+        return tickOfLastAck;
+    }
+
+    /**
+     * ping calls from the leader to the followers
+     */
+    public void ping() {
+        QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed,
+                null, null);
+        queuePacket(ping);
+    }
+
+    void queuePacket(QuorumPacket p) {
+        queuedPackets.add(p);
+    }
+
+    public boolean synced() {
+        return isAlive()
+                && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
+    }
+}


Reply via email to