I am curious why you are using the 1.0.4 Java artifact together with the latest 1.1.0 connector. The older Java artifact might be the cause of your compilation problem:

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.0.4</version>
    </dependency>

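If the mismatch is indeed the cause, putting both connector artifacts on the same version should give the compiler the single-argument javaFunctions(JavaDStream) and the mapToRow(Class) methods that your code calls. A minimal sketch of the two entries, assuming the 1.1.0 Java artifact from the mvn repo link is the one you want (the rest of your POM is left untouched here):

```xml
<!-- Sketch only: both connector artifacts aligned on 1.1.0 (assumed target version) -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <!-- was 1.0.4 -->
    <version>1.1.0</version>
</dependency>
```

As far as I recall, the 1.1.0 Java API also lives in a different package (com.datastax.spark.connector.japi.CassandraJavaUtil) than the one shown in your error log, so the static import may need updating as well; please check it against the doc.
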
See:
- doc https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md
- mvn repo http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector-java_2.10/1.1.0

- Helena
@helenaedelson

On Dec 8, 2014, at 12:47 PM, m.sar...@accenture.com wrote:

> Hi,
>
> I am intending to save the streaming data from kafka into Cassandra,
> using spark-streaming:
> But there seems to be problem with line
> javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
> I am getting 2 errors.
> the code, the error-log and POM.xml dependencies are listed below:
> Please help me find the reason as to why is this happening.
>
>
> public class SparkStream {
>     static int key=0;
>     public static void main(String args[]) throws Exception
>     {
>         if(args.length != 3)
>         {
>             System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
>             System.exit(1);
>         }
>
>         Logger.getLogger("org").setLevel(Level.OFF);
>         Logger.getLogger("akka").setLevel(Level.OFF);
>         Map<String,Integer> topicMap = new HashMap<String,Integer>();
>         String[] topic = args[2].split(",");
>         for(String t: topic)
>         {
>             topicMap.put(t, new Integer(3));
>         }
>
>         /* Connection to Spark */
>         SparkConf conf = new SparkConf();
>         JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
>         JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
>
>
>         /* connection to cassandra */
>         /* conf.set("spark.cassandra.connection.host", "127.0.0.1:9042");
>         CassandraConnector connector = CassandraConnector.apply(sc.getConf());
>         Session session = connector.openSession();
>         session.execute("CREATE TABLE IF NOT EXISTS testkeyspace.test_table (key INT PRIMARY KEY, value TEXT)");
>         */
>
>         /* Receive Kafka streaming inputs */
>         JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
>
>
>         /* Create DStream */
>         JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String,String>, TestTable >()
>         {
>             public TestTable call(Tuple2<String, String> message)
>             {
>                 return new TestTable(new Integer(++key), message._2() );
>             }
>         }
>         );
>
>
>         /* Write to cassandra */
>         javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
>         // data.print();
>
>
>         jssc.start();
>         jssc.awaitTermination();
>
>     }
> }
>
> class TestTable implements Serializable
> {
>     Integer key;
>     String value;
>
>     public TestTable() {}
>
>     public TestTable(Integer k, String v)
>     {
>         key=k;
>         value=v;
>     }
>
>     public Integer getKey(){
>         return key;
>     }
>
>     public void setKey(Integer k){
>         key=k;
>     }
>
>     public String getValue(){
>         return value;
>     }
>
>     public void setValue(String v){
>         value=v;
>     }
>
>     public String toString(){
>         return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
>     }
> }
>
> The output log is:
>
> [INFO] Compiling 1 source file to /root/Documents/SparkStreamSample/target/classes
> [INFO] 2 errors
> [INFO] -------------------------------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] -------------------------------------------------------------
> [ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,81] cannot find symbol
>   symbol:   method mapToRow(java.lang.Class<com.spark.TestTable>)
>   location: class com.spark.SparkStream
> [ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,17] no suitable method found for javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>)
>     method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>,java.lang.Class<T>) is not applicable
>       (cannot infer type-variable(s) T
>         (actual and formal argument lists differ in length))
>     method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>,java.lang.Class<T>) is not applicable
>       (cannot infer type-variable(s) T
>         (actual and formal argument lists differ in length))
>     method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>,java.lang.Class<T>) is not applicable
>       (cannot infer type-variable(s) T
>         (actual and formal argument lists differ in length))
>     method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.rdd.RDD<T>,java.lang.Class<T>) is not applicable
>       (cannot infer type-variable(s) T
>         (actual and formal argument lists differ in length))
>     method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaStreamingContext) is not applicable
>       (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable> cannot be converted to org.apache.spark.streaming.api.java.JavaStreamingContext)
>     method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.StreamingContext) is not applicable
>       (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable> cannot be converted to org.apache.spark.streaming.StreamingContext)
>     method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.api.java.JavaSparkContext) is not applicable
>       (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable> cannot be converted to org.apache.spark.api.java.JavaSparkContext)
>     method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.SparkContext) is not applicable
>       (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable> cannot be converted to org.apache.spark.SparkContext)
>
>
> And the POM dependencies are:
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming-kafka_2.10</artifactId>
>     <version>1.1.0</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming_2.10</artifactId>
>     <version>1.1.0</version>
> </dependency>
>
> <dependency>
>     <groupId>com.datastax.spark</groupId>
>     <artifactId>spark-cassandra-connector_2.10</artifactId>
>     <version>1.1.0</version>
> </dependency>
> <dependency>
>     <groupId>com.datastax.spark</groupId>
>     <artifactId>spark-cassandra-connector-java_2.10</artifactId>
>     <version>1.0.4</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.10</artifactId>
>     <version>1.1.1</version>
> </dependency>
>
>
> <dependency>
>     <groupId>com.msiops.footing</groupId>
>     <artifactId>footing-tuple</artifactId>
>     <version>0.2</version>
> </dependency>
>
> <dependency>
>     <groupId>com.datastax.cassandra</groupId>
>     <artifactId>cassandra-driver-core</artifactId>
>     <version>1.0.8</version>
> </dependency>
>
>
> Thanks,
> Aiman