Keith Turner created ACCUMULO-2027:
--------------------------------------
Summary: ZooKeeperInstance.close() not freeing resources in
multithreaded env
Key: ACCUMULO-2027
URL: https://issues.apache.org/jira/browse/ACCUMULO-2027
Project: Accumulo
Issue Type: Bug
Reporter: Keith Turner
Priority: Critical
Fix For: 1.4.5, 1.5.1, 1.6.0
While looking at the changes related to ZooKeeperInstance.close() in the
1.4.5-SNAPSHOT branch I noticed there were race conditions where resources were
not properly released. One type of race condition is where a thread is
between a closed check in ZooKeeperInstance and calling a ZooCache method when
ZooKeeperInstance.close() is called. The following is an example situation:
# Thread 1 uses ZooKeeperInstance1 to get a zoocache.
# Thread 2 calls close() on ZooKeeperInstance1, which calls close() on the zoocache.
# Thread 1 uses the zoocache it still holds a reference to, causing a new zookeeper
connection to be created.
Below is an example program that will trigger this behavior. For me this
little example program reliably shows a connected zookeeper after all of the
threads die. If I use 0 threads it will show a closed zookeeper connection at
the end.
{code:java}
/**
 * Runnable that keeps writing randomly generated mutations to table "foo5",
 * flushing after every mutation, until an exception ends the loop.
 */
static class WriteTask implements Runnable {
  private BatchWriter writer;
  private Random rand;

  /**
   * Creates the random source and then a batch writer on table "foo5" using
   * the supplied connector.
   */
  WriteTask(Connector conn) throws TableNotFoundException {
    rand = new Random();
    writer = conn.createBatchWriter("foo5", 10000000, 30000, 1);
  }

  /**
   * Writes and flushes one random mutation per iteration, forever. Any
   * exception leaves the loop; only its message is printed to stdout.
   */
  @Override
  public void run() {
    try {
      for (;;) {
        // Random values are drawn in the order: row, family, qualifier, value.
        String row = String.format("%06d", rand.nextInt(1000000));
        Mutation mutation = new Mutation(row);

        String family = String.format("%06d", rand.nextInt(100));
        String qualifier = String.format("%06d", rand.nextInt(100));
        String value = String.format("%06d", rand.nextInt(1000000));
        mutation.put(family, qualifier, value);

        writer.addMutation(mutation);
        writer.flush();
      }
    } catch (Exception e) {
      System.out.println(e.getMessage());
    }
  }
}
/**
 * Runnable that iterates over a scanner on table "foo5" again and again,
 * discarding every entry, until an exception ends the loop.
 */
static class ReadTask implements Runnable {
  private Scanner scanner;

  /**
   * Creates a scanner on table "foo5" with a default-constructed
   * Authorizations object, using the supplied connector.
   */
  ReadTask(Connector conn) throws TableNotFoundException {
    scanner = conn.createScanner("foo5", new Authorizations());
  }

  /**
   * Repeatedly walks the whole scanner. Any exception leaves the loop; only
   * its message is printed to stdout.
   */
  @Override
  public void run() {
    try {
      for (;;) {
        for (Entry<Key,Value> ignored : scanner) {
          // Entries are read only to drive the scan; nothing is done with them.
        }
      }
    } catch (Exception e) {
      System.out.println(e.getMessage());
    }
  }
}
/**
 * Starts writer and reader threads that share one connector, closes the
 * ZooKeeperInstance while they are running, and prints the ZooSession
 * sessions both before the close and after every thread has finished.
 */
@Test(timeout = 30000)
public void test2() throws Exception {
  ZooKeeperInstance zki = new ZooKeeperInstance(accumulo.getInstanceName(),
      accumulo.getZooKeepers());
  Connector conn = zki.getConnector("root", "superSecret");
  conn.tableOperations().create("foo5");

  int numThreads = 10;
  ArrayList<Thread> threads = new ArrayList<Thread>();

  // The first numThreads workers are writers, the next numThreads are readers;
  // each task is constructed and started in that order.
  for (int i = 0; i < 2 * numThreads; i++) {
    Runnable task;
    if (i < numThreads) {
      task = new WriteTask(conn);
    } else {
      task = new ReadTask(conn);
    }
    Thread worker = new Thread(task);
    worker.start();
    threads.add(worker);
  }

  // let threads get spun up
  Thread.sleep(1000);

  ZooSession.printSessions();
  zki.close();

  // wait for the threads to die
  for (int i = 0; i < threads.size(); i++) {
    threads.get(i).join();
  }

  ZooSession.printSessions();
}
{code}
Below are some changes I made to ZooSession for debugging purposes.
{noformat}
diff --git
a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
index b3db26f..475a21d 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
@@ -20,6 +20,8 @@
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.log4j.Logger;
@@ -29,7 +31,7 @@
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
-class ZooSession {
+public class ZooSession {
private static final Logger log = Logger.getLogger(ZooSession.class);
@@ -121,6 +123,8 @@
ZooSessionInfo zsi = sessions.get(sessionKey);
if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+ System.out.println("Removing closed session ");
+ new Exception().printStackTrace();
if (auth != null && sessions.get(readOnlySessionKey) == zsi)
sessions.remove(readOnlySessionKey);
zsi = null;
@@ -137,4 +141,13 @@
}
return zsi.zooKeeper;
}
+
+ public static synchronized void printSessions() {
+ Set<Entry<String,ZooSessionInfo>> es = sessions.entrySet();
+
+ for (Entry<String,ZooSessionInfo> entry : es) {
+ System.out.println(entry.getKey() + " " +
entry.getValue().zooKeeper.getState());
+ }
+ }
+
}
{noformat}
With the above changes I will see an exception like the following when one of
the race conditions occurs.
{noformat}
Removing closed session
java.lang.Exception
at
org.apache.accumulo.core.zookeeper.ZooSession.getSession(ZooSession.java:127)
at
org.apache.accumulo.core.zookeeper.ZooReader.getSession(ZooReader.java:37)
at
org.apache.accumulo.core.zookeeper.ZooReader.getZooKeeper(ZooReader.java:41)
at
org.apache.accumulo.core.zookeeper.ZooCache.getZooKeeper(ZooCache.java:56)
at org.apache.accumulo.core.zookeeper.ZooCache.retry(ZooCache.java:127)
at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:233)
at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:188)
at
org.apache.accumulo.core.client.ZooKeeperInstance.getInstanceID(ZooKeeperInstance.java:156)
at
org.apache.accumulo.core.client.impl.TabletLocator.getInstance(TabletLocator.java:96)
at
org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:245)
at
org.apache.accumulo.core.client.impl.ScannerIterator$Reader.run(ScannerIterator.java:94)
at
org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:176)
at
org.apache.accumulo.minicluster.MiniAccumuloClusterTest$ReadTask.run(MiniAccumuloClusterTest.java:109)
at java.lang.Thread.run(Thread.java:662)
{noformat}
--
This message was sent by Atlassian JIRA
(v6.1.4#6159)