Hi Lucas,

That did the trick. I just had to change JavaPairRDD&lt;ByteBuffer, SortedMap&lt;ByteBuffer, IColumn&gt;&gt; to JavaPairRDD&lt;ByteBuffer, ? extends SortedMap&lt;ByteBuffer, IColumn&gt;&gt;. Thanks for the help.
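For anyone who hits the same generics problem, the combination that finally compiled for me looks roughly like this (untested as written here, so treat it as a sketch; sc and job are the JavaSparkContext and Job from the code further down the thread):

    // getClass() on the TreeMap erases the type arguments, so the value side
    // of the pair RDD can only be captured as a wildcard; passing
    // SortedMap.class and declaring "? extends" is what satisfied javac.
    JavaPairRDD<ByteBuffer, ? extends SortedMap<ByteBuffer, IColumn>> casRdd =
        sc.newAPIHadoopRDD(
            job.getConfiguration(),
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class,
            SortedMap.class);

    // Quick sanity check that splits are produced and the transport works:
    System.out.println("rows: " + casRdd.count());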
Regards,
Pulasthi

On Thu, Dec 5, 2013 at 10:40 AM, Lucas Fernandes Brunialti <lbrunia...@igcorp.com.br> wrote:

Hi all,

This should work:

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
        context.newAPIHadoopRDD(
            job.getConfiguration(),
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class,
            SortedMap.class);

I have translated the word count written in Scala to Java; I just can't send it right now...

Best Regards,
Lucas.

On Dec 5, 2013 1:51 AM, "Pulasthi Supun Wickramasinghe" <pulasthi...@gmail.com> wrote:

Hi Tal,

Just checking if you have added your code to GitHub :). If you have, could you point me to it?

Best Regards,
Pulasthi

On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pwend...@gmail.com> wrote:

Tal - that would be great to have open sourced if you can do it!

On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe <pulasthi...@gmail.com> wrote:

Hi Tal,

Thanks for the info, will try it out and see how it goes. That's great - looking forward to checking it out after you publish it on GitHub :).

Thanks,
Pulasthi

On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <ta...@taboola.com> wrote:

Hi Pulasthi,

I couldn't make it work, so what I ended up doing was implement three Java classes - one that extends org.apache.hadoop.mapreduce.InputFormat, another that extends org.apache.hadoop.mapreduce.InputSplit, and a third that extends org.apache.hadoop.mapreduce.RecordReader - and used them to load data from Cassandra into an RDD (via the newAPIHadoopRDD() method). It works great! I'm cleaning up the code a bit and will upload it to GitHub as open source (after the summit).

I hope this helps for now,

Tal
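Since the code isn't up yet, a rough skeleton of the three classes Tal describes might look like the following. All names here are invented for illustration; a real implementation would ask the cluster for its token ranges in getSplits() and page rows over thrift in the record reader.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class CassandraSourceInputFormat extends InputFormat<Text, Text> {
        @Override
        public List<InputSplit> getSplits(JobContext context) {
            // A real implementation would emit one split per token range;
            // this sketch emits a single split.
            return Collections.<InputSplit>singletonList(new CassandraSourceSplit());
        }

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
            return new CassandraSourceRecordReader();
        }

        // Splits must be Writable so Spark/Hadoop can ship them to tasks.
        public static class CassandraSourceSplit extends InputSplit implements Writable {
            @Override public long getLength() { return 0L; }
            @Override public String[] getLocations() { return new String[0]; }
            @Override public void write(DataOutput out) { /* serialize the token range here */ }
            @Override public void readFields(DataInput in) { /* deserialize the token range here */ }
        }

        public static class CassandraSourceRecordReader extends RecordReader<Text, Text> {
            private boolean consumed = false;
            @Override public void initialize(InputSplit split, TaskAttemptContext context) { /* open connection */ }
            @Override public boolean nextKeyValue() {
                // A real implementation would page rows from Cassandra for the
                // split's token range; this sketch yields one dummy record.
                if (consumed) return false;
                consumed = true;
                return true;
            }
            @Override public Text getCurrentKey() { return new Text("row-key"); }
            @Override public Text getCurrentValue() { return new Text("row-value"); }
            @Override public float getProgress() { return consumed ? 1.0f : 0.0f; }
            @Override public void close() { /* close connection */ }
        }
    }

Spark would then consume it exactly like the built-in format: sc.newAPIHadoopRDD(job.getConfiguration(), CassandraSourceInputFormat.class, Text.class, Text.class).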
On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe <pulasthi...@gmail.com> wrote:

Hi Tal,

I also tried doing this by converting the Scala sample into Java, but I am getting a compile-time error; below is the code:

    JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");

    // Build the job configuration with ConfigHelper provided by Cassandra
    Job job = null;
    try {
        job = new Job();
    } catch (IOException e) {
        e.printStackTrace();
    }
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    String host = args[1];
    String port = args[2];

    ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
    ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

    // Make a new Hadoop RDD
    final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
        new TreeMap<ByteBuffer, IColumn>();
    JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
        sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
            ByteBuffer.class, byteBufferIColumnSortedMap.getClass());

I also tried the code segment that you provided, but I keep getting the following error:

    java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
    <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
    in org.apache.spark.api.java.JavaSparkContext cannot be applied to
    (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92 of ? extends java.util.SortedMap>)

Did you encounter this? If so, any help on it would be appreciated.

Best Regards,
Pulasthi
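The cause of that error is visible in the signature javac prints: it has to infer a single V for both the vClass argument and ColumnFamilyInputFormat's value type, and byteBufferIColumnSortedMap.getClass() does not supply one. A minimal illustration (the signature is paraphrased from Spark's JavaSparkContext, so treat this as a sketch):

    // <K, V, F extends org.apache.hadoop.mapreduce.InputFormat<K, V>>
    // JavaPairRDD<K, V> newAPIHadoopRDD(Configuration conf, Class<F> fClass,
    //                                   Class<K> kClass, Class<V> vClass)

    // getClass() erases type arguments, so all the compiler sees is:
    Class<? extends SortedMap> vClass = byteBufferIColumnSortedMap.getClass();

    // V is captured as "capture of ? extends SortedMap", while
    // F = ColumnFamilyInputFormat pins V to SortedMap<ByteBuffer, IColumn>;
    // the two cannot be reconciled, hence "cannot be applied to (...)".
    // Passing SortedMap.class instead, as in Lucas's snippet earlier in the
    // thread, gives javac a single class token to work with.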
On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com> wrote:

Hi,

I'm trying to use data stored in Cassandra (v1.2) and need some help. I've translated the Scala example - CassandraTest.scala - to Java, but I keep getting the following exception:

    Exception in thread "main" java.io.IOException: Could not get input splits
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
        at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
        at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
        at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
        at com.taboola.test.spark_cassandra.App.calc(App.java:119)
        at com.taboola.test.spark_cassandra.App.main(App.java:65)
    Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
        at java.util.concurrent.FutureTask.get(FutureTask.java:83)
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
        ... 9 more
    Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:695)
    Caused by: org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
        at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
        at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
        at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
        at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
        at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
        at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
        at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
        ... 8 more

This is the relevant code portion:

    Job job = new Job();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);
    String host = "<server>";
    String port = "9160";

    ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
    ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
    JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
        ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
            ByteBuffer.class, b.getClass());

I would appreciate any input you may have.

Thanks!

Tal
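For anyone debugging the same trace: the root cause is the TTransportException surfacing from TFramedTransport.readFrame() during describe_splits_ex, which generally means the client never got a well-formed response from the thrift RPC endpoint - for example a wrong host or rpc_port, the rpc server not running, or a framed/unframed transport mismatch. A standalone connectivity check along these lines (a sketch using the same thrift classes that appear in the trace; "<server>" and 9160 are the placeholders from the code above) can help rule that out:

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class ThriftSmokeTest {
        public static void main(String[] args) throws Exception {
            // ColumnFamilyInputFormat talks framed thrift, so test the same way.
            TTransport transport = new TFramedTransport(new TSocket("<server>", 9160));
            transport.open(); // throws TTransportException if the endpoint is unreachable
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            System.out.println("connected to cluster: " + client.describe_cluster_name());
            transport.close();
        }
    }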
--
Pulasthi Supun
Undergraduate
Dept. of Computer Science & Engineering
University of Moratuwa
Blog: http://pulasthisupun.blogspot.com/
GitHub profile: https://github.com/pulasthi