Hi,
I'm trying to read data stored in Cassandra (v1.2) from Spark and need some
help. I've translated the Scala example - CassandraTest.scala - to Java, but
I keep getting the following exception:
Exception in thread "main" java.io.IOException: Could not get input splits
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
    at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
    at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
    at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
    at com.taboola.test.spark_cassandra.App.calc(App.java:119)
    at com.taboola.test.spark_cassandra.App.main(App.java:65)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
    ... 9 more
Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
Caused by: org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
    at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
    ... 8 more
This is the relevant code portion:
Job job = new Job();
job.setInputFormatClass(ColumnFamilyInputFormat.class);
String host = "<server>";
String port = "9160";
ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

// Empty start/finish selects the full column range of each row
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd = ctx.newAPIHadoopRDD(
        job.getConfiguration(), ColumnFamilyInputFormat.class,
        ByteBuffer.class, b.getClass());
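In case the problem is outside the portion shown above, here is the same setup again as a minimal self-contained reproducer with the imports included. The hostname is a placeholder as before, the class and method names (CassandraReadSketch, buildRdd) are just for this sketch, and the keyspace "taltest" / column family "UserEvent" are the ones from the snippet:

```java
import java.nio.ByteBuffer;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CassandraReadSketch {

    // Builds an RDD over the "taltest"/"UserEvent" column family.
    // The hostname is a placeholder; 9160 is Cassandra's default Thrift RPC port.
    static JavaPairRDD<ByteBuffer, ? extends SortedMap> buildRdd(JavaSparkContext ctx)
            throws Exception {
        Job job = new Job();
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "<server>");
        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

        // Empty start/finish means "all columns of each row"
        SliceRange range = new SliceRange();
        range.setStart(new byte[0]);
        range.setFinish(new byte[0]);
        SlicePredicate predicate = new SlicePredicate();
        predicate.setSlice_range(range);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

        // Prototype map used only to obtain the value class for newAPIHadoopRDD
        SortedMap<ByteBuffer, IColumn> proto = new TreeMap<ByteBuffer, IColumn>();
        return ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
                ByteBuffer.class, proto.getClass());
    }
}
```

This needs a live Cassandra 1.2 node with Thrift enabled to run, so the failure only shows up when the RDD is materialized (e.g. by collectAsMap, as in the trace above).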
I would appreciate any input you may have.
Thanks!
Tal