[ 
https://issues.apache.org/jira/browse/ACCUMULO-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13860267#comment-13860267
 ] 

Keith Turner commented on ACCUMULO-2027:
----------------------------------------

Synchronizing the methods on ZooKeeperInstance will not fix all of the race 
conditions.  The general problem is that a thread can access ZooCache or 
ThriftTransportPool after obtaining info from ZooKeeperInstance.  Consider the 
following example scenario, where only one ZooKeeperInstance is in use and both 
threads are using it.

 # thread 1 is locating a tablet and ends up in 
{{MetadataLocationObtainer.lookupTablet()}}, where it calls a method on 
{{ZooKeeperInstance}} and then calls {{ThriftScanner.getBatchFromServer()}}
 # thread 2 calls {{ZooKeeperInstance.close()}}, which shuts down the 
{{ThriftTransportPool}}, stopping its thread
 # thread 1 is in {{ThriftScanner.getBatchFromServer()}}, which calls 
{{ThriftUtil.getTServerClient()}}, which eventually calls 
{{ThriftTransportPool.getInstance().getTransport()}}.  This starts the thread 
in {{ThriftTransportPool}} that thread 2 was trying to kill with {{close()}}.
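
The interleaving above can be replayed deterministically in a small sketch. 
This is not Accumulo code: {{LazyPool}} and {{Instance}} are hypothetical 
stand-ins for {{ThriftTransportPool}} and {{ZooKeeperInstance}}, and the only 
assumption carried over from the scenario is that the pool lazily starts its 
worker when a transport is requested.  Every method is synchronized, yet the 
worker is still running after {{close()}}, because the gap is between the 
method calls rather than inside any one of them.

```java
public class CloseRaceSketch {

  // Hypothetical stand-in for a shared pool that lazily starts a worker.
  static class LazyPool {
    private boolean workerRunning = false;

    synchronized String getTransport() {
      if (!workerRunning) {
        workerRunning = true; // lazily (re)starts the worker
      }
      return "transport";
    }

    synchronized void shutdown() {
      workerRunning = false;
    }

    synchronized boolean isWorkerRunning() {
      return workerRunning;
    }
  }

  // Hypothetical stand-in for an instance whose methods are all synchronized.
  static class Instance {
    private final LazyPool pool;
    private boolean closed = false;

    Instance(LazyPool pool) {
      this.pool = pool;
    }

    synchronized String getInstanceId() {
      if (closed) {
        throw new IllegalStateException("instance is closed");
      }
      return "instance-id";
    }

    synchronized void close() {
      closed = true;
      pool.shutdown();
    }
  }

  // Replays the three steps in a fixed order on one thread and reports
  // whether the pool's worker is running after close() has returned.
  static boolean workerRunningAfterClose() {
    LazyPool pool = new LazyPool();
    Instance instance = new Instance(pool);

    instance.getInstanceId(); // step 1: "thread 1" passes the closed check
    instance.close();         // step 2: "thread 2" closes and stops the pool
    pool.getTransport();      // step 3: "thread 1" continues and restarts it

    return pool.isWorkerRunning();
  }

  public static void main(String[] args) {
    System.out.println("worker running after close: " + workerRunningAfterClose());
  }
}
```

Running {{main}} prints {{worker running after close: true}}.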



> ZooKeeperInstance.close() not freeing resources in multithreaded env
> --------------------------------------------------------------------
>
>                 Key: ACCUMULO-2027
>                 URL: https://issues.apache.org/jira/browse/ACCUMULO-2027
>             Project: Accumulo
>          Issue Type: Bug
>            Reporter: Keith Turner
>            Assignee: William Slacum
>            Priority: Critical
>             Fix For: 1.4.5, 1.5.1, 1.6.0
>
>
> While looking at the changes related to ZooKeeperInstance.close() in the 
> 1.4.5-SNAPSHOT branch I noticed there were race conditions where resources 
> were not properly released.   One type of race condition is where a thread is 
> between a closed check in ZooKeeperInstance and calling a ZooCache method 
> when ZooKeeperInstance.close() is called.  The following is an example 
> situation:
>  # Thread 1 uses ZooKeeperInstance1 to get a zoocache.
>  # Thread 2 calls close() on ZooKeeperInstance1, which calls close() on the 
> zoocache.
>  # Thread 1 uses the zoocache it has a reference to, causing a new zookeeper 
> connection to be created.
> Below is an example program that will trigger this behavior.   For me this 
> little example program reliably shows a connected zookeeper after all of the 
> threads die.  If I use 0 threads it will show a closed zookeeper connection 
> at the end.
> {code:java}
>  static class WriteTask implements Runnable {
>     private BatchWriter writer;
>     private Random rand;
>     WriteTask(Connector conn) throws TableNotFoundException {
>       rand = new Random();
>       writer = conn.createBatchWriter("foo5", 10000000, 30000, 1);
>     }
>     @Override
>     public void run() {
>       try {
>         while (true) {
>           Mutation m1 = new Mutation(String.format("%06d", rand.nextInt(1000000)));
>           m1.put(String.format("%06d", rand.nextInt(100)),
>               String.format("%06d", rand.nextInt(100)),
>               String.format("%06d", rand.nextInt(1000000)));
>           writer.addMutation(m1);
>           writer.flush();
>         }
>       } catch (Exception e) {
>         System.out.println(e.getMessage());
>       }
>     }
>   }
>   static class ReadTask implements Runnable {
>     private Scanner scanner;
>     ReadTask(Connector conn) throws TableNotFoundException {
>       scanner = conn.createScanner("foo5", new Authorizations());
>     }
>     @Override
>     public void run() {
>       try {
>         while (true) {
>           for (Entry<Key,Value> entry : scanner) {
>             // drain the scanner; the entries themselves are ignored
>           }
>         }
>       } catch (Exception e) {
>         System.out.println(e.getMessage());
>       }
>     }
>   }
>   @Test(timeout = 30000)
>   public void test2() throws Exception {
>     ZooKeeperInstance zki = new ZooKeeperInstance(accumulo.getInstanceName(),
>         accumulo.getZooKeepers());
>     Connector conn = zki.getConnector("root", "superSecret");
>     conn.tableOperations().create("foo5");
>     ArrayList<Thread> threads = new ArrayList<Thread>();
>     int numThreads = 10;
>     for (int i = 0; i < numThreads; i++) {
>       Thread t = new Thread(new WriteTask(conn));
>       t.start();
>       threads.add(t);
>     }
>     for (int i = 0; i < numThreads; i++) {
>       Thread t = new Thread(new ReadTask(conn));
>       t.start();
>       threads.add(t);
>     }
>     // let threads get spun up
>     Thread.sleep(1000);
>     ZooSession.printSessions();
>     zki.close();
>     // wait for the threads to die
>     for (Thread thread : threads) {
>       thread.join();
>     }
>     ZooSession.printSessions();
>   }
> {code}
> Below are some changes I made to ZooSession for debugging purposes.
> {noformat}
> diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
> index b3db26f..475a21d 100644
> --- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
> +++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
> @@ -20,6 +20,8 @@
>  import java.net.UnknownHostException;
>  import java.util.HashMap;
>  import java.util.Map;
> +import java.util.Map.Entry;
> +import java.util.Set;
>  
>  import org.apache.accumulo.core.util.UtilWaitThread;
>  import org.apache.log4j.Logger;
> @@ -29,7 +31,7 @@
>  import org.apache.zookeeper.ZooKeeper;
>  import org.apache.zookeeper.ZooKeeper.States;
>  
> -class ZooSession {
> +public class ZooSession {
>    
>    private static final Logger log = Logger.getLogger(ZooSession.class);
>    
> @@ -121,6 +123,8 @@
>      
>      ZooSessionInfo zsi = sessions.get(sessionKey);
>      if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
> +      System.out.println("Removing closed session ");
> +      new Exception().printStackTrace();
>        if (auth != null && sessions.get(readOnlySessionKey) == zsi)
>          sessions.remove(readOnlySessionKey);
>        zsi = null;
> @@ -137,4 +141,13 @@
>      }
>      return zsi.zooKeeper;
>    }
> +
> +  public static synchronized void printSessions() {
> +    Set<Entry<String,ZooSessionInfo>> es = sessions.entrySet();
> +
> +    for (Entry<String,ZooSessionInfo> entry : es) {
> +      System.out.println(entry.getKey() + " " + entry.getValue().zooKeeper.getState());
> +    }
> +  }
> +
>  }
> {noformat}
> With the above changes I will see an exception like the following when one of 
> the race conditions occurs.
> {noformat}
> Removing closed session 
> java.lang.Exception
>       at org.apache.accumulo.core.zookeeper.ZooSession.getSession(ZooSession.java:127)
>       at org.apache.accumulo.core.zookeeper.ZooReader.getSession(ZooReader.java:37)
>       at org.apache.accumulo.core.zookeeper.ZooReader.getZooKeeper(ZooReader.java:41)
>       at org.apache.accumulo.core.zookeeper.ZooCache.getZooKeeper(ZooCache.java:56)
>       at org.apache.accumulo.core.zookeeper.ZooCache.retry(ZooCache.java:127)
>       at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:233)
>       at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:188)
>       at org.apache.accumulo.core.client.ZooKeeperInstance.getInstanceID(ZooKeeperInstance.java:156)
>       at org.apache.accumulo.core.client.impl.TabletLocator.getInstance(TabletLocator.java:96)
>       at org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:245)
>       at org.apache.accumulo.core.client.impl.ScannerIterator$Reader.run(ScannerIterator.java:94)
>       at org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:176)
>       at org.apache.accumulo.minicluster.MiniAccumuloClusterTest$ReadTask.run(MiniAccumuloClusterTest.java:109)
>       at java.lang.Thread.run(Thread.java:662)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
