Repository: accumulo Updated Branches: refs/heads/master 97ce2a758 -> 4eadac16b
ACCUMULO-4066 Speed up condition checking for conditional mutations Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/21d2f615 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/21d2f615 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/21d2f615 Branch: refs/heads/master Commit: 21d2f61524de1f26ed88c4baaddee70c01081196 Parents: ff1e003 Author: Keith Turner <ktur...@apache.org> Authored: Wed Jan 27 14:13:40 2016 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Wed Jan 27 19:32:29 2016 -0500 ---------------------------------------------------------------------- .../core/client/impl/CompressedIterators.java | 14 +- .../core/client/impl/ConditionalWriterImpl.java | 44 ++++- .../accumulo/core/iterators/IteratorUtil.java | 93 ++++++----- .../client/impl/ConditionalComparatorTest.java | 53 ++++++ .../tserver/ConditionCheckerContext.java | 164 +++++++++++++++++++ .../org/apache/accumulo/tserver/Tablet.java | 37 ++++- .../apache/accumulo/tserver/TabletServer.java | 95 ++++------- .../accumulo/test/ConditionalWriterIT.java | 137 +++++++++++++++- 8 files changed, 521 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java b/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java index 3fcce90..96d58a7 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java @@ -24,15 +24,12 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ArrayByteSequence; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.thrift.IterInfo; import org.apache.accumulo.core.util.UnsynchronizedBuffer; public class CompressedIterators { private Map<String,Integer> symbolMap; private List<String> symbolTable; - private Map<ByteSequence,IterConfig> cache; public static class IterConfig { public List<IterInfo> ssiList = new ArrayList<IterInfo>(); @@ -46,7 +43,6 @@ public class CompressedIterators { public CompressedIterators(List<String> symbols) { this.symbolTable = symbols; - this.cache = new HashMap<ByteSequence,IterConfig>(); } private int getSymbolID(String symbol) { @@ -85,14 +81,7 @@ public class CompressedIterators { } public IterConfig decompress(ByteBuffer iterators) { - - ByteSequence iterKey = new ArrayByteSequence(iterators); - IterConfig config = cache.get(iterKey); - if (config != null) { - return config; - } - - config = new IterConfig(); + IterConfig config = new IterConfig(); UnsynchronizedBuffer.Reader in = new UnsynchronizedBuffer.Reader(iterators); @@ -120,7 +109,6 @@ public class CompressedIterators { } - cache.put(iterKey, config); return config; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index 730cf08..9030d77 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -19,7 +19,9 @@ package org.apache.accumulo.core.client.impl; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -70,6 +72,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.util.BadArgumentException; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; @@ -375,7 +378,7 @@ class ConditionalWriterImpl implements ConditionalWriter { this.credentials = credentials; this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); - this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads()); + this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads(), new NamingThreadFactory(this.getClass().getSimpleName())); this.locator = TabletLocator.getLocator(instance, new Text(tableId)); this.serverQueues = new HashMap<String,ServerQueue>(); this.tableId = tableId; @@ -741,10 +744,47 @@ class ConditionalWriterImpl implements ConditionalWriter { } } + static class ConditionComparator implements Comparator<Condition> { + + private static final Long MAX = Long.valueOf(Long.MAX_VALUE); + + @Override + public int compare(Condition c1, Condition c2) { + int comp = c1.getFamily().compareTo(c2.getFamily()); + if (comp == 0) { + comp = c1.getQualifier().compareTo(c2.getQualifier()); + if (comp == 0) { + comp = c1.getVisibility().compareTo(c2.getVisibility()); + if (comp == 0) { + Long l1 = c1.getTimestamp(); + Long l2 = c2.getTimestamp(); + if (l1 == null) { + l1 = MAX; + } + + if (l2 == null) { + l2 = MAX; + } + + comp = l2.compareTo(l1); + } + } + } + + return comp; + } + } + + private static final ConditionComparator CONDITION_COMPARATOR = new ConditionComparator(); + private List<TCondition> convertConditions(ConditionalMutation cm, CompressedIterators compressedIters) { List<TCondition> conditions = new ArrayList<TCondition>(cm.getConditions().size()); - for (Condition cond : cm.getConditions()) { + // sort conditions inorder to get better lookup performance. Sort on client side so tserver does not have to do it. + Condition[] ca = cm.getConditions().toArray(new Condition[cm.getConditions().size()]); + Arrays.sort(ca, CONDITION_COMPARATOR); + + for (Condition cond : ca) { long ts = 0; boolean hasTs = false; http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java index 98392f6..6f76d77 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.TreeMap; import org.apache.accumulo.core.client.IteratorSetting; @@ -108,21 +109,26 @@ public class IteratorUtil { return props; } - public static int getMaxPriority(IteratorScope scope, AccumuloConfiguration conf) { - List<IterInfo> iters = new ArrayList<IterInfo>(); - parseIterConf(scope, iters, new HashMap<String,Map<String,String>>(), conf); + public static void mergeIteratorConfig(List<IterInfo> destList, Map<String,Map<String,String>> destOpts, List<IterInfo> tableIters, + Map<String,Map<String,String>> tableOpts, List<IterInfo> ssi, Map<String,Map<String,String>> ssio) { + destList.addAll(tableIters); + destList.addAll(ssi); + Collections.sort(destList, new IterInfoComparator()); - int max = 0; - - for (IterInfo iterInfo : iters) { - if (iterInfo.priority > max) - max = iterInfo.priority; + Set<Entry<String,Map<String,String>>> es = tableOpts.entrySet(); + for (Entry<String,Map<String,String>> entry : es) { + if (entry.getValue() == null) { + destOpts.put(entry.getKey(), null); + } else { + destOpts.put(entry.getKey(), new HashMap<String,String>(entry.getValue())); + } } - return max; + IteratorUtil.mergeOptions(ssio, destOpts); + } - protected static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) { + public static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) { final Property scopeProperty = IteratorScope.getProperty(scope); final String scopePropertyKey = scopeProperty.getKey(); @@ -155,24 +161,6 @@ public class IteratorUtil { Collections.sort(iters, new IterInfoComparator()); } - public static String findIterator(IteratorScope scope, String className, AccumuloConfiguration conf, Map<String,String> opts) { - ArrayList<IterInfo> iters = new ArrayList<IterInfo>(); - Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>(); - - parseIterConf(scope, iters, allOptions, conf); - - for (IterInfo iterInfo : iters) - if (iterInfo.className.equals(className)) { - Map<String,String> tmpOpts = allOptions.get(iterInfo.iterName); - if (tmpOpts != null) { - opts.putAll(tmpOpts); - } - return iterInfo.iterName; - } - - return null; - } - public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope, SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, IteratorEnvironment env) throws IOException { List<IterInfo> emptyList = Collections.emptyList(); @@ -209,6 +197,12 @@ public class IteratorUtil { parseIterConf(scope, iters, allOptions, conf); + mergeOptions(ssio, allOptions); + + return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader, conf.get(Property.TABLE_CLASSPATH)); + } + + private static void mergeOptions(Map<String,Map<String,String>> ssio, Map<String,Map<String,String>> allOptions) { for (Entry<String,Map<String,String>> entry : ssio.entrySet()) { if (entry.getValue() == null) continue; @@ -219,30 +213,35 @@ public class IteratorUtil { options.putAll(entry.getValue()); } } - - return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader, conf.get(Property.TABLE_CLASSPATH)); } - @SuppressWarnings("unchecked") public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source, Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader, String context) throws IOException { + return loadIterators(source, iters, iterOpts, env, useAccumuloClassLoader, context, null); + } + + public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source, + Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader, String context, + Map<String,Class<? extends SortedKeyValueIterator<K,V>>> classCache) throws IOException { // wrap the source in a SynchronizedIterator in case any of the additional configured iterators want to use threading SortedKeyValueIterator<K,V> prev = new SynchronizedIterator<K,V>(source); try { for (IterInfo iterInfo : iters) { - Class<? extends SortedKeyValueIterator<K,V>> clazz; - if (useAccumuloClassLoader) { - if (context != null && !context.equals("")) - clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.getContextManager().loadClass(context, iterInfo.className, - SortedKeyValueIterator.class); - else - clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.loadClass(iterInfo.className, SortedKeyValueIterator.class); + Class<? extends SortedKeyValueIterator<K,V>> clazz = null; + if (classCache != null) { + clazz = classCache.get(iterInfo.className); + + if (clazz == null) { + clazz = loadClass(useAccumuloClassLoader, context, iterInfo); + classCache.put(iterInfo.className, clazz); + } } else { - clazz = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class); + clazz = loadClass(useAccumuloClassLoader, context, iterInfo); } + SortedKeyValueIterator<K,V> skvi = clazz.newInstance(); Map<String,String> options = iterOpts.get(iterInfo.iterName); @@ -266,6 +265,22 @@ public class IteratorUtil { return prev; } + @SuppressWarnings("unchecked") + private static <K extends WritableComparable<?>,V extends Writable> Class<? extends SortedKeyValueIterator<K,V>> loadClass(boolean useAccumuloClassLoader, + String context, IterInfo iterInfo) throws ClassNotFoundException, IOException { + Class<? extends SortedKeyValueIterator<K,V>> clazz; + if (useAccumuloClassLoader) { + if (context != null && !context.equals("")) + clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.getContextManager().loadClass(context, iterInfo.className, + SortedKeyValueIterator.class); + else + clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.loadClass(iterInfo.className, SortedKeyValueIterator.class); + } else { + clazz = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class); + } + return clazz; + } + public static Range maximizeStartKeyTimeStamp(Range range) { Range seekRange = range; http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java new file mode 100644 index 0000000..5413893 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.impl; + +import org.apache.accumulo.core.client.impl.ConditionalWriterImpl.ConditionComparator; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.junit.Assert; +import org.junit.Test; + +public class ConditionalComparatorTest { + @Test + public void testComparator() { + Condition c1 = new Condition("a", "b"); + Condition c2 = new Condition("a", "c"); + Condition c3 = new Condition("b", "c"); + Condition c4 = new Condition("a", "b").setTimestamp(5); + Condition c5 = new Condition("a", "b").setTimestamp(6); + Condition c6 = new Condition("a", "b").setVisibility(new ColumnVisibility("A&B")); + Condition c7 = new Condition("a", "b").setVisibility(new ColumnVisibility("A&C")); + + ConditionComparator comparator = new ConditionComparator(); + + Assert.assertTrue(comparator.compare(c1, c1) == 0); + Assert.assertTrue(comparator.compare(c1, c2) < 0); + Assert.assertTrue(comparator.compare(c2, c1) > 0); + Assert.assertTrue(comparator.compare(c1, c3) < 0); + Assert.assertTrue(comparator.compare(c3, c1) > 0); + Assert.assertTrue(comparator.compare(c1, c4) < 0); + Assert.assertTrue(comparator.compare(c4, c1) > 0); + Assert.assertTrue(comparator.compare(c5, c4) < 0); + Assert.assertTrue(comparator.compare(c4, c5) > 0); + Assert.assertTrue(comparator.compare(c1, c7) < 0); + Assert.assertTrue(comparator.compare(c7, c1) > 0); + Assert.assertTrue(comparator.compare(c6, c7) < 0); + Assert.assertTrue(comparator.compare(c7, c6) > 0); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java new file mode 100644 index 0000000..2e34f38 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.tserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.impl.CompressedIterators; +import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.data.thrift.TCMResult; +import org.apache.accumulo.core.data.thrift.TCMStatus; +import org.apache.accumulo.core.data.thrift.TCondition; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.tserver.data.ServerConditionalMutation; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +public class ConditionCheckerContext { + private CompressedIterators compressedIters; + + private List<IterInfo> tableIters; + private Map<String,Map<String,String>> tableIterOpts; + private TabletIteratorEnvironment tie; + private String context; + private Map<String,Class<? extends SortedKeyValueIterator<Key,Value>>> classCache; + + private static class MergedIterConfig { + List<IterInfo> mergedIters; + Map<String,Map<String,String>> mergedItersOpts; + + MergedIterConfig(List<IterInfo> mergedIters, Map<String,Map<String,String>> mergedItersOpts) { + this.mergedIters = mergedIters; + this.mergedItersOpts = mergedItersOpts; + } + } + + private Map<ByteSequence,MergedIterConfig> mergedIterCache = new HashMap<ByteSequence,MergedIterConfig>(); + + ConditionCheckerContext(CompressedIterators compressedIters, AccumuloConfiguration tableConf) { + this.compressedIters = compressedIters; + + tableIters = new ArrayList<IterInfo>(); + tableIterOpts = new HashMap<String,Map<String,String>>(); + + // parse table iterator config once + IteratorUtil.parseIterConf(IteratorScope.scan, tableIters, tableIterOpts, tableConf); + + context = tableConf.get(Property.TABLE_CLASSPATH); + + classCache = new HashMap<String,Class<? extends SortedKeyValueIterator<Key,Value>>>(); + + tie = new TabletIteratorEnvironment(IteratorScope.scan, tableConf); + } + + SortedKeyValueIterator<Key,Value> buildIterator(SortedKeyValueIterator<Key,Value> systemIter, TCondition tc) throws IOException { + + ArrayByteSequence key = new ArrayByteSequence(tc.iterators); + MergedIterConfig mic = mergedIterCache.get(key); + if (mic == null) { + IterConfig ic = compressedIters.decompress(tc.iterators); + + List<IterInfo> mergedIters = new ArrayList<IterInfo>(tableIters.size() + ic.ssiList.size()); + Map<String,Map<String,String>> mergedItersOpts = new HashMap<String,Map<String,String>>(tableIterOpts.size() + ic.ssio.size()); + + IteratorUtil.mergeIteratorConfig(mergedIters, mergedItersOpts, tableIters, tableIterOpts, ic.ssiList, ic.ssio); + + mic = new MergedIterConfig(mergedIters, mergedItersOpts); + + mergedIterCache.put(key, mic); + } + + return IteratorUtil.loadIterators(systemIter, mic.mergedIters, mic.mergedItersOpts, tie, true, context, classCache); + } + + boolean checkConditions(SortedKeyValueIterator<Key,Value> systemIter, ServerConditionalMutation scm) throws IOException { + boolean add = true; + + for (TCondition tc : scm.getConditions()) { + + Range range; + if (tc.hasTimestamp) + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs()); + else + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv())); + + SortedKeyValueIterator<Key,Value> iter = buildIterator(systemIter, tc); + + ByteSequence cf = new ArrayByteSequence(tc.getCf()); + iter.seek(range, Collections.singleton(cf), true); + Value val = null; + if (iter.hasTop()) { + val = iter.getTopValue(); + } + + if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) { + add = false; + break; + } + } + return add; + } + + public class ConditionChecker { + + private List<ServerConditionalMutation> conditionsToCheck; + private List<ServerConditionalMutation> okMutations; + private List<TCMResult> results; + private boolean checked = false; + + public ConditionChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) { + this.conditionsToCheck = conditionsToCheck; + this.okMutations = okMutations; + this.results = results; + } + + public void check(SortedKeyValueIterator<Key,Value> systemIter) throws IOException { + Preconditions.checkArgument(!checked, "check() method should only be called once"); + checked = true; + + for (ServerConditionalMutation scm : conditionsToCheck) { + if (checkConditions(systemIter, scm)) { + okMutations.add(scm); + } else { + results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED)); + } + } + } + } + + public ConditionChecker newChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) { + return new ConditionChecker(conditionsToCheck, okMutations, results); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java index 3f00c0b..ca4bde6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java @@ -128,6 +128,7 @@ import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.accumulo.tserver.Compactor.CompactionCanceledException; import org.apache.accumulo.tserver.Compactor.CompactionEnv; +import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker; import org.apache.accumulo.tserver.FileManager.ScanFileManager; import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv; @@ -1597,6 +1598,23 @@ public class Tablet { void receive(List<KVEntry> matches) throws IOException; } + public void checkConditions(ConditionChecker checker, Authorizations authorizations, AtomicBoolean iFlag) throws IOException { + + ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, iFlag); + + try { + SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource); + checker.check(iter); + } catch (IOException ioe) { + dataSource.close(true); + throw ioe; + } finally { + // code in finally block because always want + // to return mapfiles, even when exception is thrown + dataSource.close(false); + } + } + class LookupResult { List<Range> unfinishedRanges = new ArrayList<Range>(); long bytesAdded = 0; @@ -1929,18 +1947,29 @@ public class Tablet { private StatsIterator statsIterator; ScanOptions options; + private final boolean loadIters; ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) { expectedDeletionCount = dataSourceDeletions.get(); this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false); this.interruptFlag = interruptFlag; + this.loadIters = true; } ScanDataSource(ScanOptions options) { expectedDeletionCount = dataSourceDeletions.get(); this.options = options; this.interruptFlag = options.interruptFlag; + this.loadIters = true; + } + + ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, AtomicBoolean iFlag) { + expectedDeletionCount = dataSourceDeletions.get(); + Set<Column> emptycols = Collections.emptySet(); + this.options = new ScanOptions(-1, authorizations, defaultLabels, emptycols, null, null, iFlag, false); + this.interruptFlag = iFlag; + this.loadIters = false; } @Override @@ -2035,8 +2064,12 @@ public class Tablet { VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels); - return iterEnv.getTopLevelIterator(IteratorUtil - .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv)); + if (!loadIters) { + return visFilter; + } else { + return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, + iterEnv)); + } } private void close(boolean sawErrors) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 4e3e00b..6023ae3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -67,7 +67,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.CompressedIterators; -import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig; import org.apache.accumulo.core.client.impl.ScannerImpl; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; @@ -99,7 +98,6 @@ import org.apache.accumulo.core.data.thrift.ScanResult; import org.apache.accumulo.core.data.thrift.TCMResult; import org.apache.accumulo.core.data.thrift.TCMStatus; import org.apache.accumulo.core.data.thrift.TColumn; -import org.apache.accumulo.core.data.thrift.TCondition; import org.apache.accumulo.core.data.thrift.TConditionalMutation; import org.apache.accumulo.core.data.thrift.TConditionalSession; import org.apache.accumulo.core.data.thrift.TKey; @@ -199,6 +197,7 @@ import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.accumulo.trace.thrift.TInfo; import org.apache.accumulo.tserver.Compactor.CompactionInfo; +import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker; import org.apache.accumulo.tserver.RowLocks.RowLock; import org.apache.accumulo.tserver.Tablet.CommitSession; import org.apache.accumulo.tserver.Tablet.KVEntry; @@ -1930,79 +1929,57 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu List<String> symbols) throws IOException { Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator(); - CompressedIterators compressedIters = new CompressedIterators(symbols); + final CompressedIterators compressedIters = new CompressedIterators(symbols); + ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters, ServerConfiguration.getTableConfiguration(instance, cs.tableId)); while (iter.hasNext()) { - Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next(); - Tablet tablet = onlineTablets.get(entry.getKey()); + final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next(); + final Tablet tablet = onlineTablets.get(entry.getKey()); if (tablet == null || tablet.isClosed()) { for (ServerConditionalMutation scm : entry.getValue()) results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); iter.remove(); } else { - List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size()); + final List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size()); + final List<TCMResult> resultsSubList = results.subList(results.size(), results.size()); - for (ServerConditionalMutation scm : entry.getValue()) { - if (checkCondition(results, cs, compressedIters, tablet, scm)) - okMutations.add(scm); - } - - entry.setValue(okMutations); - } - - } - } - - boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet, - ServerConditionalMutation scm) throws IOException { - boolean add = true; - - Set<Column> emptyCols = Collections.emptySet(); - - for (TCondition tc : scm.getConditions()) { - - Range range; - if (tc.hasTimestamp) - range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs()); - else - range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv())); - - IterConfig ic = compressedIters.decompress(tc.iterators); - - Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag); + ConditionChecker checker = checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList); + try { + tablet.checkConditions(checker, cs.auths, cs.interruptFlag); - try { - ScanBatch batch = scanner.read(); + if (okMutations.size() > 0) { + entry.setValue(okMutations); + } else { + iter.remove(); + } + } catch (TabletClosedException e) { + // clear anything added while checking conditions. + resultsSubList.clear(); - Value val = null; + for (ServerConditionalMutation scm : entry.getValue()) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } + iter.remove(); + } catch (IterationInterruptedException e) { + // clear anything added while checking conditions. + resultsSubList.clear(); - for (KVEntry entry2 : batch.results) { - val = entry2.getValue(); - break; - } + for (ServerConditionalMutation scm : entry.getValue()) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } + iter.remove(); + } catch (TooManyFilesException e) { + // clear anything added while checking conditions. + resultsSubList.clear(); - if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) { - results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED)); - add = false; - break; + for (ServerConditionalMutation scm : entry.getValue()) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } + iter.remove(); } - - } catch (TabletClosedException e) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; - } catch (IterationInterruptedException iie) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; - } catch (TooManyFilesException tmfe) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; } } - return add; } private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java index 5bfaebf..d32bc43 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -19,9 +19,11 @@ package org.apache.accumulo.test; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -63,7 +65,11 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.LongCombiner.Type; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.security.Authorizations; @@ -90,6 +96,7 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.Test; +import com.google.common.base.Charsets; import com.google.common.collect.Iterables; /** @@ -501,7 +508,135 @@ public class ConditionalWriterIT extends AccumuloClusterIT { Assert.assertEquals(expected, actual); - // TODO test w/ table that has iterators configured + cw.close(); + } + + public static class AddingIterator extends WrappingIterator { + long amount = 0; + + @Override + public Value getTopValue() { + Value val = super.getTopValue(); + long l = Long.parseLong(val.toString()); + String newVal = (l + amount) + ""; + return new Value(newVal.getBytes(Charsets.UTF_8)); + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + this.setSource(source); + amount = Long.parseLong(options.get("amount")); + } + } + + public static class MultiplyingIterator extends WrappingIterator { + long amount = 0; + + @Override + public Value getTopValue() { + Value val = super.getTopValue(); + long l = Long.parseLong(val.toString()); + String newVal = l * amount + ""; + return new Value(newVal.getBytes(Charsets.UTF_8)); + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + this.setSource(source); + amount = Long.parseLong(options.get("amount")); + } + } + + @Test + public void testTableAndConditionIterators() throws Exception { + + // test w/ table that has iterators configured + Connector conn = getConnector(); + String tableName = getUniqueNames(1)[0]; + + IteratorSetting aiConfig1 = new IteratorSetting(30, "AI1", AddingIterator.class); + aiConfig1.addOption("amount", "2"); + IteratorSetting aiConfig2 = new IteratorSetting(35, "MI1", MultiplyingIterator.class); + aiConfig2.addOption("amount", "3"); + IteratorSetting aiConfig3 = new IteratorSetting(40, "AI2", AddingIterator.class); + aiConfig3.addOption("amount", "5"); + + conn.tableOperations().create(tableName); + + BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + + Mutation m = new Mutation("ACCUMULO-1000"); + m.put("count", "comments", "6"); + bw.addMutation(m); + + m = new Mutation("ACCUMULO-1001"); + m.put("count", "comments", "7"); + bw.addMutation(m); + + m = new Mutation("ACCUMULO-1002"); + m.put("count", "comments", "8"); + bw.addMutation(m); + + bw.close(); + + conn.tableOperations().attachIterator(tableName, aiConfig1, EnumSet.of(IteratorScope.scan)); + conn.tableOperations().offline(tableName, true); + conn.tableOperations().online(tableName, true); + + ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); + + ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("8")); + cm6.put("count", "comments", "7"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus()); + + Scanner scanner = conn.createScanner(tableName, new Authorizations()); + scanner.setRange(new Range("ACCUMULO-1000")); + scanner.fetchColumn(new Text("count"), new Text("comments")); + + Entry<Key,Value> entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("9", entry.getValue().toString()); + + ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("27")); + cm7.put("count", "comments", "8"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus()); + + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("10", entry.getValue().toString()); + + ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2, aiConfig3).setValue("35")); + cm8.put("count", "comments", "9"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus()); + + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("11", entry.getValue().toString()); + + ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("33")); + cm3.put("count", "comments", "3"); + + ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count", "comments").setIterators(aiConfig3).setValue("14")); + cm4.put("count", "comments", "3"); + + ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count", "comments").setIterators(aiConfig3).setValue("10")); + cm5.put("count", "comments", "3"); + + Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator()); + Map<String,Status> actual = new HashMap<String,Status>(); + + while (results.hasNext()) { + Result result = results.next(); + String k = new String(result.getMutation().getRow()); + Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k)); + actual.put(k, result.getStatus()); + } + + cw.close(); + + Map<String,Status> expected = new HashMap<String,Status>(); + expected.put("ACCUMULO-1000", Status.ACCEPTED); + expected.put("ACCUMULO-1001", Status.ACCEPTED); + expected.put("ACCUMULO-1002", Status.REJECTED); + + Assert.assertEquals(expected, actual); cw.close(); }