Closes #768: made observer and application configuration consistent
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/556df6db Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/556df6db Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/556df6db Branch: refs/heads/master Commit: 556df6dbbe5460caa1372e6962301b3415e88eef Parents: 2b1c456 Author: Keith Turner <ke...@deenlo.com> Authored: Thu Sep 15 18:13:17 2016 -0400 Committer: Keith Turner <ke...@deenlo.com> Committed: Thu Sep 15 18:13:17 2016 -0400 ---------------------------------------------------------------------- .../fluo/api/config/FluoConfiguration.java | 28 +++--- .../fluo/api/config/ObserverConfiguration.java | 78 ----------------- .../fluo/api/config/ObserverSpecification.java | 90 ++++++++++++++++++++ .../fluo/api/config/SimpleConfiguration.java | 28 ++++++ .../org/apache/fluo/api/observer/Observer.java | 6 +- .../fluo/api/config/FluoConfigurationTest.java | 37 ++++---- .../apache/fluo/core/client/FluoAdminImpl.java | 30 +++---- .../org/apache/fluo/core/client/Operations.java | 20 ++--- .../org/apache/fluo/core/impl/Environment.java | 20 ++--- .../fluo/core/worker/ObserverContext.java | 16 ++-- .../org/apache/fluo/core/worker/Observers.java | 6 +- .../org/apache/fluo/integration/ITBase.java | 4 +- .../fluo/integration/impl/AppConfigIT.java | 6 +- .../fluo/integration/impl/CollisionIT.java | 6 +- .../apache/fluo/integration/impl/FailureIT.java | 8 +- .../apache/fluo/integration/impl/FluoIT.java | 6 +- .../fluo/integration/impl/NotificationGcIT.java | 6 +- .../fluo/integration/impl/ObserverConfigIT.java | 33 +++---- .../integration/impl/SelfNotificationIT.java | 6 +- .../integration/impl/StrongNotificationIT.java | 6 +- .../integration/impl/WeakNotificationIT.java | 6 +- .../impl/WeakNotificationOverlapIT.java | 6 +- .../apache/fluo/integration/impl/WorkerIT.java | 6 +- .../org/apache/fluo/integration/log/LogIT.java | 8 +- 24 files changed, 254 insertions(+), 
212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java index 562221d..801a025 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java +++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java @@ -113,6 +113,10 @@ public class FluoConfiguration extends SimpleConfiguration { super(propertiesFile); } + public FluoConfiguration(Map<String, String> map) { + super(map); + } + public void validate() { // keep in alphabetical order getAccumuloClasspath(); @@ -126,7 +130,7 @@ public class FluoConfiguration extends SimpleConfiguration { getClientRetryTimeout(); getLoaderQueueSize(); getLoaderThreads(); - getObserverConfig(); + getObserverSpecifications(); getTransactionRollbackTime(); getWorkerThreads(); getZookeeperTimeout(); @@ -275,9 +279,9 @@ public class FluoConfiguration extends SimpleConfiguration { return getPositiveInt(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT); } - public List<ObserverConfiguration> getObserverConfig() { + public List<ObserverSpecification> getObserverSpecifications() { - List<ObserverConfiguration> configList = new ArrayList<>(); + List<ObserverSpecification> configList = new ArrayList<>(); Iterator<String> iter = getKeys(); while (iter.hasNext()) { @@ -298,7 +302,6 @@ public class FluoConfiguration extends SimpleConfiguration { if (className.isEmpty()) { throw new IllegalArgumentException(key + " has empty class name: " + className); } - ObserverConfiguration observerConfig = new ObserverConfiguration(className); Map<String, String> params = new HashMap<>(); for (int i = 1; 
i < fields.length; i++) { @@ -313,8 +316,9 @@ public class FluoConfiguration extends SimpleConfiguration { } params.put(kv[0], kv[1]); } - observerConfig.setParameters(params); - configList.add(observerConfig); + + ObserverSpecification observerSpecification = new ObserverSpecification(className, params); + configList.add(observerSpecification); } } return configList; @@ -338,8 +342,8 @@ public class FluoConfiguration extends SimpleConfiguration { return max + 1; } - private void addObserver(ObserverConfiguration oconf, int next) { - Map<String, String> params = oconf.getParameters(); + private void addObserver(ObserverSpecification oconf, int next) { + Map<String, String> params = oconf.getConfiguration().toMap(); StringBuilder paramString = new StringBuilder(); for (java.util.Map.Entry<String, String> pentry : params.entrySet()) { paramString.append(','); @@ -351,10 +355,10 @@ public class FluoConfiguration extends SimpleConfiguration { } /** - * Adds an {@link ObserverConfiguration} to the configuration using a unique integer prefix thats + * Adds an {@link ObserverSpecification} to the configuration using a unique integer prefix thats * not currently in use. */ - public FluoConfiguration addObserver(ObserverConfiguration oconf) { + public FluoConfiguration addObserver(ObserverSpecification oconf) { int next = getNextObserverId(); addObserver(oconf, next); return this; @@ -363,9 +367,9 @@ public class FluoConfiguration extends SimpleConfiguration { /** * Adds multiple observers using unique integer prefixes for each. 
*/ - public FluoConfiguration addObservers(Iterable<ObserverConfiguration> observers) { + public FluoConfiguration addObservers(Iterable<ObserverSpecification> observers) { int next = getNextObserverId(); - for (ObserverConfiguration oconf : observers) { + for (ObserverSpecification oconf : observers) { addObserver(oconf, next++); } return this; http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/api/src/main/java/org/apache/fluo/api/config/ObserverConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverConfiguration.java deleted file mode 100644 index 9c2efb9..0000000 --- a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverConfiguration.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.api.config; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.fluo.api.observer.AbstractObserver; -import org.apache.fluo.api.observer.Observer; - -/** - * Used to pass configuration to an {@link AbstractObserver}. Set using - * {@link FluoConfiguration#addObserver(ObserverConfiguration)} - * - * @since 1.0.0 - */ -public class ObserverConfiguration { - private final String className; - private Map<String, String> params = Collections.emptyMap(); - - public ObserverConfiguration(String className) { - this.className = className; - } - - public String getClassName() { - return className; - } - - /** - * For configuration that is the same across multiple observers consider using Application - * configuration. - * - * @param params Parameters that should be passed to - * {@link Observer#init(org.apache.fluo.api.observer.Observer.Context)} - * - * @see FluoConfiguration#getAppConfiguration() - */ - public ObserverConfiguration setParameters(Map<String, String> params) { - if (params == null) { - throw new IllegalArgumentException(); - } - this.params = new HashMap<>(params); - return this; - } - - public Map<String, String> getParameters() { - return Collections.unmodifiableMap(params); - } - - @Override - public int hashCode() { - return className.hashCode() + 17 * params.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof ObserverConfiguration) { - ObserverConfiguration ooc = (ObserverConfiguration) o; - return className.equals(ooc.className) && params.equals(ooc.params); - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java 
b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java new file mode 100644 index 0000000..76829c9 --- /dev/null +++ b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.api.config; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import org.apache.fluo.api.observer.Observer; + +/** + * This class encapsulates the information needed to setup an Observer. This class is used by + * {@link FluoConfiguration#addObserver(ObserverSpecification)}. + * + * @since 1.0.0 + */ +public class ObserverSpecification { + private final String className; + private final Map<String, String> configMap; + private SimpleConfiguration config = null; + + /** + * @param className The name of a class that implements {@link Observer} + */ + public ObserverSpecification(String className) { + this.className = className; + this.configMap = Collections.emptyMap(); + } + + /** + * @param className The name of a class that implements {@link Observer} + * @param observerConfig Per observer configuration thats specific to this observer. 
For + * configuration thats the same across multiple observers, consider using + * {@link FluoConfiguration#getAppConfiguration()} + */ + public ObserverSpecification(String className, SimpleConfiguration observerConfig) { + this.className = className; + this.configMap = observerConfig.toMap(); + } + + /** + * @param className The name of a class that implements {@link Observer} + * @param observerConfig Per observer configuration thats specific to this observer. For + * configuration thats the same across multiple observers, consider using + * {@link FluoConfiguration#getAppConfiguration()} + */ + public ObserverSpecification(String className, Map<String, String> observerConfig) { + this.className = className; + this.configMap = ImmutableMap.copyOf(observerConfig); + } + + public String getClassName() { + return className; + } + + public SimpleConfiguration getConfiguration() { + if (config == null) { + config = new SimpleConfiguration(configMap); + } + return config; + } + + @Override + public int hashCode() { + return className.hashCode() + 17 * configMap.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof ObserverSpecification) { + ObserverSpecification ooc = (ObserverSpecification) o; + return className.equals(ooc.className) && configMap.equals(ooc.configMap); + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java index 4492679..a92dc1d 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java +++ b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java @@ -18,8 +18,14 @@ package org.apache.fluo.api.config; import 
java.io.File; import java.io.InputStream; import java.io.OutputStream; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -96,6 +102,13 @@ public class SimpleConfiguration { } } + public SimpleConfiguration(Map<String, String> map) { + this(); + for (Entry<String, String> entry : map.entrySet()) { + internalConfig.setProperty(entry.getKey(), entry.getValue()); + } + } + public void clear() { internalConfig.clear(); } @@ -211,4 +224,19 @@ public class SimpleConfiguration { public String toString() { return ConfigurationUtils.toString(internalConfig); } + + /** + * @return An immutable copy of this configurations as a map. Changes to this after toMap() is + * called will not be reflected in the map. 
+ */ + public Map<String, String> toMap() { + Builder<String, String> builder = ImmutableMap.builder(); + Iterator<String> ki = getKeys(); + while (ki.hasNext()) { + String k = (String) ki.next(); + builder.put(k, getRawString(k)); + } + + return builder.build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java index 21f6b50..6835b99 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java +++ b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java @@ -15,8 +15,6 @@ package org.apache.fluo.api.observer; -import java.util.Map; - import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.config.SimpleConfiguration; @@ -75,9 +73,9 @@ public interface Observer { SimpleConfiguration getAppConfiguration(); /** - * @return The parameters configured for this observer + * @return The per observer configuration that's specific to this observer. 
*/ - Map<String, String> getParameters(); + SimpleConfiguration getObserverConfiguration(); } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java index 8f708f5..f7d7c97 100644 --- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java +++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java @@ -208,7 +208,7 @@ public class FluoConfigurationTest { FluoConfiguration config = new FluoConfiguration(); try { config.setProperty(FluoConfiguration.OBSERVER_PREFIX + "1", value); - config.getObserverConfig(); + config.getObserverSpecifications(); Assert.fail(); } catch (IllegalArgumentException e) { } @@ -219,12 +219,12 @@ public class FluoConfigurationTest { FluoConfiguration config = new FluoConfiguration(); config.setProperty(FluoConfiguration.OBSERVER_PREFIX + "1", "com.foo.Observer2,configKey1=configVal1,configKey2=configVal2"); - List<ObserverConfiguration> ocList = config.getObserverConfig(); + List<ObserverSpecification> ocList = config.getObserverSpecifications(); Assert.assertEquals(1, ocList.size()); Assert.assertEquals("com.foo.Observer2", ocList.get(0).getClassName()); - Assert.assertEquals("configVal1", ocList.get(0).getParameters().get("configKey1")); - Assert.assertEquals("configVal2", ocList.get(0).getParameters().get("configKey2")); - Assert.assertEquals(2, ocList.get(0).getParameters().size()); + Assert.assertEquals("configVal1", ocList.get(0).getConfiguration().getString("configKey1")); + Assert.assertEquals("configVal2", ocList.get(0).getConfiguration().getString("configKey2")); + Assert.assertEquals(2, ocList.get(0).getConfiguration().toMap().size()); 
assertIAE("class,bad,input"); assertIAE("index,check,,phrasecount.PhraseCounter"); assertIAE(""); @@ -236,43 +236,46 @@ public class FluoConfigurationTest { config = new FluoConfiguration(); config.setProperty(FluoConfiguration.OBSERVER_PREFIX + "1", "Class,"); - ocList = config.getObserverConfig(); + ocList = config.getObserverSpecifications(); Assert.assertEquals(1, ocList.size()); Assert.assertEquals("Class", ocList.get(0).getClassName()); - Assert.assertEquals(0, ocList.get(0).getParameters().size()); + Assert.assertEquals(0, ocList.get(0).getConfiguration().toMap().size()); } @Test public void testObserverConfig2() { FluoConfiguration config = new FluoConfiguration(); - ObserverConfiguration oc1 = - new ObserverConfiguration("foo.class1").setParameters(ImmutableMap.of("param1", "a")); - ObserverConfiguration oc2 = - new ObserverConfiguration("foo.class2").setParameters(ImmutableMap.of("param1", "b")); - ObserverConfiguration oc3 = new ObserverConfiguration("foo.class3"); + ObserverSpecification oc1 = + new ObserverSpecification("foo.class1", ImmutableMap.of("param1", "a")); + ObserverSpecification oc2 = + new ObserverSpecification("foo.class2", ImmutableMap.of("param1", "b")); + ObserverSpecification oc3 = new ObserverSpecification("foo.class3"); config.addObserver(oc1); config.addObserver(oc2); config.addObserver(oc3); - Assert.assertEquals(ImmutableSet.of(oc1, oc2, oc3), new HashSet<>(config.getObserverConfig())); + Assert.assertEquals(ImmutableSet.of(oc1, oc2, oc3), + new HashSet<>(config.getObserverSpecifications())); config.clearObservers(); - Assert.assertEquals(0, config.getObserverConfig().size()); + Assert.assertEquals(0, config.getObserverSpecifications().size()); config.addObservers(Arrays.asList(oc1, oc2)); - Assert.assertEquals(ImmutableSet.of(oc1, oc2), new HashSet<>(config.getObserverConfig())); + Assert.assertEquals(ImmutableSet.of(oc1, oc2), + new HashSet<>(config.getObserverSpecifications())); config.addObserver(oc3); - 
Assert.assertEquals(ImmutableSet.of(oc1, oc2, oc3), new HashSet<>(config.getObserverConfig())); + Assert.assertEquals(ImmutableSet.of(oc1, oc2, oc3), + new HashSet<>(config.getObserverSpecifications())); config.clearObservers(); - Assert.assertEquals(0, config.getObserverConfig().size()); + Assert.assertEquals(0, config.getObserverSpecifications().size()); } private void assertSetNameIAE(String name) { http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java index aa6a217..8d059a5 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java @@ -38,7 +38,8 @@ import org.apache.fluo.accumulo.util.ZookeeperPath; import org.apache.fluo.accumulo.util.ZookeeperUtil; import org.apache.fluo.api.client.FluoAdmin; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.exceptions.FluoException; import org.apache.fluo.api.observer.Observer; @@ -209,38 +210,37 @@ public class FluoAdminImpl implements FluoAdmin { logger.info("Setting up observers using app config: {}", config.getAppConfiguration()); - Map<Column, ObserverConfiguration> colObservers = new HashMap<>(); - Map<Column, ObserverConfiguration> weakObservers = new HashMap<>(); - for (ObserverConfiguration observerConfig : config.getObserverConfig()) { + Map<Column, ObserverSpecification> colObservers = new HashMap<>(); + Map<Column, ObserverSpecification> weakObservers = 
new HashMap<>(); + for (ObserverSpecification ospec : config.getObserverSpecifications()) { Observer observer; try { - observer = - Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).newInstance(); + observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance(); } catch (ClassNotFoundException e1) { - throw new FluoException("Observer class '" + observerConfig.getClassName() + "' was not " + throw new FluoException("Observer class '" + ospec.getClassName() + "' was not " + "found. Check for class name misspellings or failure to include " + "the observer jar.", e1); } catch (InstantiationException | IllegalAccessException e2) { - throw new FluoException("Observer class '" + observerConfig.getClassName() + throw new FluoException("Observer class '" + ospec.getClassName() + "' could not be created.", e2); } + SimpleConfiguration oc = ospec.getConfiguration(); logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(), - observerConfig.getParameters()); + oc.toMap()); try { - observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), - observerConfig.getParameters())); + observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc)); } catch (Exception e) { - throw new FluoException("Observer '" + observerConfig.getClassName() - + "' could not be initialized", e); + throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized", + e); } ObservedColumn observedCol = observer.getObservedColumn(); if (observedCol.getType() == NotificationType.STRONG) { - colObservers.put(observedCol.getColumn(), observerConfig); + colObservers.put(observedCol.getColumn(), ospec); } else { - weakObservers.put(observedCol.getColumn(), observerConfig); + weakObservers.put(observedCol.getColumn(), ospec); } } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java b/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java index 92a6481..16851a6 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java +++ b/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java @@ -25,7 +25,7 @@ import java.util.Set; import org.apache.curator.framework.CuratorFramework; import org.apache.fluo.accumulo.util.ZookeeperPath; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Column; import org.apache.fluo.core.util.ColumnUtil; import org.apache.fluo.core.util.CuratorUtil; @@ -56,8 +56,8 @@ public class Operations { } public static void updateObservers(CuratorFramework curator, - Map<Column, ObserverConfiguration> colObservers, - Map<Column, ObserverConfiguration> weakObservers) throws Exception { + Map<Column, ObserverSpecification> colObservers, + Map<Column, ObserverSpecification> weakObservers) throws Exception { // TODO check that no workers are running... 
or make workers watch this znode @@ -78,27 +78,27 @@ public class Operations { } private static void serializeObservers(DataOutputStream dos, - Map<Column, ObserverConfiguration> colObservers) throws IOException { + Map<Column, ObserverSpecification> colObservers) throws IOException { // TODO use a human readable serialized format like json - Set<Entry<Column, ObserverConfiguration>> es = colObservers.entrySet(); + Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet(); WritableUtils.writeVInt(dos, colObservers.size()); - for (Entry<Column, ObserverConfiguration> entry : es) { + for (Entry<Column, ObserverSpecification> entry : es) { ColumnUtil.writeColumn(entry.getKey(), dos); dos.writeUTF(entry.getValue().getClassName()); - Map<String, String> params = entry.getValue().getParameters(); + Map<String, String> params = entry.getValue().getConfiguration().toMap(); WritableUtils.writeVInt(dos, params.size()); - for (Entry<String, String> pentry : entry.getValue().getParameters().entrySet()) { + for (Entry<String, String> pentry : params.entrySet()) { dos.writeUTF(pentry.getKey()); dos.writeUTF(pentry.getValue()); } } } - private static byte[] serializeObservers(Map<Column, ObserverConfiguration> colObservers, - Map<Column, ObserverConfiguration> weakObservers) throws IOException { + private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers, + Map<Column, ObserverSpecification> weakObservers) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (DataOutputStream dos = new DataOutputStream(baos)) { serializeObservers(dos, colObservers); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java 
index 94dfcff..e1bf8fc 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java @@ -35,7 +35,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.curator.framework.CuratorFramework; import org.apache.fluo.accumulo.util.ZookeeperPath; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.data.Column; import org.apache.fluo.core.metrics.MetricNames; @@ -52,8 +52,8 @@ public class Environment implements AutoCloseable { private String table; private Authorizations auths = new Authorizations(); private String accumuloInstance; - private Map<Column, ObserverConfiguration> observers; - private Map<Column, ObserverConfiguration> weakObservers; + private Map<Column, ObserverSpecification> observers; + private Map<Column, ObserverSpecification> weakObservers; private Set<Column> allObserversColumns; private Connector conn; private String accumuloInstanceID; @@ -161,10 +161,10 @@ public class Environment implements AutoCloseable { } } - private static Map<Column, ObserverConfiguration> readObservers(DataInputStream dis) + private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis) throws IOException { - HashMap<Column, ObserverConfiguration> omap = new HashMap<>(); + HashMap<Column, ObserverSpecification> omap = new HashMap<>(); int num = WritableUtils.readVInt(dis); for (int i = 0; i < num; i++) { @@ -178,10 +178,8 @@ public class Environment implements AutoCloseable { params.put(k, v); } - ObserverConfiguration observerConfig = new ObserverConfiguration(clazz); - observerConfig.setParameters(params); - - omap.put(col, observerConfig); + ObserverSpecification ospec = new ObserverSpecification(clazz, params); + omap.put(col, 
ospec); } return omap; @@ -215,11 +213,11 @@ public class Environment implements AutoCloseable { return fluoApplicationID; } - public Map<Column, ObserverConfiguration> getObservers() { + public Map<Column, ObserverSpecification> getObservers() { return observers; } - public Map<Column, ObserverConfiguration> getWeakObservers() { + public Map<Column, ObserverSpecification> getWeakObservers() { return weakObservers; } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java b/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java index 9d9c5a7..ab0eedc 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java +++ b/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java @@ -15,28 +15,26 @@ package org.apache.fluo.core.worker; -import java.util.Map; - import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.observer.Observer; import org.apache.fluo.core.impl.Environment; public class ObserverContext implements Observer.Context { - private final Map<String, String> params; + private final SimpleConfiguration observerConfig; private final SimpleConfiguration appConfig; private final Environment env; - public ObserverContext(SimpleConfiguration appConfig, Map<String, String> params) { + public ObserverContext(SimpleConfiguration appConfig, SimpleConfiguration observerConfig) { this.appConfig = appConfig; - this.params = params; + this.observerConfig = observerConfig; this.env = null; } - public ObserverContext(Environment env, Map<String, String> parameters) { + public ObserverContext(Environment env, SimpleConfiguration observerConfig) { this.env = env; this.appConfig = null; - this.params = parameters; + this.observerConfig = 
observerConfig; } @Override @@ -48,8 +46,8 @@ public class ObserverContext implements Observer.Context { } @Override - public Map<String, String> getParameters() { - return params; + public SimpleConfiguration getObserverConfiguration() { + return observerConfig; } } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java index 9b0dd28..285a69a 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java +++ b/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java @@ -20,7 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.Observer; import org.apache.fluo.core.impl.Environment; @@ -63,7 +63,7 @@ public class Observers implements AutoCloseable { Observer observer = null; - ObserverConfiguration observerConfig = env.getObservers().get(col); + ObserverSpecification observerConfig = env.getObservers().get(col); if (observerConfig == null) { observerConfig = env.getWeakObservers().get(col); } @@ -72,7 +72,7 @@ public class Observers implements AutoCloseable { try { observer = Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).newInstance(); - observer.init(new ObserverContext(env, observerConfig.getParameters())); + observer.init(new ObserverContext(env, observerConfig.getConfiguration())); } catch (RuntimeException e) { throw e; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java 
---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java index 5e2a7f9..8f6b425 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java @@ -30,7 +30,7 @@ import org.apache.commons.io.FileUtils; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.RowColumnValue; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -80,7 +80,7 @@ public class ITBase { conn = miniAccumulo.getConnector(USER, new PasswordToken(PASSWORD)); } - protected List<ObserverConfiguration> getObservers() { + protected List<ObserverSpecification> getObservers() { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java index 74f40e4..22f8ea2 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java @@ -25,7 +25,7 @@ import org.apache.fluo.api.client.Loader; import org.apache.fluo.api.client.LoaderExecutor; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import 
org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; @@ -42,8 +42,8 @@ public class AppConfigIT extends ITBaseMini { } @Override - protected List<ObserverConfiguration> getObservers() { - return Collections.singletonList(new ObserverConfiguration(TestObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new ObserverSpecification(TestObserver.class.getName())); } @Test http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java index 4af93d1..8639fbc 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java @@ -32,7 +32,7 @@ import org.apache.fluo.api.client.LoaderExecutor; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; @@ -95,8 +95,8 @@ public class CollisionIT extends ITBaseMini { } @Override - protected List<ObserverConfiguration> getObservers() { - return Collections.singletonList(new ObserverConfiguration(TotalObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new 
ObserverSpecification(TotalObserver.class.getName())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java index 38a857e..6b4d279 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java @@ -32,7 +32,7 @@ import org.apache.fluo.accumulo.util.LongUtil; import org.apache.fluo.accumulo.util.ZookeeperUtil; import org.apache.fluo.accumulo.values.DelLockValue; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.exceptions.CommitException; @@ -73,9 +73,9 @@ public class FailureIT extends ITBaseImpl { } @Override - protected List<ObserverConfiguration> getObservers() { - List<ObserverConfiguration> observed = new ArrayList<>(); - observed.add(new ObserverConfiguration(NullObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + List<ObserverSpecification> observed = new ArrayList<>(); + observed.add(new ObserverSpecification(NullObserver.class.getName())); return observed; } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java 
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java index 086f1d9..e67d4d9 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java @@ -31,7 +31,7 @@ import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumnValue; @@ -66,8 +66,8 @@ public class FluoIT extends ITBaseImpl { } @Override - protected List<org.apache.fluo.api.config.ObserverConfiguration> getObservers() { - return Arrays.asList(new ObserverConfiguration(BalanceObserver.class.getName())); + protected List<org.apache.fluo.api.config.ObserverSpecification> getObservers() { + return Arrays.asList(new ObserverSpecification(BalanceObserver.class.getName())); }; @Test http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java index 190839f..b496955 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java @@ -24,7 +24,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import 
org.apache.fluo.accumulo.util.ColumnConstants; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Column; import org.apache.fluo.core.impl.Environment; import org.apache.fluo.core.impl.Notification; @@ -60,8 +60,8 @@ public class NotificationGcIT extends ITBaseMini { } @Override - protected List<ObserverConfiguration> getObservers() { - return Collections.singletonList(new ObserverConfiguration(SimpleObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName())); } @Test http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java index 9766f9b..c97d000 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java @@ -23,7 +23,8 @@ import java.util.Map; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; @@ -42,13 +43,15 @@ public class ObserverConfigIT extends ITBaseMini { @Override public void init(Context context) { - String ocTokens[] = 
context.getParameters().get("observedCol").split(":"); + SimpleConfiguration myConfig = context.getObserverConfiguration(); + + String ocTokens[] = myConfig.getString("observedCol").split(":"); observedColumn = new ObservedColumn(new Column(ocTokens[0], ocTokens[1]), NotificationType.valueOf(ocTokens[2])); - outputCQ = Bytes.of(context.getParameters().get("outputCQ")); - String swn = context.getParameters().get("setWeakNotification"); - if (swn != null && swn.equals("true")) { + outputCQ = Bytes.of(myConfig.getString("outputCQ")); + String swn = myConfig.getString("setWeakNotification", "false"); + if (swn.equals("true")) { setWeakNotification = true; } } @@ -83,20 +86,18 @@ public class ObserverConfigIT extends ITBaseMini { } @Override - protected List<ObserverConfiguration> getObservers() { - List<ObserverConfiguration> observers = new ArrayList<>(); + protected List<ObserverSpecification> getObservers() { + List<ObserverSpecification> observers = new ArrayList<>(); - observers.add(new ObserverConfiguration(ConfigurableObserver.class.getName()) - .setParameters(newMap("observedCol", "fam1:col1:" + NotificationType.STRONG, "outputCQ", - "col2"))); + observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap( + "observedCol", "fam1:col1:" + NotificationType.STRONG, "outputCQ", "col2"))); - observers.add(new ObserverConfiguration(ConfigurableObserver.class.getName()) - .setParameters(newMap("observedCol", "fam1:col2:" + NotificationType.STRONG, "outputCQ", - "col3", "setWeakNotification", "true"))); + observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap( + "observedCol", "fam1:col2:" + NotificationType.STRONG, "outputCQ", "col3", + "setWeakNotification", "true"))); - observers.add(new ObserverConfiguration(ConfigurableObserver.class.getName()) - .setParameters(newMap("observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", - "col4"))); + observers.add(new 
ObserverSpecification(ConfigurableObserver.class.getName(), newMap( + "observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4"))); return observers; } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java index 4015339..fe4b0d6 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; @@ -40,8 +40,8 @@ public class SelfNotificationIT extends ITBaseMini { private static final Column EXPORT_COUNT_COL = new Column("export", "count"); @Override - protected List<ObserverConfiguration> getObservers() { - return Collections.singletonList(new ObserverConfiguration(ExportingObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new ObserverSpecification(ExportingObserver.class.getName())); } private static List<String> exports = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java ---------------------------------------------------------------------- diff --git 
a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java index 702d1b1..1d065e1 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java @@ -20,7 +20,7 @@ import java.util.List; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; @@ -54,8 +54,8 @@ public class StrongNotificationIT extends ITBaseMini { } @Override - protected List<ObserverConfiguration> getObservers() { - return Collections.singletonList(new ObserverConfiguration(SimpleObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName())); } @Test http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java index de9ab41..22c6632 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.client.TransactionBase; import 
org.apache.fluo.api.client.scanner.CellScanner; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumnValue; @@ -69,8 +69,8 @@ public class WeakNotificationIT extends ITBaseMini { } @Override - protected List<ObserverConfiguration> getObservers() { - return Collections.singletonList(new ObserverConfiguration(SimpleObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName())); } @Test http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java index f424398..13bb7a7 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java @@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; @@ -69,8 +69,8 @@ public class WeakNotificationOverlapIT extends ITBaseImpl { @Override - protected List<ObserverConfiguration> getObservers() { - return 
Collections.singletonList(new ObserverConfiguration(TotalObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new ObserverSpecification(TotalObserver.class.getName())); } @Test http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java index d1d4522..2406b82 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java @@ -22,7 +22,7 @@ import com.google.common.collect.Iterables; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; @@ -51,8 +51,8 @@ public class WorkerIT extends ITBaseMini { private static Column observedColumn = LAST_UPDATE; @Override - protected List<ObserverConfiguration> getObservers() { - return Collections.singletonList(new ObserverConfiguration(DegreeIndexer.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Collections.singletonList(new ObserverSpecification(DegreeIndexer.class.getName())); } public static class DegreeIndexer implements Observer { http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/556df6db/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java ---------------------------------------------------------------------- diff --git 
a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java index bb8ed69..c2167bc 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java @@ -25,7 +25,7 @@ import org.apache.fluo.api.client.LoaderExecutor; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ObserverConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; @@ -119,9 +119,9 @@ public class LogIT extends ITBaseMini { } @Override - protected List<ObserverConfiguration> getObservers() { - return Arrays.asList(new ObserverConfiguration(TestObserver.class.getName()), - new ObserverConfiguration(BinaryObserver.class.getName())); + protected List<ObserverSpecification> getObservers() { + return Arrays.asList(new ObserverSpecification(TestObserver.class.getName()), + new ObserverSpecification(BinaryObserver.class.getName())); } @Test