Hi Tal,

Thanks for the info, I will try it out and see how it goes.
On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <[email protected]> wrote:

> Hi Pulasthi,
>
> I couldn't make it work, so what I ended up doing was implementing three
> Java classes - one that extends org.apache.hadoop.mapreduce.InputFormat,
> another that extends org.apache.hadoop.mapreduce.InputSplit and a third
> that extends org.apache.hadoop.mapreduce.RecordReader - and used them to
> load data from Cassandra into an RDD (using the newAPIHadoopRDD() method).
> It works great! I'm cleaning up the code a bit and will upload it to
> GitHub as open source (after the summit).

That's great, looking forward to checking it out after you publish it on
GitHub :). Thanks, Pulasthi

> I hope this helps for now,
>
> Tal
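The approach Tal describes boils down to three small classes wired into
newAPIHadoopRDD(). Below is a minimal compilable skeleton of that wiring;
the class names, the placeholder single split, and the empty method bodies
are illustrative assumptions only, not his actual (yet unpublished)
implementation:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.List;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // One unit of parallel work, e.g. a Cassandra token range. Hadoop
    // serializes splits, so the new-API convention is to implement Writable.
    class CassandraSplit extends InputSplit implements Writable {
        String startToken = "";
        String endToken = "";

        @Override public long getLength() { return 0; }                    // size unknown
        @Override public String[] getLocations() { return new String[0]; } // no locality hint

        public void write(DataOutput out) throws IOException {
            out.writeUTF(startToken);
            out.writeUTF(endToken);
        }
        public void readFields(DataInput in) throws IOException {
            startToken = in.readUTF();
            endToken = in.readUTF();
        }
    }

    // Reads the rows of one split; the actual Cassandra client calls would
    // go in initialize() and nextKeyValue().
    class CassandraRecordReader
            extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> {
        private ByteBuffer currentKey;
        private SortedMap<ByteBuffer, IColumn> currentValue;

        @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
            // open the connection for this split's token range here
        }
        @Override public boolean nextKeyValue() {
            return false; // fetch the next row and set currentKey/currentValue here
        }
        @Override public ByteBuffer getCurrentKey() { return currentKey; }
        @Override public SortedMap<ByteBuffer, IColumn> getCurrentValue() { return currentValue; }
        @Override public float getProgress() { return 0f; }
        @Override public void close() { /* close the connection here */ }
    }

    // Ties the two together; this is the class handed to newAPIHadoopRDD().
    class CassandraInputFormat
            extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>> {
        @Override
        public List<InputSplit> getSplits(JobContext ctx) {
            // query the cluster for token ranges here; one placeholder split for now
            return Collections.<InputSplit>singletonList(new CassandraSplit());
        }
        @Override
        public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(
                InputSplit split, TaskAttemptContext ctx) {
            return new CassandraRecordReader();
        }
    }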
>
> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
> <[email protected]> wrote:
>
>> Hi Tal,
>>
>> I also tried doing this by converting the Scala sample into Java, but I
>> am getting a compile-time error. Below is the code:
>>
>>     JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>
>>     // Build the job configuration with ConfigHelper provided by Cassandra
>>     Job job = null;
>>     try {
>>         job = new Job();
>>     } catch (IOException e) {
>>         e.printStackTrace();
>>     }
>>     job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>
>>     String host = args[1];
>>     String port = args[2];
>>
>>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>     ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
>>     ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
>>
>>     SlicePredicate predicate = new SlicePredicate();
>>     SliceRange sliceRange = new SliceRange();
>>     sliceRange.setStart(new byte[0]);
>>     sliceRange.setFinish(new byte[0]);
>>     predicate.setSlice_range(sliceRange);
>>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>
>>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>
>>     // Make a new Hadoop RDD
>>     final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>>             new TreeMap<ByteBuffer, IColumn>();
>>     JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>>             sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>                     ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>
>> I also tried the code segment that you provided, but I keep getting the
>> following error:
>>
>>     java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>>     <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>>     in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>>     (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,
>>     java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92 of ? extends java.util.SortedMap>)
>>
>> Did you encounter this? If so, any help would be appreciated.
>>
>> Best Regards,
>> Pulasthi
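The compile error above comes from passing byteBufferIColumnSortedMap.getClass(),
whose static type is Class<? extends TreeMap>. The captured wildcard cannot
satisfy the bound F extends InputFormat<K, V>: ColumnFamilyInputFormat is
declared as an InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, so V
must be exactly SortedMap<ByteBuffer, IColumn>. One workaround - a sketch, not
necessarily how either poster ended up solving it, and with a hypothetical
wrapper class name - is to force the value class with an unchecked cast:

    import java.nio.ByteBuffer;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CassandraRddFactory {

        // Assumes sc and job are configured exactly as in the message above.
        public static JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> cassandraRdd(
                JavaSparkContext sc, Job job) {
            // Java has no SortedMap<ByteBuffer, IColumn>.class literal, so the
            // exact value type has to be forced with an unchecked double cast.
            @SuppressWarnings("unchecked")
            Class<SortedMap<ByteBuffer, IColumn>> valueClass =
                    (Class<SortedMap<ByteBuffer, IColumn>>) (Class<?>) SortedMap.class;

            // With V fixed to SortedMap<ByteBuffer, IColumn>, the input format
            // satisfies F extends InputFormat<K, V> and the call compiles.
            return sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
                    ByteBuffer.class, valueClass);
        }
    }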
>>
>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
>>> I've translated the Scala example - CassandraTest.scala - to Java, but I
>>> keep getting the following exception:
>>>
>>>     Exception in thread "main" java.io.IOException: Could not get input splits
>>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>>         at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>>         at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>>         at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>>         at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>>         at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>>         at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>>         at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>>     Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>         at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>>         ... 9 more
>>>     Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>         at java.lang.Thread.run(Thread.java:695)
>>>     Caused by: org.apache.thrift.transport.TTransportException
>>>         at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>>         at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>         at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>>         at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>>         at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>         at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>>         at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>>         at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>>         at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>>         at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>>         at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>>         ... 8 more
>>>
>>> This is the relevant code portion:
>>>
>>>     Job job = new Job();
>>>     job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>     String host = "<server>";
>>>     String port = "9160";
>>>
>>>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>>     ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>>>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>>
>>>     SlicePredicate predicate = new SlicePredicate();
>>>     SliceRange sliceRange = new SliceRange();
>>>     sliceRange.setStart(new byte[0]);
>>>     sliceRange.setFinish(new byte[0]);
>>>     predicate.setSlice_range(sliceRange);
>>>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>>
>>>     final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>>>     JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>>             ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>>                     ByteBuffer.class, b.getClass());
>>>
>>> I would appreciate any input you may have.
>>>
>>> Thanks!
>>>
>>> Tal

--
Pulasthi Supun
Undergraduate
Dept. of Computer Science & Engineering
University of Moratuwa
Blog: http://pulasthisupun.blogspot.com/
GitHub profile: https://github.com/pulasthi
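Once either route yields the RDD, consuming it is ordinary Spark. Below is a
sketch of the word count that CassandraTest.scala performs over the casDemo
Words data, written against the pre-1.0 Java API (anonymous function classes
and map(PairFunction); later versions use mapToPair). The wrapper class name
is hypothetical, and the assumption that each row keeps its text in a column
named "para" should be adjusted to the real column name:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class CassandraWordCount {

        // casRdd is the pair RDD produced by newAPIHadoopRDD() above.
        public static JavaPairRDD<String, Integer> countWords(
                JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd) {

            // Pull the assumed "para" column out of each row, decode it to a
            // string, and split it into words.
            JavaRDD<String> words = casRdd.flatMap(
                    new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, String>() {
                        public Iterable<String> call(
                                Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> row)
                                throws Exception {
                            IColumn para = row._2().get(ByteBufferUtil.bytes("para"));
                            return Arrays.asList(ByteBufferUtil.string(para.value()).split(" "));
                        }
                    });

            // Classic word count over the result.
            return words.map(
                    new PairFunction<String, String, Integer>() {
                        public Tuple2<String, Integer> call(String w) {
                            return new Tuple2<String, Integer>(w, 1);
                        }
                    }).reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        public Integer call(Integer a, Integer b) {
                            return a + b;
                        }
                    });
        }
    }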
