Author: kturner Date: Tue May 7 14:38:27 2013 New Revision: 1479924 URL: http://svn.apache.org/r1479924 Log: ACCUMULO-1337 patch from Pushpinder Heer to add WholeColumnFamilyIterator
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java?rev=1479924&view=auto ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java (added) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java Tue May 7 14:38:27 2013 @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.OptionDescriber; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.hadoop.io.Text; + +/** + * + * The WholeColumnFamilyIterator is designed to provide row/cf-isolation so that queries see mutations as atomic. It does so by grouping row/Column family (as + * key) and rest of data as Value into a single key/value pair, which is returned through the client as an atomic operation. + * + * To regain the original key/value pairs of the row, call the decodeRow function on the key/value pair that this iterator returned. + * + * @since 1.6.0 + */ +public class WholeColumnFamilyIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber { + + private SortedKeyValueIterator<Key,Value> sourceIter; + private Key topKey = null; + private Value topValue = null; + + public WholeColumnFamilyIterator() { + + } + + WholeColumnFamilyIterator(SortedKeyValueIterator<Key,Value> source) { + this.sourceIter = source; + } + + /** + * Decode whole row/cf out of value. decode key value pairs that have been encoded into a single // value + * + * @param rowKey + * the row key to decode + * @param rowValue + * the value to decode + * @return the sorted map. After decoding the flattened data map + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public static final SortedMap<Key,Value> decodeColumnFamily(Key rowKey, Value rowValue) throws IOException { + SortedMap<Key,Value> map = new TreeMap<Key,Value>(); + ByteArrayInputStream in = new ByteArrayInputStream(rowValue.get()); + DataInputStream din = new DataInputStream(in); + int numKeys = din.readInt(); + for (int i = 0; i < numKeys; i++) { + byte[] cq; + byte[] cv; + byte[] valBytes; + // read the col qual + { + int len = din.readInt(); + cq = new byte[len]; + din.read(cq); + } + // read the col visibility + { + int len = din.readInt(); + cv = new byte[len]; + din.read(cv); + } + // read the timestamp + long timestamp = din.readLong(); + // read the value + { + int len = din.readInt(); + valBytes = new byte[len]; + din.read(valBytes); + } + map.put(new Key(rowKey.getRowData().toArray(), rowKey.getColumnFamilyData().toArray(), cq, cv, timestamp, false, false), new Value(valBytes, false)); + } + return map; + } + + /** + * Encode row/cf. Take a stream of keys and values and output a value that encodes everything but their row and column families keys and values must be paired + * one for one + * + * @param keys + * the row keys to encode into value + * @param values + * the value to encode + * @return the value. After encoding keys/values + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public static final Value encodeColumnFamily(List<Key> keys, List<Value> values) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(keys.size()); + for (int i = 0; i < keys.size(); i++) { + Key k = keys.get(i); + Value v = values.get(i); + // write the colqual + { + ByteSequence bs = k.getColumnQualifierData(); + dout.writeInt(bs.length()); + dout.write(bs.getBackingArray(), bs.offset(), bs.length()); + } + // write the column visibility + { + ByteSequence bs = k.getColumnVisibilityData(); + dout.writeInt(bs.length()); + dout.write(bs.getBackingArray(), bs.offset(), bs.length()); + } + // write the timestamp + dout.writeLong(k.getTimestamp()); + // write the value + byte[] valBytes = v.get(); + dout.writeInt(valBytes.length); + dout.write(valBytes); + } + + return new Value(out.toByteArray()); + } + + List<Key> keys = new ArrayList<Key>(); + List<Value> values = new ArrayList<Value>(); + + private void prepKeys() throws IOException { + if (topKey != null) + return; + Text currentRow; + Text currentCf; + + do { + if (sourceIter.hasTop() == false) + return; + currentRow = new Text(sourceIter.getTopKey().getRow()); + currentCf = new Text(sourceIter.getTopKey().getColumnFamily()); + + keys.clear(); + values.clear(); + while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow) && sourceIter.getTopKey().getColumnFamily().equals(currentCf)) { + keys.add(new Key(sourceIter.getTopKey())); + values.add(new Value(sourceIter.getTopValue())); + sourceIter.next(); + } + } while (!filter(currentRow, keys, values)); + + topKey = new Key(currentRow, currentCf); + topValue = encodeColumnFamily(keys, values); + + } + + /** + * + * @param currentRow + * All keys & cf have this in their row portion (do not modify!). + * @param keys + * One key for each key & cf group in the row, ordered as they are given by the source iterator (do not modify!). + * @param values + * One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!). + * @return true if we want to keep the row, false if we want to skip it + */ + protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) { + return true; + } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + if (sourceIter != null) + return new WholeColumnFamilyIterator(sourceIter.deepCopy(env)); + return new WholeColumnFamilyIterator(); + } + + @Override + public Key getTopKey() { + return topKey; + } + + @Override + public Value getTopValue() { + return topValue; + } + + @Override + public boolean hasTop() { + return topKey != null || sourceIter.hasTop(); + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + sourceIter = source; + } + + @Override + public void next() throws IOException { + topKey = null; + topValue = null; + prepKeys(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + topKey = null; + topValue = null; + + Key sk = range.getStartKey(); + + if (sk != null && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0 && sk.getTimestamp() == Long.MAX_VALUE + && !range.isStartKeyInclusive()) { + // assuming that we are seeking using a key previously returned by + // this iterator + // therefore go to the next row/cf + Key followingRowKey = sk.followingKey(PartialKey.ROW_COLFAM); + if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0) + return; + + range = new Range(sk.followingKey(PartialKey.ROW_COLFAM), true, range.getEndKey(), range.isEndKeyInclusive()); + } + + sourceIter.seek(range, columnFamilies, inclusive); + prepKeys(); + } + + @Override + public IteratorOptions describeOptions() { + return new IteratorOptions("wholecolumnfamilyiterator", "WholeColumnFamilyIterator. Group equal row & column family into single row entry.", null, null); + } + + @Override + public boolean validateOptions(Map<String,String> options) { + return true; + } + +} Added: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java?rev=1479924&view=auto ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java (added) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java Tue May 7 14:38:27 2013 @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import junit.framework.TestCase; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.SortedMapIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.hadoop.io.Text; +import org.junit.Assert; + +public class WholeColumnFamilyIteratorTest extends TestCase { + + public void testEmptyStuff() throws IOException { + SortedMap<Key,Value> map = new TreeMap<Key,Value>(); + SortedMap<Key,Value> map2 = new TreeMap<Key,Value>(); + final Map<Text,Boolean> toInclude = new HashMap<Text,Boolean>(); + map.put(new Key(new Text("r1"), new Text("cf1"), new Text("cq1"), new Text("cv1"), 1l), new Value("val1".getBytes())); + map.put(new Key(new Text("r1"), new Text("cf1"), new Text("cq2"), new Text("cv1"), 2l), new Value("val2".getBytes())); + map.put(new Key(new Text("r2"), new Text("cf1"), new Text("cq1"), new Text("cv1"), 3l), new Value("val3".getBytes())); + map.put(new Key(new Text("r2"), new Text("cf2"), new Text("cq1"), new Text("cv1"), 4l), new Value("val4".getBytes())); + map.put(new Key(new Text("r3"), new Text("cf1"), new Text("cq1"), new Text("cv1"), 5l), new Value("val4".getBytes())); + map.put(new Key(new Text("r3"), new Text("cf1"), new Text("cq1"), new Text("cv2"), 6l), new Value("val4".getBytes())); + map.put(new Key(new Text("r4"), new Text("cf1"), new Text("cq1"), new Text("cv1"), 7l), new Value("".getBytes())); + map.put(new Key(new Text("r4"), new Text("cf1"), new Text("cq1"), new Text(""), 8l), new Value("val1".getBytes())); + map.put(new Key(new Text("r4"), new Text("cf1"), new Text(""), new Text("cv1"), 9l), new Value("val1".getBytes())); + map.put(new Key(new Text("r4"), new Text(""), new Text("cq1"), new Text("cv1"), 10l), new Value("val1".getBytes())); + map.put(new Key(new Text(""), new Text("cf1"), new Text("cq1"), new Text("cv1"), 11l), new Value("val1".getBytes())); + boolean b = true; + int trueCount = 0; + for (Key k : map.keySet()) { + if (toInclude.containsKey(k.getRow())) { + if (toInclude.get(k.getRow())) { + map2.put(k, map.get(k)); + } + continue; + } + b = !b; + toInclude.put(k.getRow(), b); + if (b) { + trueCount++; + map2.put(k, map.get(k)); + } + } + SortedMapIterator source = new SortedMapIterator(map); + WholeColumnFamilyIterator iter = new WholeColumnFamilyIterator(source); + SortedMap<Key,Value> resultMap = new TreeMap<Key,Value>(); + iter.seek(new Range(), new ArrayList<ByteSequence>(), false); + int numRows = 0; + while (iter.hasTop()) { + numRows++; + Key rowKey = iter.getTopKey(); + Value rowValue = iter.getTopValue(); + resultMap.putAll(WholeColumnFamilyIterator.decodeColumnFamily(rowKey, rowValue)); + iter.next(); + } + + // we have 7 groups of row key/cf + Assert.assertEquals(7, numRows); + + assertEquals(resultMap, map); + + WholeColumnFamilyIterator iter2 = new WholeColumnFamilyIterator(source) { + @Override + public boolean filter(Text row, List<Key> keys, List<Value> values) { + return toInclude.get(row); + } + }; + resultMap.clear(); + iter2.seek(new Range(), new ArrayList<ByteSequence>(), false); + numRows = 0; + while (iter2.hasTop()) { + numRows++; + Key rowKey = iter2.getTopKey(); + Value rowValue = iter2.getTopValue(); + resultMap.putAll(WholeColumnFamilyIterator.decodeColumnFamily(rowKey, rowValue)); + iter2.next(); + } + assertTrue(numRows == trueCount); + assertEquals(resultMap, map2); + } + + private void pkv(SortedMap<Key,Value> map, String row, String cf, String cq, String cv, long ts, String val) { + map.put(new Key(new Text(row), new Text(cf), new Text(cq), new Text(cv), ts), new Value(val.getBytes())); + } + + public void testContinue() throws Exception { + SortedMap<Key,Value> map1 = new TreeMap<Key,Value>(); + pkv(map1, "row1", "cf1", "cq1", "cv1", 5, "foo"); + pkv(map1, "row1", "cf1", "cq2", "cv1", 6, "bar"); + + SortedMap<Key,Value> map2 = new TreeMap<Key,Value>(); + pkv(map2, "row2", "cf1", "cq1", "cv1", 5, "foo"); + pkv(map2, "row2", "cf1", "cq2", "cv1", 6, "bar"); + + SortedMap<Key,Value> map3 = new TreeMap<Key,Value>(); + pkv(map3, "row3", "cf1", "cq1", "cv1", 5, "foo"); + pkv(map3, "row3", "cf1", "cq2", "cv1", 6, "bar"); + + SortedMap<Key,Value> map = new TreeMap<Key,Value>(); + map.putAll(map1); + map.putAll(map2); + map.putAll(map3); + + SortedMapIterator source = new SortedMapIterator(map); + WholeColumnFamilyIterator iter = new WholeColumnFamilyIterator(source); + + Range range = new Range(new Text("row1"), true, new Text("row2"), true); + iter.seek(range, new ArrayList<ByteSequence>(), false); + + assertTrue(iter.hasTop()); + assertEquals(map1, WholeColumnFamilyIterator.decodeColumnFamily(iter.getTopKey(), iter.getTopValue())); + + // simulate something continuing using the last key from the iterator + // this is what client and server code will do + range = new Range(iter.getTopKey(), false, range.getEndKey(), range.isEndKeyInclusive()); + iter.seek(range, new ArrayList<ByteSequence>(), false); + + assertTrue(iter.hasTop()); + assertEquals(map2, WholeColumnFamilyIterator.decodeColumnFamily(iter.getTopKey(), iter.getTopValue())); + + iter.next(); + + assertFalse(iter.hasTop()); + + } + + public void testBug1() throws Exception { + SortedMap<Key,Value> map1 = new TreeMap<Key,Value>(); + pkv(map1, "row1", "cf1", "cq1", "cv1", 5, "foo"); + pkv(map1, "row1", "cf1", "cq2", "cv1", 6, "bar"); + + SortedMap<Key,Value> map2 = new TreeMap<Key,Value>(); + pkv(map2, "row2", "cf1", "cq1", "cv1", 5, "foo"); + + SortedMap<Key,Value> map = new TreeMap<Key,Value>(); + map.putAll(map1); + map.putAll(map2); + + MultiIterator source = new MultiIterator(Collections.singletonList((SortedKeyValueIterator<Key,Value>) new SortedMapIterator(map)), new Range(null, true, + new Text("row1"), true)); + WholeColumnFamilyIterator iter = new WholeColumnFamilyIterator(source); + + Range range = new Range(new Text("row1"), true, new Text("row2"), true); + iter.seek(range, new ArrayList<ByteSequence>(), false); + + assertTrue(iter.hasTop()); + assertEquals(map1, WholeColumnFamilyIterator.decodeColumnFamily(iter.getTopKey(), iter.getTopValue())); + + // simulate something continuing using the last key from the iterator + // this is what client and server code will do + range = new Range(iter.getTopKey(), false, range.getEndKey(), range.isEndKeyInclusive()); + iter.seek(range, new ArrayList<ByteSequence>(), false); + + assertFalse(iter.hasTop()); + + } + +}