Hi Pulasthi,

I couldn't make it work, so what I ended up doing was implementing three Java classes: one that extends org.apache.hadoop.mapreduce.InputFormat, another that extends org.apache.hadoop.mapreduce.InputSplit, and a third that extends org.apache.hadoop.mapreduce.RecordReader. I used them to load data from Cassandra into an RDD (via the newAPIHadoopRDD() method). It works great! I'm cleaning up the code a bit and will upload it to GitHub as open source (after the summit).
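Roughly, the skeleton looks like this. This is a simplified sketch of the pattern, not the actual code: the class names and the String key/value types are illustrative placeholders (the real implementation would use the Cassandra Thrift types), and everything Cassandra-specific is elided.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Skeleton only: token-range discovery and row paging are elided,
// and the String key/value types are placeholders.
public class CassandraInputSketch {

    // One split per token range (or per node), so work spreads across the cluster.
    public static class CassandraSplit extends InputSplit {
        @Override public long getLength() { return 0; }                    // size unknown up front
        @Override public String[] getLocations() { return new String[0]; } // replica hosts, for locality
    }

    // Iterates the rows covered by a single split.
    public static class CassandraRecordReader extends RecordReader<String, String> {
        @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
            // open a connection to one of the split's replicas and start paging rows
        }
        @Override public boolean nextKeyValue() { return false; }          // advance to the next row
        @Override public String getCurrentKey() { return null; }
        @Override public String getCurrentValue() { return null; }
        @Override public float getProgress() { return 0f; }
        @Override public void close() { }
    }

    // The class handed to sc.newAPIHadoopRDD().
    public static class CassandraInputFormat extends InputFormat<String, String> {
        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException {
            // describe the ring and emit one CassandraSplit per token range
            return new ArrayList<InputSplit>();
        }
        @Override public RecordReader<String, String> createRecordReader(
                InputSplit split, TaskAttemptContext ctx) {
            return new CassandraRecordReader();
        }
    }
}
```

One caveat: depending on the setup, the split class may also need to implement org.apache.hadoop.io.Writable (or be otherwise serializable) so the framework can ship it to the workers.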
I hope this helps for now,
Tal

On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe <
[email protected]> wrote:

> Hi Tal,
>
> I also tried doing this by converting the Scala sample into Java, but I am
> getting a compile-time error. Below is the code:
>
>     JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>
>     // Build the job configuration with ConfigHelper provided by Cassandra
>     Job job = null;
>     try {
>         job = new Job();
>     } catch (IOException e) {
>         e.printStackTrace();
>     }
>     job.setInputFormatClass(ColumnFamilyInputFormat.class);
>
>     String host = args[1];
>     String port = args[2];
>
>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>     ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
>     ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
>
>     SlicePredicate predicate = new SlicePredicate();
>     SliceRange sliceRange = new SliceRange();
>     sliceRange.setStart(new byte[0]);
>     sliceRange.setFinish(new byte[0]);
>     predicate.setSlice_range(sliceRange);
>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>
>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>
>     // Make a new Hadoop RDD
>     final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>             new TreeMap<ByteBuffer, IColumn>();
>     JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>             sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>                     ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>
> I also tried the code segment that you have provided, but I keep getting
> the following error:
>
>     java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>     <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>     in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>     (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>     of ? extends java.util.SortedMap>)
>
> Did you encounter this? If so, any help would be appreciated.
>
> Best Regards,
> Pulasthi
>
>
> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:
>
>> Hi,
>>
>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
>> I've translated the Scala example (CassandraTest.scala) to Java, but I
>> keep getting the following exception:
>>
>>     Exception in thread "main" java.io.IOException: Could not get input splits
>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>         at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>         at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>         at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>         at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>         at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>         at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>         at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>     Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>         at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>         at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>         ... 9 more
>>     Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>         at java.lang.Thread.run(Thread.java:695)
>>     Caused by: org.apache.thrift.transport.TTransportException
>>         at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>         at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>         at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>         at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>         at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>         at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>         at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>         at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>         at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>         at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>         at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>         at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>         ... 8 more
>>
>> This is the relevant code portion:
>>
>>     Job job = new Job();
>>     job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>     String host = "<server>";
>>     String port = "9160";
>>
>>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>     ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
>>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>>
>>     SlicePredicate predicate = new SlicePredicate();
>>     SliceRange sliceRange = new SliceRange();
>>     sliceRange.setStart(new byte[0]);
>>     sliceRange.setFinish(new byte[0]);
>>     predicate.setSlice_range(sliceRange);
>>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>>
>>     final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
>>     JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>             ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>                     ByteBuffer.class, b.getClass());
>>
>> I would appreciate any input you may have.
>>
>> Thanks!
>>
>> Tal
>>
>
>
> --
> Pulasthi Supun
> Undergraduate
> Dpt of Computer Science & Engineering
> University of Moratuwa
> Blog: http://pulasthisupun.blogspot.com/
> GitHub profile: https://github.com/pulasthi
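A note on the compile error in the thread: `byteBufferIColumnSortedMap.getClass()` has static type `Class<? extends TreeMap>`, so javac infers the value type `V` as a fresh wildcard capture (the `capture#92` in the message), and that capture cannot satisfy the constraint that the input-format class fixes `V` to a concrete `SortedMap<ByteBuffer, IColumn>`. Passing a class literal cast to the exact value type avoids the capture. The self-contained sketch below reproduces the mismatch with stand-in types: `Format`, `CfFormat`, and `newRDD` are hypothetical stand-ins mimicking the generic shape of `newAPIHadoopRDD`, not real API.

```java
import java.nio.ByteBuffer;
import java.util.SortedMap;

public class CaptureDemo {

    // Minimal stand-ins with the same generic shape as the Spark/Cassandra types.
    interface Format<K, V> { }
    static class CfFormat implements Format<ByteBuffer, SortedMap<ByteBuffer, Object>> { }

    // Same constraint shape as newAPIHadoopRDD: F must be a Format<K, V>.
    static <K, V, F extends Format<K, V>> String newRDD(
            Class<F> fClass, Class<K> kClass, Class<V> vClass) {
        return kClass.getSimpleName() + "/" + vClass.getSimpleName();
    }

    // Force the raw class literal to the exact value type V must be.
    @SuppressWarnings("unchecked")
    static Class<SortedMap<ByteBuffer, Object>> valueClass() {
        return (Class<SortedMap<ByteBuffer, Object>>) (Class<?>) SortedMap.class;
    }

    public static void main(String[] args) {
        // Does NOT compile: b.getClass() is Class<? extends SortedMap> (a raw
        // wildcard capture), which cannot equal SortedMap<ByteBuffer, Object>:
        //   SortedMap<ByteBuffer, Object> b = new TreeMap<ByteBuffer, Object>();
        //   newRDD(CfFormat.class, ByteBuffer.class, b.getClass());

        // Compiles: the cast pins V to SortedMap<ByteBuffer, Object>.
        String rdd = newRDD(CfFormat.class, ByteBuffer.class, valueClass());
        System.out.println(rdd); // prints ByteBuffer/SortedMap
    }
}
```

Applied to the original call, the same trick should work: pass `(Class<SortedMap<ByteBuffer, IColumn>>) (Class<?>) SortedMap.class` as the value-class argument instead of `b.getClass()` (with `@SuppressWarnings("unchecked")`).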

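On the original "Could not get input splits" / TTransportException: the stack trace shows the failure inside describe_splits_ex, i.e. while the input format is talking Thrift to a node's rpc port, so the usual suspects are an unreachable rpc_address/rpc_port, a firewall, or a client/server version mismatch rather than anything in Spark itself. A quick reachability check against each node can rule out the first case. This is a plain-Java sketch; the host and port below are placeholders.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class RpcPortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unroutable
        }
    }

    public static void main(String[] args) {
        // Placeholder host/port: substitute each Cassandra node's rpc_address
        // and rpc_port (9160 was the default Thrift port in that era).
        System.out.println(canConnect("localhost", 9160, 2000));
    }
}
```

If the socket connects but getSplits() still fails mid-read, comparing the client and server Cassandra versions (and the framed-transport settings, since TFramedTransport appears in the trace) is the next thing to check.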