Tal - that would be great to have open sourced if you can do it!

On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
<[email protected]> wrote:
> Hi Tal,
>
> Thanks for the info. I will try it out and see how it goes.
>
>
> On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <[email protected]> wrote:
>>
>> Hi Pulasthi,
>>
>> I couldn't make it work, so what I ended up doing was implementing 3 Java
>> classes - one that extends org.apache.hadoop.mapreduce.InputFormat, another
>> that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that extends
>> org.apache.hadoop.mapreduce.RecordReader - and using them to load data from
>> Cassandra into an RDD (using the newAPIHadoopRDD() method). It works great!
>> I'm cleaning up the code a bit and will upload it to GitHub as open source
>> (after the summit).
>>
> That's great, looking forward to checking it out after you publish it on GitHub :).
>
>
> Thanks,
> Pulasthi
>>
>> I hope this helps for now,
>>
>> Tal
>>
>>
>> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>> <[email protected]> wrote:
>>>
>>> Hi Tal,
>>>
>>> I also tried doing this by converting the Scala sample into Java, but I am
>>> getting a compile-time error. Below is the code:
>>>
>>>  JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>>
>>>         // Build the job configuration with ConfigHelper provided by Cassandra
>>>         Job job = null;
>>>         try {
>>>             job = new Job();
>>>         } catch (IOException e) {
>>>             e.printStackTrace();  // To change body of catch statement use File | Settings | File Templates.
>>>         }
>>>         job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>
>>>         String host = args[1];
>>>         String port = args[2];
>>>
>>>         ConfigHelper.setInputInitialAddress(job.getConfiguration(),
>>> host);
>>>         ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
>>> host);
>>>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>>         ConfigHelper.setInputColumnFamily(job.getConfiguration(),
>>> "casDemo", "Words");
>>>         ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
>>> "casDemo", "WordCount");
>>>
>>>         SlicePredicate predicate = new SlicePredicate();
>>>         SliceRange sliceRange = new SliceRange();
>>>         sliceRange.setStart(new byte[0]);
>>>         sliceRange.setFinish(new byte[0]);
>>>         predicate.setSlice_range(sliceRange);
>>>         ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>>> predicate);
>>>
>>>         ConfigHelper.setInputPartitioner(job.getConfiguration(),
>>> "Murmur3Partitioner");
>>>         ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>>> "Murmur3Partitioner");
>>>
>>>         // Make a new Hadoop RDD
>>>         final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>>> new TreeMap<ByteBuffer, IColumn>();
>>>         JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>>> sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>>
>>>
>>> I also tried the code segment that you provided, but I keep getting
>>> the following error:
>>>
>>> java:
>>> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>>> of ? extends java.util.SortedMap>)
>>>
>>> Did you encounter this? If so, any help would be appreciated.
>>>
>>> Best Regards,
>>> Pulasthi
>>>
>>>
>>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
>>>> I've translated the Scala example - CassandraTest.scala - to Java, but I
>>>> keep getting the following exception:
>>>>
>>>> Exception in thread "main" java.io.IOException: Could not get input splits
>>>> at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>>> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>>> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>>> at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>>> at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>>> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>>> at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>>> at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>>> ... 9 more
>>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>> at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>>> at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>>> at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>>> at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>> at java.lang.Thread.run(Thread.java:695)
>>>> Caused by: org.apache.thrift.transport.TTransportException
>>>> at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>> at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>>> at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>> at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>>> at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>>> at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>>> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>>> at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>>> at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>>> at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>>> ... 8 more
>>>>
>>>>
>>>> This is the relevant code portion:
>>>>
>>>> Job job = new Job();
>>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>> String host = "<server>";
>>>> String port = "9160";
>>>>
>>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>>>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>>
>>>> SlicePredicate predicate = new SlicePredicate();
>>>> SliceRange sliceRange = new SliceRange();
>>>> sliceRange.setStart(new byte[0]);
>>>> sliceRange.setFinish(new byte[0]);
>>>> predicate.setSlice_range(sliceRange);
>>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>>>
>>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>>>     ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>>>         ByteBuffer.class, b.getClass());
>>>>
>>>>
>>>> I would appreciate any input you may have.
>>>>
>>>> Thanks!
>>>>
>>>> Tal
>>>
>>>
>>>
>>>
>>> --
>>> Pulasthi Supun
>>> Undergraduate
>>> Dpt of Computer Science & Engineering
>>> University of Moratuwa
>>> Blog : http://pulasthisupun.blogspot.com/
>>> Git hub profile: https://github.com/pulasthi
>>
>>
>
>
>
> --
> Pulasthi Supun
> Undergraduate
> Dpt of Computer Science & Engineering
> University of Moratuwa
> Blog : http://pulasthisupun.blogspot.com/
> Git hub profile: https://github.com/pulasthi
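
A note on the compile-time error quoted in the thread ("newAPIHadoopRDD ... cannot be applied to ... Class<capture#92 of ? extends java.util.SortedMap>"): calling getClass() on a SortedMap variable yields a Class<? extends SortedMap>, which the compiler cannot match against a format class whose value type is a fully parameterized SortedMap. The sketch below reproduces the same shape using only the standard library. Format, DemoFormat, load and valueClass are hypothetical stand-ins (not Spark, Hadoop or Cassandra classes), String stands in for IColumn, and the bound on F is an assumption about how the real method ties F to K and V. A commonly used workaround is an unchecked double cast to build the exact class token; this is not confirmed in the thread as the fix, only as one way to get past this kind of type mismatch.

```java
import java.nio.ByteBuffer;
import java.util.SortedMap;

class ClassTokenDemo {

    // Hypothetical stand-in for Hadoop's InputFormat<K, V>; not the real class.
    interface Format<K, V> {}

    // Hypothetical stand-in for ColumnFamilyInputFormat: its value type is a
    // fully parameterized SortedMap (String replaces IColumn here).
    static final class DemoFormat
            implements Format<ByteBuffer, SortedMap<ByteBuffer, String>> {}

    // Same shape as the signature in the compiler message:
    // <K,V,F> newAPIHadoopRDD(Configuration, Class<F>, Class<K>, Class<V>),
    // assuming F is bounded by the format type for K and V.
    static <K, V, F extends Format<K, V>> String load(
            Class<F> format, Class<K> key, Class<V> value) {
        return format.getSimpleName()
                + "<" + key.getSimpleName() + ", " + value.getSimpleName() + ">";
    }

    // Builds a Class<SortedMap<ByteBuffer, String>> token. Generics are erased
    // at runtime, so the object is just SortedMap.class; the unchecked double
    // cast only changes the static type the compiler sees.
    @SuppressWarnings("unchecked")
    static Class<SortedMap<ByteBuffer, String>> valueClass() {
        return (Class<SortedMap<ByteBuffer, String>>) (Class<?>) SortedMap.class;
    }

    public static void main(String[] args) {
        // Passing someSortedMap.getClass() as the third argument would not
        // compile: its static type is Class<? extends SortedMap>, so V cannot
        // be inferred as SortedMap<ByteBuffer, String>.
        System.out.println(load(DemoFormat.class, ByteBuffer.class, valueClass()));
        // prints "DemoFormat<ByteBuffer, SortedMap>"
    }
}
```

If the same approach is tried against the real API, the declared type of the resulting RDD would need to use the parameterized SortedMap as its value type rather than "? extends SortedMap".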
