I am curious why you are using the 1.0.4 java artifact alongside the latest 1.1.0 core connector. That version mismatch may well be your compilation problem - the older java module does not expose the same API.
 
<dependency>
   <groupId>com.datastax.spark</groupId>
   <artifactId>spark-cassandra-connector_2.10</artifactId>
   <version>1.1.0</version>
</dependency>
<dependency>
   <groupId>com.datastax.spark</groupId>
   <artifactId>spark-cassandra-connector-java_2.10</artifactId>
   <version>1.0.4</version>
</dependency>
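
For reference, this is what the two dependencies would look like with both artifacts aligned on the same release (a sketch - verify against the mvn repo link below that the 1.1.0 java module is published before depending on it):

```xml
<dependency>
   <groupId>com.datastax.spark</groupId>
   <artifactId>spark-cassandra-connector_2.10</artifactId>
   <version>1.1.0</version>
</dependency>
<dependency>
   <groupId>com.datastax.spark</groupId>
   <artifactId>spark-cassandra-connector-java_2.10</artifactId>
   <version>1.1.0</version>
</dependency>
```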

See:
- doc: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md
- mvn repo: http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector-java_2.10/1.1.0
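
Once both artifacts are on 1.1.x, also note that the Java API lives in the japi package. The static imports would look roughly like this (a sketch, not compiled here - the com.datastax.spark.connector.CassandraJavaUtil class your error log shows is the 1.0.x one, and it has no mapToRow):

```java
// Static imports for the 1.1.x Java API (japi package).
// The 1.0.x com.datastax.spark.connector.CassandraJavaUtil has no mapToRow,
// which matches the "cannot find symbol" error below.
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

// With these imports on the classpath, the write call from the original
// code should resolve:
// javaFunctions(data)
//     .writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class))
//     .saveToCassandra();
```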
  
- Helena
@helenaedelson


On Dec 8, 2014, at 12:47 PM, m.sar...@accenture.com wrote:

> Hi,
> 
> I intend to save streaming data from Kafka into Cassandra using
> spark-streaming, but there seems to be a problem with the line
> javaFunctions(data).writerBuilder("testkeyspace", "test_table", 
> mapToRow(TestTable.class)).saveToCassandra();
> I am getting 2 compilation errors.
> The code, the error log, and the POM.xml dependencies are listed below.
> Please help me find out why this is happening.
> 
> 
> public class SparkStream {
>    static int key=0;
>    public static void main(String args[]) throws Exception
>    {
>        if(args.length != 3)
>        {
>            System.out.println("SparkStream <zookeeper_ip> <group_nm> 
> <topic1,topic2,...>");
>            System.exit(1);
>        }
> 
>        Logger.getLogger("org").setLevel(Level.OFF);
>        Logger.getLogger("akka").setLevel(Level.OFF);
>        Map<String,Integer> topicMap = new HashMap<String,Integer>();
>        String[] topic = args[2].split(",");
>        for(String t: topic)
>        {
>            topicMap.put(t, new Integer(3));
>        }
> 
>        /* Connection to Spark */
>        SparkConf conf = new SparkConf();
>        JavaSparkContext sc = new JavaSparkContext("local[4]", 
> "SparkStream",conf);
>        JavaStreamingContext jssc = new JavaStreamingContext(sc, new 
> Duration(3000));
> 
> 
>  /* connection to cassandra */
> /*    conf.set("spark.cassandra.connection.host", "127.0.0.1:9042");
>        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
>        Session session = connector.openSession();
>        session.execute("CREATE TABLE IF NOT EXISTS testkeyspace.test_table 
> (key INT PRIMARY KEY, value TEXT)");
> */
> 
>        /* Receive Kafka streaming inputs */
>        JavaPairReceiverInputDStream<String, String> messages = 
> KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
> 
> 
>        /* Create DStream */
>        JavaDStream<TestTable> data = messages.map(new 
> Function<Tuple2<String,String>, TestTable >()
>        {
>            public TestTable call(Tuple2<String, String> message)
>            {
>                return new TestTable(new Integer(++key), message._2() );
>            }
>        }
>        );
> 
> 
>        /* Write to cassandra */
>        javaFunctions(data).writerBuilder("testkeyspace", "test_table", 
> mapToRow(TestTable.class)).saveToCassandra();
>    //  data.print();
> 
> 
>        jssc.start();
>        jssc.awaitTermination();
> 
>    }
> }
> 
> class TestTable implements Serializable
> {
>    Integer key;
>    String value;
> 
>    public TestTable() {}
> 
>    public TestTable(Integer k, String v)
>    {
>        key=k;
>        value=v;
>    }
> 
>    public Integer getKey(){
>        return key;
>    }
> 
>    public void setKey(Integer k){
>        key=k;
>    }
> 
>    public String getValue(){
>        return value;
>    }
> 
>    public void setValue(String v){
>        value=v;
>    }
> 
>    public String toString(){
>        return MessageFormat.format("TestTable'{'key={0},
> value={1}'}'", key, value);
>    }
> }
> 
> The output log is:
> 
> [INFO] Compiling 1 source file to
> /root/Documents/SparkStreamSample/target/classes
> [INFO] 2 errors
> [INFO] -------------------------------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] -------------------------------------------------------------
> [ERROR] 
> /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,81]
> cannot find symbol
>  symbol:   method mapToRow(java.lang.Class<com.spark.TestTable>)
>  location: class com.spark.SparkStream
> [ERROR] 
> /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,17]
> no suitable method found for
> javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>)
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>,java.lang.Class<T>)
> is not applicable
>      (cannot infer type-variable(s) T
>        (actual and formal argument lists differ in length))
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>,java.lang.Class<T>)
> is not applicable
>      (cannot infer type-variable(s) T
>        (actual and formal argument lists differ in length))
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>,java.lang.Class<T>)
> is not applicable
>      (cannot infer type-variable(s) T
>        (actual and formal argument lists differ in length))
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.rdd.RDD<T>,java.lang.Class<T>)
> is not applicable
>      (cannot infer type-variable(s) T
>        (actual and formal argument lists differ in length))
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaStreamingContext)
> is not applicable
>      (argument mismatch;
> org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>
> cannot be converted to
> org.apache.spark.streaming.api.java.JavaStreamingContext)
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.StreamingContext)
> is not applicable
>      (argument mismatch;
> org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>
> cannot be converted to org.apache.spark.streaming.StreamingContext)
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.api.java.JavaSparkContext)
> is not applicable
>      (argument mismatch;
> org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>
> cannot be converted to org.apache.spark.api.java.JavaSparkContext)
>    method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.SparkContext)
> is not applicable
>      (argument mismatch;
> org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>
> cannot be converted to org.apache.spark.SparkContext)
> 
> 
> And the POM dependencies are:
> 
>    <dependency>
>        <groupId>org.apache.spark</groupId>
>        <artifactId>spark-streaming-kafka_2.10</artifactId>
>        <version>1.1.0</version>
>    </dependency>
> 
>    <dependency>
>        <groupId>org.apache.spark</groupId>
>        <artifactId>spark-streaming_2.10</artifactId>
>        <version>1.1.0</version>
>    </dependency>
> 
> <dependency>
>    <groupId>com.datastax.spark</groupId>
>    <artifactId>spark-cassandra-connector_2.10</artifactId>
>    <version>1.1.0</version>
> </dependency>
> <dependency>
>    <groupId>com.datastax.spark</groupId>
>    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
>    <version>1.0.4</version>
> </dependency>
> <dependency>
>    <groupId>org.apache.spark</groupId>
>    <artifactId>spark-core_2.10</artifactId>
>    <version>1.1.1</version>
> </dependency>
> 
> 
>    <dependency>
>        <groupId>com.msiops.footing</groupId>
>        <artifactId>footing-tuple</artifactId>
>        <version>0.2</version>
>    </dependency>
> 
>    <dependency>
>        <groupId>com.datastax.cassandra</groupId>
>        <artifactId>cassandra-driver-core</artifactId>
>        <version>1.0.8</version>
>    </dependency>
> 
> 
> Thanks,
> Aiman
> 