Tal - that would be great to have open sourced if you can do it!
On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe <[email protected]> wrote:
> Hi Tal,
>
> Thanks for the info, I will try it out and see how it goes.
>
> On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <[email protected]> wrote:
>>
>> Hi Pulasthi,
>>
>> I couldn't make it work, so what I ended up doing was implementing three Java
>> classes - one that extends org.apache.hadoop.mapreduce.InputFormat, another
>> that extends org.apache.hadoop.mapreduce.InputSplit, and a third that extends
>> org.apache.hadoop.mapreduce.RecordReader - and used them to load data from
>> Cassandra into an RDD (using the newAPIHadoopRDD() method). It works great!
>> I'm cleaning up the code a bit and will upload it to GitHub as open source
>> (after the summit).
>>
> That's great, looking forward to checking it out after you publish it on GitHub :).
>
> Thanks,
> Pulasthi
>
>> I hope this helps for now,
>>
>> Tal
>>
>> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>> <[email protected]> wrote:
>>>
>>> Hi Tal,
>>>
>>> I also tried doing this by converting the Scala sample into Java, but I am
>>> getting a compile-time error; below is the code:
>>>
>>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>>
>>> // Build the job configuration with the ConfigHelper provided by Cassandra
>>> Job job = null;
>>> try {
>>>     job = new Job();
>>> } catch (IOException e) {
>>>     e.printStackTrace();
>>> }
>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>
>>> String host = args[1];
>>> String port = args[2];
>>>
>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
>>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
>>>
>>> SlicePredicate predicate = new SlicePredicate();
>>> SliceRange sliceRange = new SliceRange();
>>> sliceRange.setStart(new byte[0]);
>>> sliceRange.setFinish(new byte[0]);
>>> predicate.setSlice_range(sliceRange);
>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>>
>>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>
>>> // Make a new Hadoop RDD
>>> final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>>>     new TreeMap<ByteBuffer, IColumn>();
>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>>>     sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>>         ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>>
>>> I also tried the code segment that you provided, but I keep getting the
>>> following error:
>>>
>>> java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92 of ? extends java.util.SortedMap>)
>>>
>>> Did you encounter this? If so, any help on this would be appreciated.
>>>
>>> Best Regards,
>>> Pulasthi
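A minimal, untested sketch of one way past the compiler error Pulasthi reports above, using only the types already in his snippet: byteBufferIColumnSortedMap.getClass() has the static type Class<? extends SortedMap> (a wildcard capture), which javac cannot match against the V type parameter of newAPIHadoopRDD. Naming the value type explicitly and casting through a raw Class is a common workaround:

    // Spell out the value type instead of deriving it via getClass().
    // The double cast is unchecked, but it matches what
    // ColumnFamilyInputFormat actually produces as values.
    @SuppressWarnings("unchecked")
    Class<SortedMap<ByteBuffer, IColumn>> valueClass =
        (Class<SortedMap<ByteBuffer, IColumn>>) (Class<?>) SortedMap.class;

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
        sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
            ByteBuffer.class, valueClass);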
>>>
>>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
>>>> I've translated the Scala example - CassandraTest.scala - to Java, but I
>>>> keep getting the following exception:
>>>>
>>>> Exception in thread "main" java.io.IOException: Could not get input splits
>>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>>>     at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>>>     at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>>>     at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>>>     at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>>>     at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>>>     ... 9 more
>>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>>     at java.lang.Thread.run(Thread.java:695)
>>>> Caused by: org.apache.thrift.transport.TTransportException
>>>>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>>     at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>>>     at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>>>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>>>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>>>     at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>>>     at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>>>     ... 8 more
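The root cause in the trace above is a bare TTransportException coming out of describe_splits_ex, i.e. the Thrift connection to the node drops while input splits are being requested. Not from this thread, but one quick way to separate basic connectivity problems from split problems is a standalone probe against the same host and rpc port, assuming the Cassandra 1.2 Thrift client classes already on the classpath:

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class ThriftProbe {
        public static void main(String[] args) throws Exception {
            // Same host/port the ConfigHelper calls point at (9160 is the default rpc_port).
            TTransport transport = new TFramedTransport(new TSocket("<server>", 9160));
            transport.open();
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            // If this prints a version string, framed-Thrift connectivity is fine
            // and the failure is more likely in the split request itself.
            System.out.println(client.describe_version());
            transport.close();
        }
    }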
>>>>
>>>> This is the relevant code portion:
>>>>
>>>> Job job = new Job();
>>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>> String host = "<server>";
>>>> String port = "9160";
>>>>
>>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>>>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>>
>>>> SlicePredicate predicate = new SlicePredicate();
>>>> SliceRange sliceRange = new SliceRange();
>>>> sliceRange.setStart(new byte[0]);
>>>> sliceRange.setFinish(new byte[0]);
>>>> predicate.setSlice_range(sliceRange);
>>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>>>
>>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>>>     ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>>>         ByteBuffer.class, b.getClass());
>>>>
>>>> I would appreciate any input you may have.
>>>>
>>>> Thanks!
>>>>
>>>> Tal
>>>
>>> --
>>> Pulasthi Supun
>>> Undergraduate
>>> Dept of Computer Science & Engineering
>>> University of Moratuwa
>>> Blog: http://pulasthisupun.blogspot.com/
>>> GitHub profile: https://github.com/pulasthi
>
> --
> Pulasthi Supun
> Undergraduate
> Dept of Computer Science & Engineering
> University of Moratuwa
> Blog: http://pulasthisupun.blogspot.com/
> GitHub profile: https://github.com/pulasthi
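For reference, the three-class approach Tal describes at the top of the thread (a custom InputFormat, InputSplit, and RecordReader handed to newAPIHadoopRDD()) might look roughly like the skeleton below. This is an illustrative guess, not his unpublished code: the split and row-fetching logic is elided, and the key/value types simply mirror ColumnFamilyInputFormat's.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.List;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // One token range on one node; must be Writable so it can be shipped to workers.
    class CassandraSplit extends InputSplit implements Writable {
        String startToken, endToken, host;

        @Override public long getLength() { return 0; }  // size hint only
        @Override public String[] getLocations() { return new String[] { host }; }
        @Override public void write(DataOutput out) throws IOException {
            out.writeUTF(startToken); out.writeUTF(endToken); out.writeUTF(host);
        }
        @Override public void readFields(DataInput in) throws IOException {
            startToken = in.readUTF(); endToken = in.readUTF(); host = in.readUTF();
        }
    }

    class CassandraRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> {
        private ByteBuffer key;
        private SortedMap<ByteBuffer, IColumn> value;

        @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
            // Open a Thrift connection to ((CassandraSplit) split).host here.
        }
        @Override public boolean nextKeyValue() {
            // Page through the split's token range (e.g. via get_range_slices),
            // filling key/value; return false when the range is exhausted.
            return false; // elided
        }
        @Override public ByteBuffer getCurrentKey() { return key; }
        @Override public SortedMap<ByteBuffer, IColumn> getCurrentValue() { return value; }
        @Override public float getProgress() { return 0f; }
        @Override public void close() { /* close the Thrift transport */ }
    }

    class CassandraInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>> {
        @Override public List<InputSplit> getSplits(JobContext ctx) {
            // Ask the cluster for its token ranges (e.g. describe_ring)
            // and wrap each one in a CassandraSplit.
            return Collections.emptyList(); // elided
        }
        @Override public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
                createRecordReader(InputSplit split, TaskAttemptContext ctx) {
            return new CassandraRecordReader();
        }
    }

Such a class would then be passed exactly like ColumnFamilyInputFormat, e.g. sc.newAPIHadoopRDD(job.getConfiguration(), CassandraInputFormat.class, ByteBuffer.class, valueClass), sidestepping the split and generics problems discussed above.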
