Hi Tal,

Just checking whether you have added your code to GitHub :) If you have, could you point me to it?
Best Regards,
Pulasthi

On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <[email protected]> wrote:
> Tal - that would be great to have open sourced if you can do it!
>
> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
> <[email protected]> wrote:
> > Hi Tal,
> >
> > Thanks for the info, will try it out and see how it goes.
> >
> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <[email protected]> wrote:
> >>
> >> Hi Pulasthi,
> >>
> >> I couldn't make it work, so what I ended up doing was implementing 3 Java
> >> classes - one that extends org.apache.hadoop.mapreduce.InputFormat, another
> >> that extends org.apache.hadoop.mapreduce.InputSplit, and a 3rd that extends
> >> org.apache.hadoop.mapreduce.RecordReader - and used them to load data from
> >> Cassandra into an RDD (using the newAPIHadoopRDD() method). It works great!
> >> I'm cleaning up the code a bit and will upload it to github as open source
> >> (after the summit).
> >
> > That's great, looking forward to checking it out after you publish it on github :)
> >
> > Thanks,
> > Pulasthi
> >>
> >> I hope this helps for now,
> >>
> >> Tal
> >>
> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
> >> <[email protected]> wrote:
> >>>
> >>> Hi Tal,
> >>>
> >>> I also tried doing this by converting the Scala sample into Java, but I
> >>> am getting a compile-time error. Below is the code:
> >>>
> >>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
> >>>
> >>> // Build the job configuration with ConfigHelper provided by Cassandra
> >>> Job job = null;
> >>> try {
> >>>     job = new Job();
> >>> } catch (IOException e) {
> >>>     e.printStackTrace();
> >>> }
> >>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
> >>>
> >>> String host = args[1];
> >>> String port = args[2];
> >>>
> >>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
> >>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> >>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
> >>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> >>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
> >>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
> >>>
> >>> SlicePredicate predicate = new SlicePredicate();
> >>> SliceRange sliceRange = new SliceRange();
> >>> sliceRange.setStart(new byte[0]);
> >>> sliceRange.setFinish(new byte[0]);
> >>> predicate.setSlice_range(sliceRange);
> >>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
> >>>
> >>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>>
> >>> // Make a new Hadoop RDD
> >>> final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
> >>>     new TreeMap<ByteBuffer, IColumn>();
> >>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
> >>>     sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
> >>>         ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
> >>>
> >>> I also tried the code segment that you provided, but I keep getting
> >>> the following error:
> >>>
> >>> java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
> >>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
> >>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
> >>> of ? extends java.util.SortedMap>)
> >>>
> >>> Did you encounter this? If so, any help would be appreciated.
> >>>
> >>> Best Regards,
> >>> Pulasthi
> >>>
> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I'm trying to use data stored in Cassandra (v1.2) and need some help.
> >>>> I've translated the Scala example - CassandraTest.scala - to Java, but I
> >>>> keep getting the following exception:
> >>>>
> >>>> Exception in thread "main" java.io.IOException: Could not get input splits
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
> >>>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
> >>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
> >>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
> >>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
> >>>>     at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
> >>>>     at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
> >>>>     at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
> >>>>     at com.taboola.test.spark_cassandra.App.calc(App.java:119)
> >>>>     at com.taboola.test.spark_cassandra.App.main(App.java:65)
> >>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException:
> >>>> org.apache.thrift.transport.TTransportException
> >>>>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> >>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
> >>>>     ... 9 more
> >>>> Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
> >>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >>>>     at java.lang.Thread.run(Thread.java:695)
> >>>> Caused by: org.apache.thrift.transport.TTransportException
> >>>>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> >>>>     at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
> >>>>     at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
> >>>>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
> >>>>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
> >>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
> >>>>     at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
> >>>>     at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
> >>>>     ... 8 more
> >>>>
> >>>> This is the relevant code portion:
> >>>>
> >>>> Job job = new Job();
> >>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
> >>>> String host = "<server>";
> >>>> String port = "9160";
> >>>>
> >>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
> >>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> >>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
> >>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> >>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
> >>>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >>>>
> >>>> SlicePredicate predicate = new SlicePredicate();
> >>>> SliceRange sliceRange = new SliceRange();
> >>>> sliceRange.setStart(new byte[0]);
> >>>> sliceRange.setFinish(new byte[0]);
> >>>> predicate.setSlice_range(sliceRange);
> >>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
> >>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
> >>>> JavaPairRDD<ByteBuffer, ?
> >>>> extends SortedMap> rdd =
> >>>>     ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
> >>>>         ByteBuffer.class, b.getClass());
> >>>>
> >>>> I would appreciate any input you may have.
> >>>>
> >>>> Thanks!
> >>>>
> >>>> Tal
> >>>
> >>> --
> >>> Pulasthi Supun
> >>> Undergraduate
> >>> Dept. of Computer Science & Engineering
> >>> University of Moratuwa
> >>> Blog: http://pulasthisupun.blogspot.com/
> >>> GitHub profile: https://github.com/pulasthi
> >
> > --
> > Pulasthi Supun
> > Undergraduate
> > Dept. of Computer Science & Engineering
> > University of Moratuwa
> > Blog: http://pulasthisupun.blogspot.com/
> > GitHub profile: https://github.com/pulasthi

--
Pulasthi Supun
Undergraduate
Dept. of Computer Science & Engineering
University of Moratuwa
Blog: http://pulasthisupun.blogspot.com/
GitHub profile: https://github.com/pulasthi
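
For reference, the `capture#92 of ? extends java.util.SortedMap` compile error quoted above comes from Java generics, not from Spark or Cassandra: `b.getClass()` is typed `Class<? extends SortedMap>`, a wildcard capture that cannot satisfy the exact `Class<V>` parameter of `newAPIHadoopRDD`. A common workaround is to pass a class literal widened through an unchecked cast. The sketch below is self-contained and illustrative only - `ClassTokenDemo`, `valueClass`, and `widenedToken` are made-up names standing in for the real Spark signature, not Spark API:

```java
import java.nio.ByteBuffer;
import java.util.SortedMap;
import java.util.TreeMap;

public class ClassTokenDemo {

    // Hypothetical stand-in for the value-class parameter of
    // JavaSparkContext.newAPIHadoopRDD: it demands an exact Class<V>,
    // which the wildcard-captured result of instance.getClass() cannot match.
    static <V> Class<V> valueClass(Class<V> c) {
        return c;
    }

    // Widening the interface's class literal through Class<?> with an
    // unchecked cast produces a token the compiler accepts for
    // V = SortedMap<ByteBuffer, String>.
    @SuppressWarnings("unchecked")
    static Class<SortedMap<ByteBuffer, String>> widenedToken() {
        return (Class<SortedMap<ByteBuffer, String>>) (Class<?>) SortedMap.class;
    }

    public static void main(String[] args) {
        SortedMap<ByteBuffer, String> b = new TreeMap<>();
        // valueClass(b.getClass()); // does not compile: Class<capture of ? extends TreeMap>
        System.out.println(valueClass(widenedToken()).getName()); // prints "java.util.SortedMap"
    }
}
```

The cast is safe in practice here because the class token is only used by the framework to instantiate and tag values; the erasure of `SortedMap<ByteBuffer, String>` is `SortedMap` either way.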
