Try removing the Kafka code, since Kafka does not seem to be the issue here. Create an RDD directly, save it to Cassandra, and see what happens, even just in the console. That should give you a starting point. Hope this helps.
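The suggested sanity check could look something like this. This is a minimal sketch only: it assumes a running Cassandra cluster, the same `movie_keyspace.movie_count` table as in the code below, an existing `JavaSparkContext sc`, and the spark-cassandra-connector Java API; the sample movie tuples are made up for illustration. By writing hard-coded, non-null keys it isolates the Cassandra write path from Kafka:

```java
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapTupleToRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.someColumns;

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Build a pair RDD with known, non-null integer keys (hypothetical sample data)
// and attempt the same write as in the streaming job.
JavaPairRDD<Integer, String> test = JavaPairRDD.fromJavaRDD(
        sc.parallelize(Arrays.asList(
                new Tuple2<>(1995, "Toy Story"),
                new Tuple2<>(1999, "The Matrix"))));

// If this write succeeds, the connector and table are fine, and the problem
// is in how the keys are produced upstream, not in Kafka or Cassandra.
javaFunctions(test)
        .writerBuilder("movie_keyspace", "movie_count",
                mapTupleToRow(Integer.class, String.class))
        .withColumnSelector(someColumns("year", "list_of_movies"))
        .saveToCassandra();
```

If this succeeds while the streaming job fails, the null key is being introduced somewhere between the Kafka record and the tuple, which narrows the search considerably.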
On 9 Mar 2017 3:07 am, "sathyanarayanan mudhaliyar" <sathyanarayananmudhali...@gmail.com> wrote:

code:

    directKafkaStream.foreachRDD(rdd -> {
        rdd.foreach(record -> {
            messages1.add(record._2);
        });
        JavaRDD<String> lines = sc.parallelize(messages1);
        JavaPairRDD<Integer, String> data = lines.mapToPair(new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String a) {
                String[] tokens = StringUtil.split(a, '%');
                return new Tuple2<Integer, String>(Integer.getInteger(tokens[3]), tokens[2]);
            }
        });
        Function2<String, String, String> reduceSumFunc = (accum, n) -> (accum.concat(n));
        JavaPairRDD<Integer, String> yearCount = data.reduceByKey(reduceSumFunc);
        javaFunctions(yearCount)
            .writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class))
            .withColumnSelector(someColumns("year", "list_of_movies"))
            .saveToCassandra(); // this is the error line
    });

----------

error:

    com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year
        at com.datastax.spark.connector.writer.RoutingKeyGenerator$$anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at com.datastax.spark.connector.writer.RoutingKeyGenerator.fillRoutingKey(RoutingKeyGenerator.scala:47)
        at com.datastax.spark.connector.writer.RoutingKeyGenerator.apply(RoutingKeyGenerator.scala:56)
        at com.datastax.spark.connector.writer.TableWriter.batchRoutingKey(TableWriter.scala:126)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:107)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

----------

I am trying to connect Kafka and Cassandra using Spark. I am able to store a JavaRDD into Cassandra, but not a JavaPairRDD. I have added a comment on the line where the error occurs. Thank you.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
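A likely source of the null key in the code above: `Integer.getInteger(String)` does not parse its argument. It treats the string as the *name of a system property* and returns null when no such property is defined, which is exactly what `NullKeyColumnException: Invalid null value for key column year` suggests is happening. `Integer.parseInt` is probably what was intended. A quick standalone demonstration (the `token` value is a made-up example):

```java
public class GetIntegerDemo {
    public static void main(String[] args) {
        String token = "2005"; // e.g. a year token split out of a Kafka message

        // Integer.getInteger looks up a system property NAMED "2005";
        // no such property exists, so it returns null.
        System.out.println(Integer.getInteger(token)); // prints: null

        // Integer.parseInt actually parses the digits.
        System.out.println(Integer.parseInt(token));   // prints: 2005
    }
}
```

A null key produced this way would flow silently through `mapToPair` and `reduceByKey` and only surface at `saveToCassandra`, where the connector refuses a null value for the partition key column `year`.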