Hi Tal,

I also tried doing this by converting the scala sample into Java but i am
getting an compile time error below is the code

 JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");

        //Build the job configuration with ConfigHelper provided by
Cassandra
        Job job = null;
        try {
            job = new Job();
        } catch (IOException e) {
            e.printStackTrace();  //To change body of catch statement use
File | Settings | File Templates.
        }
        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        String host = args[1];
        String port = args[2];

        ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setInputColumnFamily(job.getConfiguration(),
"casDemo", "Words");
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
"casDemo", "WordCount");

        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);
        predicate.setSlice_range(sliceRange);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
predicate);

        ConfigHelper.setInputPartitioner(job.getConfiguration(),
"Murmur3Partitioner");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(),
"Murmur3Partitioner");

        // Make a new Hadoop RDD
        final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
new TreeMap<ByteBuffer, IColumn>();
        JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
ByteBuffer.class, byteBufferIColumnSortedMap.getClass());


i also tried the code segment that you have provided but i keep getting the
following error.

java:
/home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
<K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
in org.apache.spark.api.java.JavaSparkContext cannot be applied to
(org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
of ? extends java.util.SortedMap>)

Did you encounter this if so any help on this would be appreciated.

Best Regards,
Pulasthi


On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <[email protected]> wrote:

> Hi,
>
>
> I'm trying to use data stored in cassandra (v1.2) and need some help. I've
> translated the the scala example - CassandraTest.scala - to Java, but I
> keep getting the following exception:
>
> Exception in thread "main" java.io.IOException: Could not get input splits
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>  at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
> at
> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>  at
> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>  at com.taboola.test.spark_cassandra.App.main(App.java:65)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>  at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>  at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
> ... 9 more
> Caused by: java.lang.RuntimeException:
> org.apache.thrift.transport.TTransportException
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>  at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>  at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> Caused by: org.apache.thrift.transport.TTransportException
> at
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>  at
> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
> at
> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>  at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>  at
> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>  at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> at
> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>  at
> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>  ... 8 more
>
>
> This is the relevant code portion:
>
>             Job job = new Job();
>  job.setInputFormatClass(ColumnFamilyInputFormat.class);
>  String host = "<server>";
> String port = "9160";
>
> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>  ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>  ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
> "UserEvent", true);
>  ConfigHelper.setInputPartitioner(job.getConfiguration(), "
> Murmur3Partitioner");
>     ConfigHelper.setOutputPartitioner(job.getConfiguration(),
> "Murmur3Partitioner");
>
>     SlicePredicate predicate = new SlicePredicate();
>     SliceRange sliceRange = new SliceRange();
>     sliceRange.setStart(new byte[0]);
>     sliceRange.setFinish(new byte[0]);
>     predicate.setSlice_range(sliceRange);
>     ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
>   final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
> IColumn>();
>  JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
> ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class ,
> ByteBuffer.class, b.getClass());
>
>
> I would appreciate any input you may have.
>
> Thanks!
>
> Tal
>



-- 
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile:
<http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
<https://github.com/pulasthi>

Reply via email to