asfgit closed pull request #960: Use NewTableConfiguration (and also trivial
code cleanup)
URL: https://github.com/apache/fluo/pull/960
This pull request was merged from a forked (foreign) repository.
Because GitHub does not show the original diff of a fork's pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/.travis.yml b/.travis.yml
index b44275ba..7f0dcf00 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,8 +12,8 @@
# the License.
language: java
jdk:
- - oraclejdk8
-script: mvn verify javadoc:jar
+ - openjdk8
+script: mvn clean verify javadoc:jar -Daccumulo.version=1.8.1
-Dthrift.version=0.9.3
notifications:
irc:
channels:
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
index 97be335d..fba872b7 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -15,7 +15,6 @@
package org.apache.fluo.accumulo.iterators;
-import java.lang.IllegalArgumentException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.function.LongPredicate;
@@ -25,9 +24,8 @@
import org.apache.accumulo.core.data.Value;
/**
- * This class buffers Keys that all have the same row+column. Internally
- * it only stores one Key, a list of timestamps and a list of values. At
iteration
- * time it materializes each Key+Value.
+ * This class buffers Keys that all have the same row+column. Internally it
only stores one Key, a
+ * list of timestamps and a list of values. At iteration time it materializes
each Key+Value.
*/
class ColumnBuffer {
@@ -53,11 +51,11 @@ private void add(long timestamp, byte[] v) {
}
/**
- * When empty, the first key added sets the row+column. After this all keys
- * added must have the same row+column.
+ * When empty, the first key added sets the row+column. After this all keys
added must have the
+ * same row+column.
*
* @param k Key to be added to buffer
- * @param v Value to be added to buffer
+ * @param vByte Value to be added to buffer
*/
public void add(Key k, byte[] vByte) throws IllegalArgumentException {
vByte = Arrays.copyOf(vByte, vByte.length);
@@ -73,8 +71,8 @@ public void add(Key k, byte[] vByte) throws
IllegalArgumentException {
}
/**
- * When empty, the first key added sets the row+column. After this all keys
- * added must have the same row+column.
+ * When empty, the first key added sets the row+column. After this all keys
added must have the
+ * same row+column.
*
* @param k Key to be added to buffer
* @param v Value to be added to buffer
@@ -84,8 +82,8 @@ public void add(Key k, Value v) throws
IllegalArgumentException {
}
/**
- * Clears the dest ColumnBuffer and inserts all entries in dest where the
timestamp passes
- * the timestampTest.
+ * Clears the dest ColumnBuffer and inserts all entries in dest where the
timestamp passes the
+ * timestampTest.
*
* @param dest Destination ColumnBuffer
* @param timestampTest Test to determine which timestamps get added to dest
@@ -119,7 +117,7 @@ public int size() {
}
/**
- * @param pos Position of the Key that will be retrieved
+ * @param pos Position of the Key that will be retrieved
* @return The key at a given position
*/
public Key getKey(int pos) {
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index 1814dfa4..d07f59ee 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -16,12 +16,9 @@
package org.apache.fluo.accumulo.iterators;
import java.io.IOException;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
-import java.util.function.LongPredicate;
import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -87,7 +84,6 @@ public void init(SortedKeyValueIterator<Key, Value> source,
Map<String, String>
}
}
-
@Override
public boolean hasTop() {
return position < keysFiltered.size() || source.hasTop();
@@ -259,7 +255,8 @@ private void readColMetadata() throws IOException {
if (isFullMajc) {
if (isDelete) {
if (DelReadLockValue.isRollback(source.getTopValue().get())) {
- // can drop rolled back read lock delete markers on any full
majc, do not need to consider gcTimestamp
+ // can drop rolled back read lock delete markers on any full
majc, do not need to
+ // consider gcTimestamp
keep = false;
} else {
long rlockCommitTs =
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
index e1966f09..c67b7822 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -21,5 +21,8 @@
public static final String TABLE_CLASSPATH = "table.classpath.context";
public static final String TABLE_BLOCKCACHE_ENABLED =
"table.cache.block.enable";
public static final String TABLE_FORMATTER_CLASS = "table.formatter";
+ public static final String TABLE_GROUP_PREFIX = "table.group.";
+ public static final String TABLE_GROUPS_ENABLED = "table.groups.enabled";
+ public static final Object TABLE_ITERATOR_PREFIX = "table.iterator.";
}
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
index c38b9e6e..7063adcd 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -32,6 +32,8 @@
public static final long DATA_PREFIX = 0xa000000000000000L;
public static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
public static final Bytes NOTIFY_CF = Bytes.of("ntfy");
+ public static final String NOTIFY_LOCALITY_GROUP_NAME = "notify";
+ public static final Bytes GC_CF = Bytes.of("gc");
private ColumnConstants() {}
diff --git
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index 262df155..dbdcd1b6 100644
---
a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++
b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -83,7 +83,7 @@ public TestData addIfInRange(String key, String value, Range
range) {
String cq = fields[2];
String ct;
long ts;
- byte[] val = new byte[0];;
+ byte[] val = new byte[0];
if (cf.equals("ntfy")) {
ts = Long.parseLong(fields[3]) << 1;
diff --git
a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
index 1ba7f9b2..8c0fa4cc 100644
---
a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
+++
b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
@@ -47,7 +47,7 @@
* toString(). Second, the actual Transaction implementation will under some
circumstances return
* the Bytes object that was passed in.
*/
- private Map<String, Bytes> s2bCache = new WeakHashMap<String, Bytes>();
+ private Map<String, Bytes> s2bCache = new WeakHashMap<>();
public AbstractSnapshotBase() {}
diff --git
a/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java
b/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java
index fcffe558..7409ca90 100644
---
a/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java
+++
b/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java
@@ -69,7 +69,7 @@ public Bytes get(Bytes row, Column column) {
@Override
public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
- HashMap<Column, Bytes> ret = new HashMap<Column, Bytes>();
+ HashMap<Column, Bytes> ret = new HashMap<>();
for (Column column : columns) {
RowColumn rc = new RowColumn(row, column);
if (snapshot.containsKey(rc)) {
diff --git
a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 316d0fca..3193f7f9 100644
---
a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++
b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -49,8 +49,11 @@ public void testDefaults() {
base.getConnectionRetryTimeout());
Assert.assertEquals(FluoConfiguration.ACCUMULO_ZOOKEEPERS_DEFAULT,
base.getAccumuloZookeepers());
- Assert.assertEquals(FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT,
- base.getAccumuloClasspath());
+ @SuppressWarnings("deprecation")
+ String tmpFieldName = FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT;
+ @SuppressWarnings("deprecation")
+ String tmpCP = base.getAccumuloClasspath();
+ Assert.assertEquals(tmpFieldName, tmpCP);
Assert.assertEquals(FluoConfiguration.WORKER_NUM_THREADS_DEFAULT,
base.getWorkerThreads());
Assert.assertEquals(FluoConfiguration.TRANSACTION_ROLLBACK_TIME_DEFAULT,
base.getTransactionRollbackTime());
@@ -86,8 +89,9 @@ public void testTable() {
@Test
public void testSetGet() {
FluoConfiguration config = new FluoConfiguration();
- Assert.assertEquals("path1,path2",
- config.setAccumuloClasspath("path1,path2").getAccumuloClasspath());
+ @SuppressWarnings("deprecation")
+ String tmpCP =
config.setAccumuloClasspath("path1,path2").getAccumuloClasspath();
+ Assert.assertEquals("path1,path2", tmpCP);
Assert.assertEquals("path1,path2",
config.setAccumuloJars("path1,path2").getAccumuloJars());
Assert.assertEquals("instance",
config.setAccumuloInstance("instance").getAccumuloInstance());
Assert.assertEquals("pass",
config.setAccumuloPassword("pass").getAccumuloPassword());
@@ -184,7 +188,9 @@ public void testLoadingOldPropsFile() {
FluoConfiguration config = new FluoConfiguration(propsFile);
// make sure classpath contains comma. otherwise it was shortened
- Assert.assertTrue(config.getAccumuloClasspath().contains(","));
+ @SuppressWarnings("deprecation")
+ String tmpCP = config.getAccumuloClasspath();
+ Assert.assertTrue(tmpCP.contains(","));
// check for values set in prop file
Assert.assertEquals("localhost/fluo", config.getInstanceZookeepers());
Assert.assertEquals("localhost", config.getAccumuloZookeepers());
@@ -249,7 +255,9 @@ public void testLoadingOldTestPropsFile() {
FluoConfiguration config = new FluoConfiguration(propsFile);
// make sure classpath contains comma. otherwise it was shortened
- Assert.assertTrue(config.getAccumuloClasspath().contains(","));
+ @SuppressWarnings("deprecation")
+ String tmpCP = config.getAccumuloClasspath();
+ Assert.assertTrue(tmpCP.contains(","));
// check for values set in prop file
Assert.assertEquals("app1", config.getApplicationName());
Assert.assertEquals("localhost/fluo2", config.getInstanceZookeepers());
@@ -258,7 +266,9 @@ public void testLoadingOldTestPropsFile() {
Assert.assertEquals("user5", config.getAccumuloUser());
Assert.assertEquals("pass6", config.getAccumuloPassword());
Assert.assertEquals("zoo7", config.getAccumuloZookeepers());
- Assert.assertEquals(8, config.getClientRetryTimeout());
+ @SuppressWarnings("deprecation")
+ int tmpTimeout = config.getClientRetryTimeout();
+ Assert.assertEquals(8, tmpTimeout);
Assert.assertEquals(8, config.getConnectionRetryTimeout());
Assert.assertEquals("table9", config.getAccumuloTable());
}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
index 5939710b..99d09a0e 100644
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
+++
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -21,7 +21,6 @@
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.service.FluoOracle;
-import org.apache.fluo.cluster.util.LogbackUtil;
import org.apache.fluo.core.metrics.MetricNames;
import org.apache.fluo.core.oracle.FluoOracleImpl;
import org.apache.fluo.core.util.UtilWaitThread;
@@ -61,7 +60,7 @@ public void run() {
try {
if (!logDir.equals(STDOUT)) {
- LogbackUtil.init("oracle", configDir, logDir);
+ org.apache.fluo.cluster.util.LogbackUtil.init("oracle", configDir,
logDir);
}
} catch (Exception e) {
System.err.println("Exception while starting FluoOracle: " +
e.getMessage());
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
index fa2229b4..dbfd191b 100644
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
+++
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -21,7 +21,6 @@
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.service.FluoWorker;
-import org.apache.fluo.cluster.util.LogbackUtil;
import org.apache.fluo.core.metrics.MetricNames;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.core.worker.FluoWorkerImpl;
@@ -61,7 +60,7 @@ public void run() {
try {
if (!logDir.equals(STDOUT)) {
- LogbackUtil.init("worker", configDir, logDir);
+ org.apache.fluo.cluster.util.LogbackUtil.init("worker", configDir,
logDir);
}
} catch (Exception e) {
System.err.println("Exception while starting FluoWorker: " +
e.getMessage());
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index ca62c693..9de17f44 100644
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -33,7 +33,6 @@
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.cluster.util.FluoYarnConfig;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.ScanUtil;
@@ -118,7 +117,8 @@ public void waitUntilFinished(FluoConfiguration config) {
}
try {
- long sleepSec = calculateSleep(ntfyCount,
FluoYarnConfig.getWorkerInstances(config));
+ long sleepSec = calculateSleep(ntfyCount,
+
org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config));
log.info("{} notifications are still outstanding. Will try again in
{} seconds...",
ntfyCount, sleepSec);
Thread.sleep(1000 * sleepSec);
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
index f5c97c58..be896062 100644
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
+++
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -30,11 +30,6 @@
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.cluster.runnable.OracleRunnable;
-import org.apache.fluo.cluster.runnable.WorkerRunnable;
-import org.apache.fluo.cluster.util.FluoYarnConfig;
-import org.apache.fluo.cluster.yarn.FluoTwillApp;
-import org.apache.fluo.cluster.yarn.TwillUtil;
import org.apache.fluo.core.client.FluoAdminImpl;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.hadoop.fs.Path;
@@ -185,7 +180,8 @@ public void start(FluoConfiguration config, String
appConfDir, String appLibPath
throw new FluoException("Invalid fluo.properties due to " +
e.getMessage(), e);
}
- TwillPreparer preparer = getTwillRunner(config).prepare(new
FluoTwillApp(config, appConfDir));
+ TwillPreparer preparer = getTwillRunner(config)
+ .prepare(new org.apache.fluo.cluster.yarn.FluoTwillApp(config,
appConfDir));
// Add jars from fluo lib/ directory that are not being loaded by Twill.
try {
@@ -327,17 +323,24 @@ private boolean isReady(TwillController controller) {
}
private boolean allContainersRunning(TwillController controller,
FluoConfiguration config) {
- return TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) ==
FluoYarnConfig
- .getOracleInstances(config)
- && TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) ==
FluoYarnConfig
- .getWorkerInstances(config);
+ return org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
+ org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME) ==
org.apache.fluo.cluster.util.FluoYarnConfig
+ .getOracleInstances(config)
+ && org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
+ org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME) ==
org.apache.fluo.cluster.util.FluoYarnConfig
+ .getWorkerInstances(config);
}
private String containerStatus(TwillController controller, FluoConfiguration
config) {
- return "" + TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) +
" of "
- + FluoYarnConfig.getOracleInstances(config) + " Oracle containers and "
- + TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) + " of "
- + FluoYarnConfig.getWorkerInstances(config) + " Worker containers";
+ return ""
+ + org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
+ org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME)
+ + " of " +
org.apache.fluo.cluster.util.FluoYarnConfig.getOracleInstances(config)
+ + " Oracle containers and "
+ + org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
+ org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME)
+ + " of " +
org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config)
+ + " Worker containers";
}
public void status(FluoConfiguration config, boolean extraInfo) {
@@ -366,15 +369,19 @@ public void status(FluoConfiguration config, boolean
extraInfo) {
if (extraInfo) {
ResourceReport report = getResourceReport(controller, 30000);
Collection<TwillRunResources> resources;
- resources = report.getRunnableResources(OracleRunnable.ORACLE_NAME);
+ resources = report
+
.getRunnableResources(org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME);
System.out.println("\nThe application has " + resources.size() + " of "
- + FluoYarnConfig.getOracleInstances(config) + " desired Oracle
containers:\n");
- TwillUtil.printResources(resources);
+ +
org.apache.fluo.cluster.util.FluoYarnConfig.getOracleInstances(config)
+ + " desired Oracle containers:\n");
+ org.apache.fluo.cluster.yarn.TwillUtil.printResources(resources);
- resources = report.getRunnableResources(WorkerRunnable.WORKER_NAME);
+ resources = report
+
.getRunnableResources(org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME);
System.out.println("\nThe application has " + resources.size() + " of "
- + FluoYarnConfig.getWorkerInstances(config) + " desired Worker
containers:\n");
- TwillUtil.printResources(resources);
+ +
org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config)
+ + " desired Worker containers:\n");
+ org.apache.fluo.cluster.yarn.TwillUtil.printResources(resources);
}
}
}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
index f0d8d733..eda580a7 100644
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
+++
b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -18,10 +18,6 @@
import java.io.File;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.cluster.runnable.OracleRunnable;
-import org.apache.fluo.cluster.runnable.WorkerRunnable;
-import org.apache.fluo.cluster.runner.YarnAppRunner;
-import org.apache.fluo.cluster.util.FluoYarnConfig;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.ResourceSpecification.SizeUnit;
import org.apache.twill.api.TwillApplication;
@@ -71,26 +67,37 @@ private MoreFile addConfigFiles(LocalFileAdder fileAdder) {
@Override
public TwillSpecification configure() {
- final int oracleInstances = FluoYarnConfig.getOracleInstances(config);
- final int oracleMaxMemory = FluoYarnConfig.getOracleMaxMemory(config);
- final int oracleNumCores = FluoYarnConfig.getOracleNumCores(config);
- final int workerInstances = FluoYarnConfig.getWorkerInstances(config);
- final int workerMaxMemory = FluoYarnConfig.getWorkerMaxMemory(config);
- final int workerNumCores = FluoYarnConfig.getWorkerNumCores(config);
+ final int oracleInstances =
+ org.apache.fluo.cluster.util.FluoYarnConfig.getOracleInstances(config);
+ final int oracleMaxMemory =
+ org.apache.fluo.cluster.util.FluoYarnConfig.getOracleMaxMemory(config);
+ final int oracleNumCores =
+ org.apache.fluo.cluster.util.FluoYarnConfig.getOracleNumCores(config);
+ final int workerInstances =
+ org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config);
+ final int workerMaxMemory =
+ org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerMaxMemory(config);
+ final int workerNumCores =
+ org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerNumCores(config);
log.info(
"Configuring Fluo '{}' application with {} Oracle instances and {}
Worker instances "
+ "with following properties:",
config.getApplicationName(), oracleInstances, workerInstances);
- log.info("{} = {}", FluoYarnConfig.ORACLE_MAX_MEMORY_MB_PROP,
oracleMaxMemory);
- log.info("{} = {}", FluoYarnConfig.WORKER_MAX_MEMORY_MB_PROP,
workerMaxMemory);
- log.info("{} = {}", FluoYarnConfig.ORACLE_NUM_CORES_PROP, oracleNumCores);
- log.info("{} = {}", FluoYarnConfig.WORKER_NUM_CORES_PROP, workerNumCores);
+ log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.ORACLE_MAX_MEMORY_MB_PROP,
+ oracleMaxMemory);
+ log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.WORKER_MAX_MEMORY_MB_PROP,
+ workerMaxMemory);
+ log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.ORACLE_NUM_CORES_PROP,
+ oracleNumCores);
+ log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.WORKER_NUM_CORES_PROP,
+ workerNumCores);
// Start building Fluo Twill application
- MoreRunnable moreRunnable = TwillSpecification.Builder.with()
-
.setName(YarnAppRunner.getYarnApplicationName(config.getApplicationName())).withRunnable();
+ MoreRunnable moreRunnable =
+
TwillSpecification.Builder.with().setName(org.apache.fluo.cluster.runner.YarnAppRunner
+
.getYarnApplicationName(config.getApplicationName())).withRunnable();
// Configure Oracle(s)
ResourceSpecification oracleResources =
@@ -98,7 +105,9 @@ public TwillSpecification configure() {
.setMemory(oracleMaxMemory,
SizeUnit.MEGA).setInstances(oracleInstances).build();
LocalFileAdder fileAdder = moreRunnable
- .add(OracleRunnable.ORACLE_NAME, new OracleRunnable(),
oracleResources).withLocalFiles();
+ .add(org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME,
+ new org.apache.fluo.cluster.runnable.OracleRunnable(),
oracleResources)
+ .withLocalFiles();
RunnableSetter runnableSetter = addConfigFiles(fileAdder).apply();
// Configure Worker(s)
@@ -107,11 +116,14 @@ public TwillSpecification configure() {
.setMemory(workerMaxMemory,
SizeUnit.MEGA).setInstances(workerInstances).build();
fileAdder = runnableSetter
- .add(WorkerRunnable.WORKER_NAME, new WorkerRunnable(),
workerResources).withLocalFiles();
+ .add(org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME,
+ new org.apache.fluo.cluster.runnable.WorkerRunnable(),
workerResources)
+ .withLocalFiles();
runnableSetter = addConfigFiles(fileAdder).apply();
// Set runnable order, build and return TwillSpecification
- return runnableSetter.withOrder().begin(OracleRunnable.ORACLE_NAME)
- .nextWhenStarted(WorkerRunnable.WORKER_NAME).build();
+ return runnableSetter.withOrder()
+ .begin(org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME)
+
.nextWhenStarted(org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME).build();
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index f25a1102..158b7d1f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -23,13 +23,12 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
@@ -37,7 +36,9 @@
import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.iterators.GarbageCollectionIterator;
import org.apache.fluo.accumulo.iterators.NotificationIterator;
@@ -48,17 +49,16 @@
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.observer.ObserverUtil;
import org.apache.fluo.core.util.AccumuloUtil;
-import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.core.worker.finder.hash.PartitionManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
@@ -79,7 +79,6 @@
public FluoAdminImpl(FluoConfiguration config) {
this.config = config;
-
appRootDir = ZookeeperUtil.parseRoot(config.getAppZookeepers());
rootCurator = CuratorUtil.newRootFluoCurator(config);
rootCurator.start();
@@ -147,7 +146,8 @@ public void initialize(InitializationOptions opts)
}
try {
- initialize(conn);
+ initializeApplicationInZooKeeper(conn);
+ Map<String, String> ntcProps = initializeApplicationTableProps();
String accumuloJars;
if (!config.getAccumuloJars().trim().isEmpty()) {
@@ -160,15 +160,16 @@ public void initialize(InitializationOptions opts)
if (!accumuloJars.isEmpty()) {
accumuloClasspath = copyJarsToDfs(accumuloJars, "lib/accumulo");
} else {
- accumuloClasspath = config.getAccumuloClasspath().trim();
+ @SuppressWarnings("deprecation")
+ String tmpCP = config.getAccumuloClasspath().trim();
+ accumuloClasspath = tmpCP;
}
if (!accumuloClasspath.isEmpty()) {
String contextName = "fluo-" + config.getApplicationName();
conn.instanceOperations().setProperty(
AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
- conn.tableOperations().setProperty(config.getAccumuloTable(), AccumuloProps.TABLE_CLASSPATH,
- contextName);
+ ntcProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
}
if (config.getObserverJarsUrl().isEmpty() && !config.getObserverInitDir().trim().isEmpty()) {
@@ -176,8 +177,11 @@ public void initialize(InitializationOptions opts)
config.setObserverJarsUrl(observerUrl);
}
- conn.tableOperations().setProperty(config.getAccumuloTable(),
- AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
+ ntcProps.put(AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
+
+ NewTableConfiguration ntc = new NewTableConfiguration().withoutDefaultIterators();
+ ntc.setProperties(ntcProps);
+ conn.tableOperations().create(config.getAccumuloTable(), ntc);
updateSharedConfig();
} catch (NodeExistsException nee) {
@@ -190,7 +194,7 @@ public void initialize(InitializationOptions opts)
}
}
- private void initialize(Connector conn) throws Exception {
+ private void initializeApplicationInZooKeeper(Connector conn) throws Exception {
final String accumuloInstanceName = conn.getInstance().getInstanceName();
final String accumuloInstanceID = conn.getInstance().getInstanceID();
@@ -221,23 +225,49 @@ private void initialize(Connector conn) throws Exception {
CuratorUtil.NodeExistsPolicy.FAIL);
CuratorUtil.putData(curator, ZookeeperPath.ORACLE_GC_TIMESTAMP, new byte[] {'0'},
CuratorUtil.NodeExistsPolicy.FAIL);
+ }
- conn.tableOperations().create(config.getAccumuloTable(), false);
- Map<String, Set<Text>> groups = new HashMap<>();
- groups.put("notify", Collections.singleton(ByteUtil.toText(ColumnConstants.NOTIFY_CF)));
- conn.tableOperations().setLocalityGroups(config.getAccumuloTable(), groups);
-
- IteratorSetting gcIter = new IteratorSetting(10, "gc", GarbageCollectionIterator.class);
- GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
+ private String encodeColumnFamily(Bytes cf) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < cf.length(); i++) {
+ int c = 0xff & cf.byteAt(i);
+ if (c == '\\') {
+ sb.append("\\\\");
+ } else if (c >= 32 && c <= 126 && c != ',') {
+ sb.append((char) c);
+ } else {
+ sb.append("\\x").append(String.format("%02X", c));
+ }
+ }
+ return sb.toString();
+ }
- conn.tableOperations().attachIterator(config.getAccumuloTable(), gcIter,
- EnumSet.of(IteratorUtil.IteratorScope.majc, IteratorUtil.IteratorScope.minc));
+ private Map<String, String> initializeApplicationTableProps() {
+ Map<String, String> ntcProps = new HashMap<>();
+ ntcProps.put(AccumuloProps.TABLE_GROUP_PREFIX + ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
+ encodeColumnFamily(ColumnConstants.NOTIFY_CF));
+ ntcProps.put(AccumuloProps.TABLE_GROUPS_ENABLED, ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME);
+ IteratorSetting gcIter =
+ new IteratorSetting(10, ColumnConstants.GC_CF.toString(), GarbageCollectionIterator.class);
+ GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
// the order relative to gc iter should not matter
- IteratorSetting ntfyIter = new IteratorSetting(11, "ntfy", NotificationIterator.class);
+ IteratorSetting ntfyIter =
+ new IteratorSetting(11, ColumnConstants.NOTIFY_CF.toString(), NotificationIterator.class);
+
+ for (IteratorSetting setting : new IteratorSetting[] {gcIter, ntfyIter}) {
+ for (IteratorScope scope : EnumSet.of(IteratorUtil.IteratorScope.majc,
+ IteratorUtil.IteratorScope.minc)) {
+ String root = String.format("%s%s.%s", AccumuloProps.TABLE_ITERATOR_PREFIX,
+ scope.name().toLowerCase(), setting.getName());
+ for (Entry<String, String> prop : setting.getOptions().entrySet()) {
+ ntcProps.put(root + ".opt." + prop.getKey(), prop.getValue());
+ }
+ ntcProps.put(root, setting.getPriority() + "," + setting.getIteratorClass());
+ }
+ }
- conn.tableOperations().attachIterator(config.getAccumuloTable(), ntfyIter,
- EnumSet.of(IteratorUtil.IteratorScope.majc, IteratorUtil.IteratorScope.minc));
+ return ntcProps;
}
@Override
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index 41723864..ea00c736 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -87,7 +87,6 @@
return groupedLocks;
}
-
private static class LockInfo {
final Bytes prow;
@@ -125,7 +124,7 @@ public LockInfo(Entry<Key, Value> kve) {
* @param startTs The logical start time from the oracle of the transaction
that encountered the
* lock
* @param stats Stats object for the transaction that encountered the lock
- * @param locks List of locks
+ * @param locksKVs List of locks
* @param startTime The wall time that the transaction that encountered the
lock first saw the
* lock
* @return true if all locks passed in were resolved (rolled forward or back)
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
index feba76e6..6c0f27fc 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -30,13 +30,11 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.api.observer.Observer.NotificationType;
-import org.apache.fluo.api.observer.Observer.ObservedColumn;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.observer.ObserverStore;
import org.apache.fluo.core.observer.Observers;
@@ -58,18 +56,20 @@
@Override
public boolean handles(FluoConfiguration config) {
- Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+ Collection<org.apache.fluo.api.config.ObserverSpecification> obsSpecs =
+ config.getObserverSpecifications();
return !obsSpecs.isEmpty();
}
@Override
public void update(CuratorFramework curator, FluoConfiguration config)
throws Exception {
- Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+ Collection<org.apache.fluo.api.config.ObserverSpecification> obsSpecs =
+ config.getObserverSpecifications();
- Map<Column, ObserverSpecification> colObservers = new HashMap<>();
- Map<Column, ObserverSpecification> weakObservers = new HashMap<>();
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> colObservers = new HashMap<>();
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> weakObservers = new HashMap<>();
- for (ObserverSpecification ospec : obsSpecs) {
+ for (org.apache.fluo.api.config.ObserverSpecification ospec : obsSpecs) {
Observer observer;
try {
observer =
Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance();
@@ -92,7 +92,8 @@ public void update(CuratorFramework curator,
FluoConfiguration config) throws Ex
e);
}
- ObservedColumn observedCol = observer.getObservedColumn();
+ org.apache.fluo.api.observer.Observer.ObservedColumn observedCol =
+ observer.getObservedColumn();
if (observedCol.getType() == NotificationType.STRONG) {
colObservers.put(observedCol.getColumn(), ospec);
} else {
@@ -104,8 +105,9 @@ public void update(CuratorFramework curator,
FluoConfiguration config) throws Ex
}
private static void updateObservers(CuratorFramework curator,
- Map<Column, ObserverSpecification> colObservers,
- Map<Column, ObserverSpecification> weakObservers) throws Exception {
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> colObservers,
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> weakObservers)
+ throws Exception {
// TODO check that no workers are running... or make workers watch this
znode
@@ -126,14 +128,16 @@ private static void updateObservers(CuratorFramework
curator,
}
private static void serializeObservers(DataOutputStream dos,
- Map<Column, ObserverSpecification> colObservers) throws IOException {
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> colObservers)
+ throws IOException {
// TODO use a human readable serialized format like json
- Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet();
+ Set<Entry<Column, org.apache.fluo.api.config.ObserverSpecification>> es =
+ colObservers.entrySet();
WritableUtils.writeVInt(dos, colObservers.size());
- for (Entry<Column, ObserverSpecification> entry : es) {
+ for (Entry<Column, org.apache.fluo.api.config.ObserverSpecification> entry : es) {
ColumnUtil.writeColumn(entry.getKey(), dos);
dos.writeUTF(entry.getValue().getClassName());
Map<String, String> params = entry.getValue().getConfiguration().toMap();
@@ -145,8 +149,10 @@ private static void serializeObservers(DataOutputStream
dos,
}
}
- private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers,
- Map<Column, ObserverSpecification> weakObservers) throws IOException {
+ private static byte[] serializeObservers(
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> colObservers,
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> weakObservers)
+ throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DataOutputStream dos = new DataOutputStream(baos)) {
serializeObservers(dos, colObservers);
@@ -157,12 +163,11 @@ private static void serializeObservers(DataOutputStream
dos,
return serializedObservers;
}
+ private static Map<Column, org.apache.fluo.api.config.ObserverSpecification> readObservers(
+ DataInputStream dis) throws IOException {
- private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
- throws IOException {
-
- ImmutableMap.Builder<Column, ObserverSpecification> omapBuilder =
- new ImmutableMap.Builder<Column, ObserverSpecification>();
+ ImmutableMap.Builder<Column, org.apache.fluo.api.config.ObserverSpecification> omapBuilder =
+ new ImmutableMap.Builder<>();
int num = WritableUtils.readVInt(dis);
for (int i = 0; i < num; i++) {
@@ -176,7 +181,8 @@ private static void serializeObservers(DataOutputStream dos,
params.put(k, v);
}
- ObserverSpecification ospec = new ObserverSpecification(clazz, params);
+ org.apache.fluo.api.config.ObserverSpecification ospec =
+ new org.apache.fluo.api.config.ObserverSpecification(clazz, params);
omapBuilder.put(col, ospec);
}
return omapBuilder.build();
@@ -185,8 +191,8 @@ private static void serializeObservers(DataOutputStream dos,
@Override
public RegisteredObservers load(CuratorFramework curator) throws Exception {
- Map<Column, ObserverSpecification> observers;
- Map<Column, ObserverSpecification> weakObservers;
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> observers;
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> weakObservers;
ByteArrayInputStream bais;
try {
@@ -200,7 +206,6 @@ public RegisteredObservers load(CuratorFramework curator)
throws Exception {
observers = readObservers(dis);
weakObservers = readObservers(dis);
-
return new RegisteredObservers() {
@Override
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
index ff3c667d..3965296e 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -22,7 +22,6 @@
import java.util.Map.Entry;
import com.google.common.collect.Iterables;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.core.impl.Environment;
@@ -37,8 +36,8 @@
private Environment env;
Map<Column, List<Observer>> observers = new HashMap<>();
- Map<Column, ObserverSpecification> strongObservers;
- Map<Column, ObserverSpecification> weakObservers;
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> strongObservers;
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> weakObservers;
Map<Column, String> aliases;
private List<Observer> getObserverList(Column col) {
@@ -53,16 +52,17 @@
return observerList;
}
- public ObserversV1(Environment env, Map<Column, ObserverSpecification> strongObservers,
- Map<Column, ObserverSpecification> weakObservers) {
+ public ObserversV1(Environment env,
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> strongObservers,
+ Map<Column, org.apache.fluo.api.config.ObserverSpecification> weakObservers) {
this.env = env;
this.strongObservers = strongObservers;
this.weakObservers = weakObservers;
this.aliases = new HashMap<>();
- for (Entry<Column, ObserverSpecification> e : Iterables.concat(strongObservers.entrySet(),
- weakObservers.entrySet())) {
- ObserverSpecification observerConfig = e.getValue();
+ for (Entry<Column, org.apache.fluo.api.config.ObserverSpecification> e : Iterables
+ .concat(strongObservers.entrySet(), weakObservers.entrySet())) {
+ org.apache.fluo.api.config.ObserverSpecification observerConfig = e.getValue();
try {
String alias =
Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).getSimpleName();
@@ -73,6 +73,7 @@ public ObserversV1(Environment env, Map<Column,
ObserverSpecification> strongObs
}
}
+ @Override
public Observer getObserver(Column col) {
List<Observer> observerList;
@@ -86,7 +87,7 @@ public Observer getObserver(Column col) {
Observer observer = null;
- ObserverSpecification observerConfig = strongObservers.get(col);
+ org.apache.fluo.api.config.ObserverSpecification observerConfig = strongObservers.get(col);
if (observerConfig == null) {
observerConfig = weakObservers.get(col);
}
@@ -112,6 +113,7 @@ public Observer getObserver(Column col) {
return observer;
}
+ @Override
public void returnObserver(Observer observer) {
List<Observer> olist =
getObserverList(observer.getObservedColumn().getColumn());
synchronized (olist) {
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
index c7ac7343..c5731d87 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
@@ -105,8 +105,8 @@ public RegisteredObservers load(CuratorFramework curator)
throws Exception {
String json = new String(data, UTF_8);
JsonObservers jco = new Gson().fromJson(json, JsonObservers.class);
- ImmutableSet.Builder<Column> weakColumnsBuilder = new ImmutableSet.Builder<Column>();
- ImmutableSet.Builder<Column> strongColumnsBuilder = new ImmutableSet.Builder<Column>();
+ ImmutableSet.Builder<Column> weakColumnsBuilder = new ImmutableSet.Builder<>();
+ ImmutableSet.Builder<Column> strongColumnsBuilder = new ImmutableSet.Builder<>();
for (Entry<Column, NotificationType> entry :
jco.getObservedColumns().entrySet()) {
switch (entry.getValue()) {
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
index e33b28bc..22842bc0 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -90,6 +90,11 @@ public boolean equals(Object o) {
}
@Override
+ public int hashCode() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public String toString() {
return String.format(
"workers:%d groups:%d groupSize:%d groupId:%d idInGroup:%d
#tablets:%d", numWorkers,
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
index 40dc5e47..2957f7b1 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -75,6 +75,11 @@ public boolean equals(Object o) {
return false;
}
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException();
+ }
+
public int size() {
return tmap.size() + (lastRange == null ? 0 : 1);
}
diff --git
a/modules/core/src/test/java/org/apache/fluo/core/observer/ObserverStoreTest.java
b/modules/core/src/test/java/org/apache/fluo/core/observer/ObserverStoreTest.java
index fd94a125..c6fe2420 100644
---
a/modules/core/src/test/java/org/apache/fluo/core/observer/ObserverStoreTest.java
+++
b/modules/core/src/test/java/org/apache/fluo/core/observer/ObserverStoreTest.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -16,7 +16,6 @@
package org.apache.fluo.core.observer;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.core.observer.v1.ObserverStoreV1;
import org.apache.fluo.core.observer.v2.ObserverStoreV2;
import org.junit.Assert;
@@ -40,7 +39,7 @@ public void testNewAndOldConfig() {
Assert.assertTrue(ov2.handles(config));
config = new FluoConfiguration();
- config.addObserver(new ObserverSpecification("TestProvider2"));
+ config.addObserver(new org.apache.fluo.api.config.ObserverSpecification("TestProvider2"));
Assert.assertTrue(ov1.handles(config));
Assert.assertFalse(ov2.handles(config));
}
diff --git
a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
index eaecc293..9aeca79f 100644
---
a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
+++
b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
@@ -20,6 +20,7 @@
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.integration.ITBase;
@@ -39,7 +40,7 @@
@Test
public void testTimestampSkippingIterPerformance() throws Exception {
- conn.tableOperations().create("ttsi", false);
+ conn.tableOperations().create("ttsi", new NewTableConfiguration().withoutDefaultIterators());
BatchWriter bw = conn.createBatchWriter("ttsi", new BatchWriterConfig());
Mutation m = new Mutation("r1");
diff --git
a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 4a6a5cfe..02b8edc6 100644
---
a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++
b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -15,7 +15,16 @@
package org.apache.fluo.integration.client;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
@@ -26,6 +35,7 @@
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.integration.ITBaseImpl;
+import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -92,6 +102,23 @@ public void testInitializeConfig() throws Exception {
InitializationOptions opts =
new InitializationOptions().setClearZookeeper(true).setClearTable(true);
admin.initialize(opts);
+
+ // verify locality groups were set on the table
+ Instance inst =
+ new ZooKeeperInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers());
+ Connector conn = inst.getConnector(config.getAccumuloUser(),
+ new PasswordToken(config.getAccumuloPassword()));
+ Map<String, Set<Text>> localityGroups =
+ conn.tableOperations().getLocalityGroups(config.getAccumuloTable());
+ Assert.assertEquals("Unexpected locality group count.", 1, localityGroups.size());
+ Entry<String, Set<Text>> localityGroup = localityGroups.entrySet().iterator().next();
+ Assert.assertEquals("'notify' locality group not found.",
+ ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
+ Assert.assertEquals("'notify' locality group does not contain exactly 1 column family.", 1,
+ localityGroup.getValue().size());
+ Text colFam = localityGroup.getValue().iterator().next();
+ Assert.assertTrue("'notify' locality group does not contain the correct column family.",
+ ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0, colFam.getLength()));
}
try (FluoClientImpl client = new FluoClientImpl(localConfig)) {
@@ -156,4 +183,5 @@ public void testInitializeLongChroot() throws Exception {
Assert.assertNotNull(curator.checkExists().forPath(ZookeeperUtil.parseRoot(zk + longPath)));
}
}
+
}
diff --git
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
index c29de584..e7928684 100644
---
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
+++
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -24,13 +24,11 @@
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.metrics.Counter;
import org.apache.fluo.api.metrics.Meter;
-import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.api.observer.Observer.NotificationType;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
@@ -41,7 +39,7 @@
@Deprecated
public class ObserverConfigIT extends ITBaseMini {
- public static class ConfigurableObserver extends AbstractObserver {
+ public static class ConfigurableObserver extends org.apache.fluo.api.observer.AbstractObserver {
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
@@ -101,17 +99,20 @@ public ObservedColumn getObservedColumn() {
@Override
protected void setupObservers(FluoConfiguration fc) {
- List<ObserverSpecification> observers = new ArrayList<>();
+ List<org.apache.fluo.api.config.ObserverSpecification> observers = new ArrayList<>();
- observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(),
- newMap("observedCol", "fam1:col1:" + NotificationType.STRONG, "outputCQ", "col2")));
+ observers.add(
+ new org.apache.fluo.api.config.ObserverSpecification(ConfigurableObserver.class.getName(),
+ newMap("observedCol", "fam1:col1:" + NotificationType.STRONG, "outputCQ", "col2")));
- observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(),
- newMap("observedCol", "fam1:col2:" + NotificationType.STRONG, "outputCQ", "col3",
- "setWeakNotification", "true")));
+ observers.add(
+ new org.apache.fluo.api.config.ObserverSpecification(ConfigurableObserver.class.getName(),
+ newMap("observedCol", "fam1:col2:" + NotificationType.STRONG, "outputCQ", "col3",
+ "setWeakNotification", "true")));
- observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(),
- newMap("observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4")));
+ observers.add(
+ new org.apache.fluo.api.config.ObserverSpecification(ConfigurableObserver.class.getName(),
+ newMap("observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4")));
fc.addObservers(observers);
}
diff --git
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
index 489b9e0f..24d7cd45 100644
---
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
+++
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -18,7 +18,6 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Consumer;
@@ -26,8 +25,6 @@
import com.google.common.collect.Iterables;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.api.client.Snapshot;
@@ -51,13 +48,6 @@
public class ReadLockFailureIT extends ITBaseImpl {
- private void dumpTable(Consumer<String> out) throws TableNotFoundException {
- Scanner scanner = conn.createScanner(getCurTableName(),
Authorizations.EMPTY);
- for (Entry<Key, Value> entry : scanner) {
- out.accept(FluoFormatter.toString(entry));
- }
- }
-
private Set<String> getDerivedEdges() {
Set<String> derivedEdges = new HashSet<>();
try (Snapshot snap = client.newSnapshot()) {
@@ -98,7 +88,6 @@ private void retryTwice(Consumer<Transaction> retryAction) {
}
}
-
private TransactorNode partiallyCommit(Consumer<TransactionBase> action,
boolean commitPrimary,
boolean closeTransactor) throws Exception {
TransactorNode t2 = new TransactorNode(env);
@@ -223,7 +212,7 @@ private void testParallelScan(boolean closeTransactor)
throws Exception {
}, false, closeTransactor);
retryTwice(tx -> {
- Map<String, Map<Column, String>> ratios = tx.gets(Arrays.asList("user5",
"user6"), crCol);
+ tx.gets(Arrays.asList("user5", "user6"), crCol);
tx.set("user5", crCol, "0.51");
tx.set("user6", crCol, "0.76");
@@ -267,7 +256,6 @@ private void testParallelScanRC(boolean closeTransactor)
throws Exception {
Map<RowColumn, String> ratios = tx.withReadLock()
.gets(Arrays.asList(new RowColumn("user5", crCol), new
RowColumn("user6", crCol)));
-
double cr1 = Double.parseDouble(ratios.get(new RowColumn("user5",
crCol)));
double cr2 = Double.parseDouble(ratios.get(new RowColumn("user6",
crCol)));
@@ -275,8 +263,7 @@ private void testParallelScanRC(boolean closeTransactor)
throws Exception {
}, false, true);
retryTwice(tx -> {
- Map<RowColumn, String> ratios =
- tx.gets(Arrays.asList(new RowColumn("user5", crCol), new
RowColumn("user6", crCol)));
+ tx.gets(Arrays.asList(new RowColumn("user5", crCol), new
RowColumn("user6", crCol)));
tx.set("user5", crCol, "0.51");
tx.set("user6", crCol, "0.76");
diff --git
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index bd6d5ccc..d08730fe 100644
---
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -39,8 +39,7 @@
import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
/**
- * A simple test that added links between nodes in a graph. There is an
observer that updates an
- * index of node degree.
+ * A simple test that added links between nodes in a graph. There is an
observer that updates an index of node degree.
*/
public class WorkerIT extends ITBaseMini {
@Rule
diff --git
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java
index 1605d2b4..9786fd1f 100644
---
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java
+++
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -85,7 +85,7 @@ public void testClientWithoutZKSecret() {
FluoConfiguration conf = new
FluoConfiguration(miniFluo.getClientConfiguration());
conf.setZookeeperSecret("");
try (FluoClient client = FluoFactory.newClient(conf)) {
- Assert.fail("Expected client creation to fail.");
+ Assert.fail("Expected client creation to fail. " + client);
} catch (Exception e) {
boolean sawNoAuth = false;
Throwable throwable = e;
@@ -121,7 +121,6 @@ public void testZKAcls() throws Exception {
ZooKeeper zk = getZookeeper();
-
// Verify oracle gc timestamp is visible w/o a password. The GC iterator
that runs in Accumulo
// tablet servers reads this.
String ts = new String(zk.getData(ZookeeperPath.ORACLE_GC_TIMESTAMP,
false, null),
@@ -151,7 +150,6 @@ public void testZKAcls() throws Exception {
} catch (NoAuthException nae) {
}
-
try {
zk.getChildren(path, false);
} catch (NoAuthException nae) {
diff --git
a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 08353106..900592c4 100644
---
a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++
b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -328,7 +328,6 @@ public void testGetMethods() {
tx.commit();
}
-
Logger logger = Logger.getLogger("fluo.tx");
StringWriter writer = new StringWriter();
diff --git a/pom.xml b/pom.xml
index 05b3afe5..ee9021e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<url>https://github.com/apache/fluo/issues</url>
</issueManagement>
<properties>
- <accumulo.version>1.6.5</accumulo.version>
+ <accumulo.version>1.7.3</accumulo.version>
<curator.version>2.7.1</curator.version>
<dropwizard.version>0.8.1</dropwizard.version>
<findbugs.maxRank>11</findbugs.maxRank>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services