Author: breed
Date: Sat Nov  7 07:16:30 2009
New Revision: 833639

URL: http://svn.apache.org/viewvc?rev=833639&view=rev
Log:
ZOOKEEPER-568. SyncRequestProcessor snapping too frequently - counts non-log 
events as log events


Modified:
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=833639&r1=833638&r2=833639&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Sat Nov  7 07:16:30 2009
@@ -34,42 +34,44 @@
  */
 public class SyncRequestProcessor extends Thread implements RequestProcessor {
     private static final Logger LOG = Logger.getLogger(SyncRequestProcessor.class);
-    private ZooKeeperServer zks;
-    private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
-    private RequestProcessor nextProcessor;
-    Thread snapInProcess = null;
-    
+    private final ZooKeeperServer zks;
+    private final LinkedBlockingQueue<Request> queuedRequests =
+        new LinkedBlockingQueue<Request>();
+    private final RequestProcessor nextProcessor;
+
+    private Thread snapInProcess = null;
+
     /**
      * Transactions that have been written and are waiting to be flushed to
      * disk. Basically this is the list of SyncItems whose callbacks will be
      * invoked after flush returns successfully.
      */
-    private LinkedList<Request> toFlush = new LinkedList<Request>();
-    private Random r = new Random(System.nanoTime());
-    private int logCount = 0;
+    private final LinkedList<Request> toFlush = new LinkedList<Request>();
+    private final Random r = new Random(System.nanoTime());
     /**
      * The number of log entries to log before starting a snapshot
      */
     private static int snapCount = ZooKeeperServer.getSnapCount();
 
-    private Request requestOfDeath = Request.requestOfDeath;
+    private final Request requestOfDeath = Request.requestOfDeath;
 
     public SyncRequestProcessor(ZooKeeperServer zks,
-            RequestProcessor nextProcessor) {
+            RequestProcessor nextProcessor)
+    {
         super("SyncThread:" + zks.getServerId());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
     }
-    
+
     /**
-     * used by tests to check for changing 
+     * used by tests to check for changing
      * snapcounts
      * @param count
      */
     public static void setSnapCount(int count) {
         snapCount = count;
     }
-    
+
     /**
      * used by tests to get the snapcount
      * @return the snapcount
@@ -77,10 +79,14 @@
     public static int getSnapCount() {
         return snapCount;
     }
-    
+
     @Override
     public void run() {
         try {
+            int logCount = 0;
+
+            // we do this in an attempt to ensure that not all of the servers
+            // in the ensemble take a snapshot at the same time
             int randRoll = r.nextInt(snapCount/2);
             while (true) {
                 Request si = null;
@@ -97,7 +103,8 @@
                     break;
                 }
                 if (si != null) {
-                    zks.getLogWriter().append(si);
+                    // track the number of records written to the log
+                    if (zks.getLogWriter().append(si)) {
                         logCount++;
                         if (logCount > (snapCount / 2 + randRoll)) {
                             randRoll = r.nextInt(snapCount/2);
@@ -106,21 +113,31 @@
                             // take a snapshot
                             if (snapInProcess != null && snapInProcess.isAlive()) {
                                 LOG.warn("Too busy to snap, skipping");
-                            }
-                            else {
+                            } else {
                                 snapInProcess = new Thread("Snapshot Thread") {
-                                    public void run() {
-                                     try {
-                                         zks.takeSnapshot();
-                                     } catch(Exception e) {
-                                         LOG.warn("Unexpected exception", e);
-                                     }
-                                    }
-                                };
+                                        public void run() {
+                                            try {
+                                                zks.takeSnapshot();
+                                            } catch(Exception e) {
+                                                LOG.warn("Unexpected exception", e);
+                                            }
+                                        }
+                                    };
                                 snapInProcess.start();
                             }
                             logCount = 0;
                         }
+                    } else if (toFlush.isEmpty()) {
+                        // optimization for read heavy workloads
+                        // iff this is a read, and there are no pending
+                        // flushes (writes), then just pass this to the next
+                        // processor
+                        nextProcessor.processRequest(si);
+                        if (nextProcessor instanceof Flushable) {
+                            ((Flushable)nextProcessor).flush();
+                        }
+                        continue;
+                    }
                     toFlush.add(si);
                     if (toFlush.size() > 1000) {
                         flush(toFlush);
@@ -135,11 +152,11 @@
     }
 
     private void flush(LinkedList<Request> toFlush) throws IOException {
-        if (toFlush.size() == 0)
+        if (toFlush.isEmpty())
             return;
 
         zks.getLogWriter().commit();
-        while (toFlush.size() > 0) {
+        while (!toFlush.isEmpty()) {
             Request i = toFlush.remove();
             nextProcessor.processRequest(i);
         }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=833639&r1=833638&r2=833639&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Sat Nov  7 07:16:30 2009
@@ -131,9 +131,11 @@
      * append an entry to the transaction log
      * @param hdr the header of the transaction
      * @param txn the transaction part of the entry
+     * @return true iff something was appended, otherwise false
      */
-    public synchronized void append(TxnHeader hdr, Record txn)
-        throws IOException {
+    public synchronized boolean append(TxnHeader hdr, Record txn)
+        throws IOException
+    {
         if (hdr != null) {
             if (hdr.getZxid() <= lastZxidSeen) {
                 LOG.warn("Current zxid " + hdr.getZxid()
@@ -161,7 +163,10 @@
             crc.update(buf, 0, buf.length);
             oa.writeLong(crc.getValue(), "txnEntryCRC");
             Util.writeTxnBytes(oa, buf);
+            
+            return true;
         }
+        return false;
     }
 
     /**

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=833639&r1=833638&r2=833639&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Sat Nov  7 07:16:30 2009
@@ -262,10 +262,11 @@
     /**
      * append the request to the transaction logs
      * @param si the request to be appended
+     * @return true iff something was appended, otherwise false
      * @throws IOException
      */
-    public void append(Request si) throws IOException {
-        txnLog.append(si.hdr, si.txn);
+    public boolean append(Request si) throws IOException {
+        return txnLog.append(si.hdr, si.txn);
     }
 
     /**

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java?rev=833639&r1=833638&r2=833639&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java Sat Nov  7 07:16:30 2009
@@ -39,9 +39,10 @@
      * Append a request to the transaction log
      * @param hdr the transaction header
      * @param r the transaction itself
+     * @return true iff something was appended, otherwise false
      * @throws IOException
      */
-    void append(TxnHeader hdr, Record r) throws IOException;
+    boolean append(TxnHeader hdr, Record r) throws IOException;
 
     /**
      * Start reading the transaction logs


Reply via email to