Thanks _so_ much for taking the time to write this up, Marc! It's a good example.

One note, you probably want to use an priority greater than 20 for the IteratorSetting. The VersioningIterator is set on Accumulo tables by default at priority 20. In most cases, you'd want to see the state of the table _after_ the VersioningIterator filters things.

Marc Reichman wrote:
This is working very well, thanks Russ!

For anyone ever stuck in this predicament, using the WholeRowIterator, I
was able to get the same Iterator<Map.Entry<Key,Value>> that I can get
similarly to the AccumuloRowInputFormat as follows:

...

IteratorSetting iteratorSetting =newIteratorSetting(1, WholeRowIterator.class);
AccumuloInputFormat.addIterator(job, iteratorSetting);

// setup RDD
JavaPairRDD<Key, Value>  pairRDD = 
sparkContext.newAPIHadoopRDD(job.getConfiguration(),
         AccumuloInputFormat.class,
         Key.class, Value.class);

JavaRDD<List<MyResult>>  result = pairRDD
         .map(newFunction<Tuple2<Key, Value>, List<MyResult>>() {
             @Override
             publicList<MyResult>  call(Tuple2<Key, Value>  
keyValueTuple2)throwsException {
                 SortedMap<Key, Value>  wholeRow = 
WholeRowIterator.decodeRow(keyValueTuple2._1, keyValueTuple2._2);
                 MyObject o = getMyObject(wholeRow.entrySet().iterator());
                 *...*
             }
         });

Previously, I was doing this approach, which required an additional stage of 
Spark calculations as well as a shuffle phase, and wasn't nearly as quick, and 
also needed a helper class (AccumuloRowMapEntry, very basic Map.Entry 
implementation):

JavaRDD<List<MyResult>>  result = pairRDD
         .mapToPair(newPairFunction<Tuple2<Key, Value>, Text, Map.Entry<Key, 
Value>>() {
             @Override
             publicTuple2<Text, Map.Entry<Key, Value>>  call(Tuple2<Key, Value> 
 keyValueTuple2)throwsException {
                 return newTuple2<Text, Map.Entry<Key, 
Value>>(keyValueTuple2._1.getRow(),newAccumuloRowMapEntry(keyValueTuple2._1, 
keyValueTuple2._2));
             }
         })
         .groupByKey()
         .map(newFunction<Tuple2<Text, Iterable<Map.Entry<Key, Value>>>, 
List<MyResult>>() {
             @Override
             publicList<MyResult>  call(Tuple2<Text, Iterable<Map.Entry<Key, 
Value>>>  textIterableTuple2)throwsException {
                 MyObject o = getMyObject(textIterableTuple2._2.iterator());
                 *...*
             }
         });


Thanks again for all the help.

Marc


On Mon, May 4, 2015 at 12:23 PM, Russ Weeks <[email protected]
<mailto:[email protected]>> wrote:

    Yeah, exactly. When you put the WholeRowIterator on the scan,
    instead of seeing all the Key,Value pairs that make up a row you'll
    see a single Key,Value pair. The only part of the Key that matters
    is the row id. The Value is an encoded map of the Key,Value pairs
    that constitute the row. Call the static method
    WholeRowIterator.decodeRow to get at this map.

    The decoded Keys have all the CF, CQ, timestamp and visibility data
    populated. I'm not sure if they have the row ID populated; either
    way, they all belong to the same row that was present in the
    original Key.

    -Russ


    On Mon, May 4, 2015 at 9:51 AM, Marc Reichman
    <[email protected] <mailto:[email protected]>>
    wrote:

        Hi Russ,

        How exactly would this work regarding column qualifiers, etc, as
        those are part of the key? I apologize but I'm not as familiar
        with the WholeRowIterator use model, does it consolidate based
        on the rowkey, and then return some Key+Value "value" which has
        all the original information serialized?

        My rows aren't gigantic but they can occasionally get into the
        10s of MB.

        On Mon, May 4, 2015 at 11:22 AM, Russ Weeks
        <[email protected] <mailto:[email protected]>> wrote:

            Hi, Marc,

            If your rows are small you can use the WholeRowIterator to
            get all the values with the key in one consuming function.
            If your rows are big but you know up-front that you'll only
            need a small part of each row, you could put a filter in
            front of the WholeRowIterator.

            I expect there's a performance hit (I haven't done any
            benchmarks myself) because of the extra
            serialization/deserialization but it's a very convenient way
            of working with Rows in Spark.

            Regards,
            -Russ

            On Mon, May 4, 2015 at 8:46 AM, Marc Reichman
            <[email protected]
            <mailto:[email protected]>> wrote:

                Has anyone done any testing with Spark and
                AccumuloRowInputFormat? I have no problem doing this for
                AccumuloInputFormat:

                JavaPairRDD<Key, Value>  pairRDD = 
sparkContext.newAPIHadoopRDD(job.getConfiguration(),
                         AccumuloInputFormat.class,
                         Key.class, Value.class);

                But I run into a snag trying to do a similar thing:

                JavaPairRDD<Text, PeekingIterator<Map.Entry<Key, Value>>>  
pairRDD = sparkContext.newAPIHadoopRDD(job.getConfiguration(),
                         AccumuloRowInputFormat.class,
                         Text.class, PeekingIterator.class);

                The compilation error is (big, sorry):

                Error:(141, 97) java: method newAPIHadoopRDD in class 
org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
                   required: 
org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
                   found: 
org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat>,java.lang.Class<org.apache.hadoop.io.Text>,java.lang.Class<org.apache.accumulo.core.util.PeekingIterator>
                   reason: inferred type does not conform to declared bound(s)
                     inferred: 
org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat
                     bound(s): 
org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.Text,org.apache.accumulo.core.util.PeekingIterator>

                I've tried a few things, the signature of the function is:

                public<K, V, Fextendsorg.apache.hadoop.mapreduce.InputFormat<K, V>>  JavaPairRDD<K, 
V>  newAPIHadoopRDD(Configuration conf, Class<F>  fClass, Class<K>  kClass, Class<V>  vClass)

                I guess it's having trouble with the format extending 
InputFormatBase with its own additional generic parameters (the Map.Entry 
inside PeekingIterator).

                This may be an issue to chase with Spark vs Accumulo, unless 
something can be tweaked on the Accumulo side or I could wrap the InputFormat 
with my own somehow.

                Accumulo 1.6.1, Spark 1.3.1, JDK 7u71.

                Stopping short of this, can anyone think of a good way to use 
AccumuloInputFormat to get what I'm getting from the Row version in a 
performant way? It doesn't necessarily have to be an iterator approach, but I'd 
need all my values with the key in one consuming function. I'm looking into 
ways to do it in spark functions but trying to avoid any major performance hits.

                Thanks,

                Marc

                p.s. The summit was absolutely great, thank you all for having 
it!





Reply via email to