Hi all,

This should work:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
        context.newAPIHadoopRDD(
                job.getConfiguration(),
                ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
                ByteBuffer.class,
                SortedMap.class);

I have translated the word count written in Scala to Java; I just can't send it right now...

Best Regards,
Lucas

On Dec 5, 2013 1:51 AM, "Pulasthi Supun Wickramasinghe" <[email protected]> wrote:

> Hi Tal,
>
> Just checking if you have added your code to github :). If you have, could
> you point me to it.
>
> Best Regards,
> Pulasthi
>
>
> On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <[email protected]> wrote:
>
>> Tal - that would be great to have open sourced if you can do it!
>>
>> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
>> <[email protected]> wrote:
>> > Hi Tal,
>> >
>> > Thanks for the info, will try it out and see how it goes.
>> >
>> >
>> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <[email protected]> wrote:
>> >>
>> >> Hi Pulasthi,
>> >>
>> >> I couldn't make it work, so what I ended up doing was implementing 3 Java
>> >> classes - one that extends org.apache.hadoop.mapreduce.InputFormat, another
>> >> that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that extends
>> >> org.apache.hadoop.mapreduce.RecordReader - and used them to load data from
>> >> Cassandra to an RDD (using the newAPIHadoopRDD() method). It works great!
>> >> I'm cleaning up the code a bit and will upload it to github as open source
>> >> (after the summit).
>> >>
>> > That's great, looking forward to checking it out after you publish on github :).
>> >
>> >
>> > Thanks,
>> > Pulasthi
>> >>
>> >> I hope this helps for now,
>> >>
>> >> Tal
>> >>
>> >>
>> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>> >> <[email protected]> wrote:
>> >>>
>> >>> Hi Tal,
>> >>>
>> >>> I also tried doing this by converting the Scala sample into Java, but I am
>> >>> getting a compile-time error; below is the code:
>> >>>
>> >>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>> >>>
>> >>> // Build the job configuration with ConfigHelper provided by Cassandra
>> >>> Job job = null;
>> >>> try {
>> >>>     job = new Job();
>> >>> } catch (IOException e) {
>> >>>     e.printStackTrace();
>> >>> }
>> >>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>> >>>
>> >>> String host = args[1];
>> >>> String port = args[2];
>> >>>
>> >>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>> >>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> >>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>> >>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> >>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
>> >>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
>> >>>
>> >>> SlicePredicate predicate = new SlicePredicate();
>> >>> SliceRange sliceRange = new SliceRange();
>> >>> sliceRange.setStart(new byte[0]);
>> >>> sliceRange.setFinish(new byte[0]);
>> >>> predicate.setSlice_range(sliceRange);
>> >>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>> >>>
>> >>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>>
>> >>> // Make a new Hadoop RDD
>> >>> final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>> >>>         new TreeMap<ByteBuffer, IColumn>();
>> >>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>> >>>         sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>> >>>                 ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>> >>>
>> >>> I also tried the code segment that you provided, but I keep getting
>> >>> the following error:
>> >>>
>> >>> java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>> >>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>> >>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>> >>> of ? extends java.util.SortedMap>)
>> >>>
>> >>> Did you encounter this? If so, any help on this would be appreciated.
>> >>>
>> >>> Best Regards,
>> >>> Pulasthi
>> >>>
>> >>>
>> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>> I'm trying to use data stored in cassandra (v1.2) and need some help.
>> >>>> I've translated the Scala example - CassandraTest.scala - to Java, but I
>> >>>> keep getting the following exception:
>> >>>>
>> >>>> Exception in thread "main" java.io.IOException: Could not get input splits
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>> >>>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>> >>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>> >>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>> >>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>> >>>>     at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>> >>>>     at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>> >>>>     at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>> >>>>     at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>> >>>>     at com.taboola.test.spark_cassandra.App.main(App.java:65)
>> >>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>> >>>>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>> >>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>> >>>>     ... 9 more
>> >>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>> >>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>>     at java.lang.Thread.run(Thread.java:695)
>> >>>> Caused by: org.apache.thrift.transport.TTransportException
>> >>>>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> >>>>     at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>> >>>>     at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>> >>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>> >>>>     ... 8 more
>> >>>>
>> >>>> This is the relevant code portion:
>> >>>>
>> >>>> Job job = new Job();
>> >>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>> >>>> String host = "<server>";
>> >>>> String port = "9160";
>> >>>>
>> >>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>> >>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> >>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>> >>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> >>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>> >>>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >>>>
>> >>>> SlicePredicate predicate = new SlicePredicate();
>> >>>> SliceRange sliceRange = new SliceRange();
>> >>>> sliceRange.setStart(new byte[0]);
>> >>>> sliceRange.setFinish(new byte[0]);
>> >>>> predicate.setSlice_range(sliceRange);
>> >>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>> >>>>
>> >>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>> >>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>> >>>>         ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>> >>>>                 ByteBuffer.class, b.getClass());
>> >>>>
>> >>>> I would appreciate any input you may have.
>> >>>>
>> >>>> Thanks!
>> >>>>
>> >>>> Tal
>> >>>
>> >>>
>> >>> --
>> >>> Pulasthi Supun
>> >>> Undergraduate
>> >>> Dpt of Computer Science & Engineering
>> >>> University of Moratuwa
>> >>> Blog : http://pulasthisupun.blogspot.com/
>> >>> Git hub profile: https://github.com/pulasthi
>> >>
>> >>
>> >
>> >
>> > --
>> > Pulasthi Supun
>> > Undergraduate
>> > Dpt of Computer Science & Engineering
>> > University of Moratuwa
>> > Blog : http://pulasthisupun.blogspot.com/
>> > Git hub profile: https://github.com/pulasthi
>
>
> --
> Pulasthi Supun
> Undergraduate
> Dpt of Computer Science & Engineering
> University of Moratuwa
> Blog : http://pulasthisupun.blogspot.com/
> Git hub profile: https://github.com/pulasthi
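For anyone hitting the same "capture#92 of ? extends java.util.SortedMap" error quoted above: the difference between the failing call (passing b.getClass()) and the working one at the top of the thread (passing SortedMap.class and casting the input format with asSubclass) is purely a Java generics issue, and it can be sketched with plain JDK types, no Spark or Cassandra on the classpath. This is an editor's illustration, not code from the thread; the class name ClassTokenSketch is made up.

```java
import java.util.SortedMap;
import java.util.TreeMap;

public class ClassTokenSketch {
    public static void main(String[] args) {
        SortedMap<String, Integer> b = new TreeMap<String, Integer>();

        // b.getClass() is typed Class<? extends SortedMap> -- a wildcard
        // capture. A type parameter V inferred from it becomes
        // "capture of ? extends SortedMap", which is exactly what the
        // compiler rejected in the newAPIHadoopRDD(...) call above.
        Class<? extends SortedMap> captured = b.getClass();
        System.out.println(captured.getSimpleName()); // TreeMap

        // Passing the interface literal binds V to SortedMap itself, and
        // asSubclass() widens a precise class token to a bounded one with
        // no unchecked cast -- the same move as
        // ColumnFamilyInputFormat.class.asSubclass(InputFormat.class).
        Class<SortedMap> exact = SortedMap.class;
        Class<? extends SortedMap> widened = TreeMap.class.asSubclass(SortedMap.class);
        System.out.println(exact.getSimpleName());    // SortedMap
        System.out.println(widened == TreeMap.class); // true
    }
}
```

The trade-off is that the class tokens carry less static type information, so the resulting JavaPairRDD is typed against the raw SortedMap rather than SortedMap<ByteBuffer, IColumn>, as in Lucas's working snippet.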
