Thanks Josh. I will make that change to be safe, though in these experiments I use a maxversions of 1 anyway.
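Concretely, for anyone finding this in the archives, the change is just the priority in the IteratorSetting from the snippet quoted below; anything above the VersioningIterator's default slot of 20 should do, something like:

// priority 21 runs after the default VersioningIterator at 20
IteratorSetting iteratorSetting = new IteratorSetting(21, WholeRowIterator.class);
AccumuloInputFormat.addIterator(job, iteratorSetting);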
I look forward to seeing the definitive Accumulo + Spark guide some day; glad to help where I can if there are specific things to fill in.

On Mon, May 4, 2015 at 2:40 PM, Josh Elser <[email protected]> wrote:

> Thanks _so_ much for taking the time to write this up, Marc! It's a good
> example.
>
> One note: you probably want to use a priority greater than 20 for the
> IteratorSetting. The VersioningIterator is set on Accumulo tables by
> default at priority 20. In most cases, you'd want to see the state of the
> table _after_ the VersioningIterator filters things.
>
> Marc Reichman wrote:
>
>> This is working very well, thanks Russ!
>>
>> For anyone ever stuck in this predicament: using the WholeRowIterator, I
>> was able to get the same Iterator<Map.Entry<Key, Value>> that I would get
>> from the AccumuloRowInputFormat, as follows:
>>
>> ...
>>
>> IteratorSetting iteratorSetting = new IteratorSetting(1, WholeRowIterator.class);
>> AccumuloInputFormat.addIterator(job, iteratorSetting);
>>
>> // set up RDD
>> JavaPairRDD<Key, Value> pairRDD = sparkContext.newAPIHadoopRDD(
>>         job.getConfiguration(),
>>         AccumuloInputFormat.class,
>>         Key.class, Value.class);
>>
>> JavaRDD<List<MyResult>> result = pairRDD
>>     .map(new Function<Tuple2<Key, Value>, List<MyResult>>() {
>>         @Override
>>         public List<MyResult> call(Tuple2<Key, Value> keyValueTuple2) throws Exception {
>>             SortedMap<Key, Value> wholeRow =
>>                     WholeRowIterator.decodeRow(keyValueTuple2._1, keyValueTuple2._2);
>>             MyObject o = getMyObject(wholeRow.entrySet().iterator());
>>             ...
>>         }
>>     });
>>
>> Previously, I was using the approach below, which required an additional
>> stage of Spark calculations as well as a shuffle phase, wasn't nearly as
>> quick, and also needed a helper class (AccumuloRowMapEntry, a very basic
>> Map.Entry implementation):
>>
>> JavaRDD<List<MyResult>> result = pairRDD
>>     .mapToPair(new PairFunction<Tuple2<Key, Value>, Text, Map.Entry<Key, Value>>() {
>>         @Override
>>         public Tuple2<Text, Map.Entry<Key, Value>> call(Tuple2<Key, Value> keyValueTuple2)
>>                 throws Exception {
>>             return new Tuple2<Text, Map.Entry<Key, Value>>(
>>                     keyValueTuple2._1.getRow(),
>>                     new AccumuloRowMapEntry(keyValueTuple2._1, keyValueTuple2._2));
>>         }
>>     })
>>     .groupByKey()
>>     .map(new Function<Tuple2<Text, Iterable<Map.Entry<Key, Value>>>, List<MyResult>>() {
>>         @Override
>>         public List<MyResult> call(Tuple2<Text, Iterable<Map.Entry<Key, Value>>> textIterableTuple2)
>>                 throws Exception {
>>             MyObject o = getMyObject(textIterableTuple2._2.iterator());
>>             ...
>>         }
>>     });
>>
>> Thanks again for all the help.
>>
>> Marc
>>
>> On Mon, May 4, 2015 at 12:23 PM, Russ Weeks <[email protected]> wrote:
>>
>> Yeah, exactly. When you put the WholeRowIterator on the scan, instead of
>> seeing all the Key,Value pairs that make up a row, you'll see a single
>> Key,Value pair. The only part of the Key that matters is the row id. The
>> Value is an encoded map of the Key,Value pairs that constitute the row.
>> Call the static method WholeRowIterator.decodeRow to get at this map.
>>
>> The decoded Keys have all the CF, CQ, timestamp and visibility data
>> populated. I'm not sure if they have the row ID populated; either way,
>> they all belong to the same row that was present in the original Key.
>>
>> -Russ
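(For anyone skimming the archive: a minimal sketch of the decodeRow pattern Russ describes, consistent with Marc's snippet above. The dumpRow name and the println are placeholders, not code from this thread.)

import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.hadoop.io.Text;
import scala.Tuple2;

public class RowDump {
    // One Tuple2 from a WholeRowIterator scan encodes an entire row;
    // decodeRow recovers the row's constituent Key/Value pairs.
    static void dumpRow(Tuple2<Key, Value> encodedRow) throws IOException {
        SortedMap<Key, Value> row = WholeRowIterator.decodeRow(encodedRow._1, encodedRow._2);
        for (Map.Entry<Key, Value> entry : row.entrySet()) {
            Text cf = entry.getKey().getColumnFamily();    // CF, CQ, timestamp and
            Text cq = entry.getKey().getColumnQualifier(); // visibility are populated
            System.out.println(cf + ":" + cq + " -> " + entry.getValue());
        }
    }
}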
>> On Mon, May 4, 2015 at 9:51 AM, Marc Reichman <[email protected]> wrote:
>>
>> Hi Russ,
>>
>> How exactly would this work regarding column qualifiers, etc., since those
>> are part of the key? I apologize, but I'm not as familiar with the
>> WholeRowIterator use model. Does it consolidate based on the row key and
>> then return some Key+Value "value" which has all the original information
>> serialized?
>>
>> My rows aren't gigantic, but they can occasionally get into the 10s of MB.
>>
>> On Mon, May 4, 2015 at 11:22 AM, Russ Weeks <[email protected]> wrote:
>>
>> Hi, Marc,
>>
>> If your rows are small, you can use the WholeRowIterator to get all the
>> values with the key in one consuming function. If your rows are big but
>> you know up-front that you'll only need a small part of each row, you
>> could put a filter in front of the WholeRowIterator.
>>
>> I expect there's a performance hit (I haven't done any benchmarks myself)
>> because of the extra serialization/deserialization, but it's a very
>> convenient way of working with rows in Spark.
>>
>> Regards,
>> -Russ
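(A sketch of Russ's "filter in front of the WholeRowIterator" idea, assuming a hypothetical "meta" column family is all that's needed. RegExFilter is one filter that works here; the lower-priority iterator runs first.)

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;

// The RegExFilter at priority 21 discards everything outside the "meta"
// column family before the WholeRowIterator at priority 22 encodes each row.
// "job" is the same Hadoop Job configured in the snippets above.
IteratorSetting filter = new IteratorSetting(21, "cfFilter", RegExFilter.class);
RegExFilter.setRegexs(filter, null, "meta", null, null, false);
AccumuloInputFormat.addIterator(job, filter);
AccumuloInputFormat.addIterator(job, new IteratorSetting(22, WholeRowIterator.class));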
>> On Mon, May 4, 2015 at 8:46 AM, Marc Reichman <[email protected]> wrote:
>>
>> Has anyone done any testing with Spark and AccumuloRowInputFormat? I have
>> no problem doing this for AccumuloInputFormat:
>>
>> JavaPairRDD<Key, Value> pairRDD = sparkContext.newAPIHadoopRDD(
>>         job.getConfiguration(),
>>         AccumuloInputFormat.class,
>>         Key.class, Value.class);
>>
>> But I run into a snag trying to do a similar thing:
>>
>> JavaPairRDD<Text, PeekingIterator<Map.Entry<Key, Value>>> pairRDD =
>>         sparkContext.newAPIHadoopRDD(
>>                 job.getConfiguration(),
>>                 AccumuloRowInputFormat.class,
>>                 Text.class, PeekingIterator.class);
>>
>> The compilation error is (big, sorry):
>>
>> Error:(141, 97) java: method newAPIHadoopRDD in class
>> org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
>>   required: org.apache.hadoop.conf.Configuration, java.lang.Class<F>,
>>       java.lang.Class<K>, java.lang.Class<V>
>>   found: org.apache.hadoop.conf.Configuration,
>>       java.lang.Class<org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat>,
>>       java.lang.Class<org.apache.hadoop.io.Text>,
>>       java.lang.Class<org.apache.accumulo.core.util.PeekingIterator>
>>   reason: inferred type does not conform to declared bound(s)
>>     inferred: org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat
>>     bound(s): org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.Text,
>>         org.apache.accumulo.core.util.PeekingIterator>
>>
>> I've tried a few things; the signature of the function is:
>>
>> public <K, V, F extends org.apache.hadoop.mapreduce.InputFormat<K, V>>
>>         JavaPairRDD<K, V> newAPIHadoopRDD(Configuration conf, Class<F> fClass,
>>                 Class<K> kClass, Class<V> vClass)
>>
>> I guess it's having trouble with the format extending InputFormatBase with
>> its own additional generic parameters (the Map.Entry inside PeekingIterator).
>>
>> This may be an issue to chase with Spark vs. Accumulo, unless something can
>> be tweaked on the Accumulo side, or I could wrap the InputFormat with my own
>> somehow.
>>
>> Accumulo 1.6.1, Spark 1.3.1, JDK 7u71.
>>
>> Stopping short of this, can anyone think of a good way to use
>> AccumuloInputFormat to get what I'm getting from the Row version in a
>> performant way? It doesn't necessarily have to be an iterator approach, but
>> I'd need all my values with the key in one consuming function. I'm looking
>> into ways to do it in Spark functions, but trying to avoid any major
>> performance hits.
>>
>> Thanks,
>>
>> Marc
>>
>> p.s. The summit was absolutely great, thank you all for having it!
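(For completeness: one way to satisfy the compiler here is to erase the value-side generics with unchecked casts, at the cost of raw types. This is an untested sketch, and the WholeRowIterator approach earlier in the thread avoids the problem entirely.)

import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RawRowRdd {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static JavaPairRDD<Text, PeekingIterator> rowRdd(JavaSparkContext sc, Configuration conf) {
        // The double cast drops AccumuloRowInputFormat's Map.Entry type parameter
        // so inference accepts the raw PeekingIterator.class token; values come
        // back as raw PeekingIterator and must be cast at the point of use.
        Class<InputFormat<Text, PeekingIterator>> fmt =
                (Class<InputFormat<Text, PeekingIterator>>) (Class<?>) AccumuloRowInputFormat.class;
        return sc.newAPIHadoopRDD(conf, fmt, Text.class, PeekingIterator.class);
    }
}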
