kpm1985 closed pull request #1004: FLUO-1000 OracleServer race conditions
URL: https://github.com/apache/fluo/pull/1004
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
b/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
index ecda2d77..977d5bc4 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
@@ -120,8 +120,8 @@ void initialize(InitializationOptions opts)
throws AlreadyInitializedException, TableExistsException;
/**
- * Removes Fluo application, Accumulo table and shared configuration in
Zookeeper. Shared configuration
- * consists of all properties except those with
+ * Removes Fluo application, Accumulo table and shared configuration in
Zookeeper. Shared
+ * configuration consists of all properties except those with
* {@value org.apache.fluo.api.config.FluoConfiguration#CONNECTION_PREFIX}
prefix.
*
* @since 1.2.0
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
index 6f35fbc2..208e3e5e 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
@@ -18,7 +18,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index 150a055b..df80f8d1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -18,7 +18,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
index 0c7ddf07..a2f0fe3a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
@@ -195,11 +195,16 @@ public OracleServer(Environment env) throws Exception {
private void allocateTimestamp() throws Exception {
Stat stat = new Stat();
+ // FLUO-1000
+ while (!curatorFramework.getState().equals(CuratorFrameworkState.STARTED))
{
+ Thread.sleep(200);
+ }
byte[] d =
curatorFramework.getData().storingStatIn(stat).forPath(maxTsPath);
// TODO check that d is expected
// TODO check that still server when setting
- // TODO make num allocated variable... when a server first starts allocate
a small amount... the
+ // TODO make num allocated variable... when a server first starts allocate
a small amount...
+ // the
// longer it runs and the busier it is, allocate bigger blocks
long newMax = Long.parseLong(new String(d)) + 1000;
@@ -208,6 +213,7 @@ private void allocateTimestamp() throws Exception {
LongUtil.toByteArray(newMax));
maxTs = newMax;
+
if (!isLeader) {
throw new IllegalStateException();
}
@@ -318,6 +324,10 @@ public synchronized void start() throws Exception {
curatorFramework.getConnectionStateListenable().addListener(cnxnListener);
curatorFramework.start();
+ while (!curatorFramework.getState().equals(CuratorFrameworkState.STARTED))
{
+ Thread.sleep(100);
+ }
+
while (!cnxnListener.isConnected()) {
Thread.sleep(200);
}
@@ -410,6 +420,9 @@ public void takeLeadership(CuratorFramework
curatorFramework) throws Exception {
}
synchronized (this) {
+ // FLUO-1000
+ isLeader = true;
+
byte[] d = curatorFramework.getData().forPath(maxTsPath);
currentTs = maxTs = LongUtil.fromByteArray(d);
}
@@ -417,8 +430,6 @@ public void takeLeadership(CuratorFramework
curatorFramework) throws Exception {
gcTsTracker = new GcTimestampTracker();
gcTsTracker.start();
- isLeader = true;
-
while (started) {
// if leadership is lost, then curator will interrupt the thread that
called this method
Thread.sleep(100);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services