Hi Tal,

Thanks for the info. I will try it out and see how it goes.


On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <[email protected]> wrote:

> Hi Pulasthi,
>
> I couldn't make it work, so what I ended up doing was implementing 3 Java
> classes: one that extends org.apache.hadoop.mapreduce.InputFormat,
> another that extends org.apache.hadoop.mapreduce.InputSplit, and a third that
> extends org.apache.hadoop.mapreduce.RecordReader. I used them to load data
> from Cassandra into an RDD (using the newAPIHadoopRDD() method). It works
> great! I'm cleaning up the code a bit and will upload it to GitHub as open
> source (after the summit).
>
That's great. Looking forward to checking it out after you publish it on GitHub :).


Thanks,
Pulasthi

> I hope this helps for now,
>
> Tal
>
>
> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe <
> [email protected]> wrote:
>
>> Hi Tal,
>>
>> I also tried doing this by converting the Scala sample into Java, but I am
>> getting a compile-time error. The code is below:
>>
>>         JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>
>>         // Build the job configuration with ConfigHelper provided by Cassandra
>>         Job job = null;
>>         try {
>>             job = new Job();
>>         } catch (IOException e) {
>>             e.printStackTrace();  // To change body of catch statement use File | Settings | File Templates.
>>         }
>>         job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>
>>         String host = args[1];
>>         String port = args[2];
>>
>>         ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>         ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>         ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
>>         ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
>>
>>         SlicePredicate predicate = new SlicePredicate();
>>         SliceRange sliceRange = new SliceRange();
>>         sliceRange.setStart(new byte[0]);
>>         sliceRange.setFinish(new byte[0]);
>>         predicate.setSlice_range(sliceRange);
>>         ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>
>>         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>
>>         // Make a new Hadoop RDD
>>         final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>>                 new TreeMap<ByteBuffer, IColumn>();
>>         JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>>                 sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>                         ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>
>>
>> I also tried the code segment that you provided, but I keep getting
>> the following error.
>>
>> java:
>> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>> of ? extends java.util.SortedMap>)
>>
>> Did you encounter this? If so, any help would be appreciated.
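
A note on the compile error above. One plausible cause, assuming newAPIHadoopRDD bounds F by InputFormat<K, V> (the <K,V,F> signature in the error suggests this shape but does not show the bound), is that getClass() on the TreeMap variable yields Class<? extends SortedMap>, so V is inferred as a wildcard capture instead of SortedMap<ByteBuffer, IColumn>. A common Java workaround is to build the class token with an unchecked double cast. The sketch below reproduces the pattern using only the JDK: Format, MapFormat, load and valueClass are hypothetical stand-ins (not Spark or Cassandra APIs), and String stands in for IColumn.

```java
import java.nio.ByteBuffer;
import java.util.SortedMap;

class ClassTokenDemo {
    // Hypothetical stand-in for an input format parameterized by key and value types.
    interface Format<K, V> {}

    // Hypothetical stand-in for ColumnFamilyInputFormat: its value type is a
    // parameterized SortedMap, which has no class literal of its own.
    static class MapFormat implements Format<ByteBuffer, SortedMap<ByteBuffer, String>> {}

    // Same generic shape as the method in the error message, with F assumed
    // to be bounded by Format<K, V>.
    static <K, V, F extends Format<K, V>> String load(
            Class<F> fClass, Class<K> kClass, Class<V> vClass) {
        return fClass.getSimpleName() + "<" + kClass.getSimpleName()
                + "," + vClass.getSimpleName() + ">";
    }

    // Unchecked double cast: widen to Class<?> first, then narrow to the
    // parameterized type so that V can be inferred exactly.
    @SuppressWarnings("unchecked")
    static Class<SortedMap<ByteBuffer, String>> valueClass() {
        return (Class<SortedMap<ByteBuffer, String>>) (Class<?>) SortedMap.class;
    }

    public static void main(String[] args) {
        // Passing someMap.getClass() here would fail to compile, because its
        // static type is Class<? extends SortedMap>.
        String description = load(MapFormat.class, ByteBuffer.class, valueClass());
        System.out.println(description);
    }
}
```

Applied to the code above, the analogous token would be (Class<SortedMap<ByteBuffer, IColumn>>) (Class<?>) SortedMap.class, passed as the last argument in place of the getClass() call, with the result declared as JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>>. This has not been tested against the Spark and Cassandra versions discussed in this thread.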
>>
>> Best Regards,
>> Pulasthi
>>
>>
>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:
>>
>>> Hi,
>>>
>>>
>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
>>> I've translated the Scala example (CassandraTest.scala) to Java, but
>>> I keep getting the following exception:
>>>
>>> Exception in thread "main" java.io.IOException: Could not get input splits
>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>>     at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>>     at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>>     at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>>     at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>>     at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>>     ... 9 more
>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>     at java.lang.Thread.run(Thread.java:695)
>>> Caused by: org.apache.thrift.transport.TTransportException
>>>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>     at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>>     at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>>     at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>>     at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>>     ... 8 more
>>>
>>>
>>> This is the relevant code portion:
>>>
>>>     Job job = new Job();
>>>     job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>     String host = "<server>";
>>>     String port = "9160";
>>>
>>>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>>     ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>>>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>
>>>     SlicePredicate predicate = new SlicePredicate();
>>>     SliceRange sliceRange = new SliceRange();
>>>     sliceRange.setStart(new byte[0]);
>>>     sliceRange.setFinish(new byte[0]);
>>>     predicate.setSlice_range(sliceRange);
>>>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>>     final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>>>     JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>>             ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>>                     ByteBuffer.class, b.getClass());
>>>
>>>
>>> I would appreciate any input you may have.
>>>
>>> Thanks!
>>>
>>> Tal
>>>
>>
>>
>>
>> --
>> Pulasthi Supun
>> Undergraduate
>> Dpt of Computer Science & Engineering
>> University of Moratuwa
>> Blog : http://pulasthisupun.blogspot.com/
>> GitHub profile: https://github.com/pulasthi
>>
>
>


-- 
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
GitHub profile: https://github.com/pulasthi
