Spark Streaming

2014-09-24 Thread Reddy Raja
Given this program, I have the following queries:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.master", "local[2]")

// 10-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(10))

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()


Q1) How do I know which part of the program is executing every 10 seconds?

   My requirement is that I want to execute a method that inserts data into
Cassandra every time a new set of messages comes in.

Q2) Is there a function I can pass so that it gets executed when the next
set of messages comes in?

Q3) If I have a method in between the following lines

  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()

my_method(stream rdd)

ssc.start()


   the method is not getting executed.
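
To make Q2 and Q3 concrete, this is roughly the shape I am after (just a
sketch; my_method stands in for my own code that writes to Cassandra, and
foreachRDD is the DStream operation I came across in the streaming guide):

// runs once per batch interval (every 10 seconds here), on each new set of messages
wordCounts.foreachRDD { rdd =>
  my_method(rdd)  // my own helper that inserts the batch into Cassandra
}

ssc.start()
ssc.awaitTermination()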


Can someone answer these questions?

--Reddy


Re: problem in using Spark-Cassandra connector

2014-09-11 Thread Reddy Raja
You will have to create the keyspace and table first.
See the message:
Table not found: EmailKeySpace.Emails

Looks like you have not created the Emails table.
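
If they do not exist yet, something along these lines should create them (a
sketch only -- run it from the spark-shell with the connector on the
classpath, or run the equivalent CQL in cqlsh; the columns are just an
example):

import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(sc.getConf).withSessionDo { session =>
  // mixed-case names like EmailKeySpace must be double-quoted in CQL,
  // otherwise Cassandra lowercases them
  session.execute("""CREATE KEYSPACE IF NOT EXISTS "EmailKeySpace"
                     WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}""")
  // example columns only -- use your actual schema
  session.execute("""CREATE TABLE IF NOT EXISTS "EmailKeySpace"."Emails" (
                     id int PRIMARY KEY, subject text, body text)""")
}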


On Thu, Sep 11, 2014 at 6:04 PM, Karunya Padala <
karunya.pad...@infotech-enterprises.com> wrote:

>
>
> Hi,
>
>
>
> I am new to Spark. I encountered an issue when trying to connect to
> Cassandra using the Spark Cassandra connector. Can anyone help me? Following
> are the details.
>
>
>
> 1) Following are the Spark and Cassandra versions I am using on Lubuntu 12.0:
>
> i) spark-1.0.2-bin-hadoop2
>
> ii) apache-cassandra-2.0.10
>
>
>
> 2) In Cassandra, I created a keyspace and a table, and inserted some data.
>
>
>
> 3) The following libs are specified when starting the spark-shell.
>
> antworks@INHN1I-DW1804:$ spark-shell --jars
> /home/antworks/lib-spark-cassandra/apache-cassandra-clientutil-2.0.10.jar,/home/antworks/lib-spark-cassandra/apache-cassandra-thrift-2.0.10.jar,/home/antworks/lib-spark-cassandra/cassandra-driver-core-2.0.2.jar,/home/antworks/lib-spark-cassandra/guava-15.0.jar,/home/antworks/lib-spark-cassandra/joda-convert-1.6.jar,/home/antworks/lib-spark-cassandra/joda-time-2.3.jar,/home/antworks/lib-spark-cassandra/libthrift-0.9.1.jar,/home/antworks/lib-spark-cassandra/spark-cassandra-connector_2.10-1.0.0-rc3.jar
>
>
>
> 4) When running the stmt val rdd = sc.cassandraTable("EmailKeySpace",
> "Emails"), I encountered the following issue.
>
>
>
> My application connects to Cassandra, immediately disconnects, and
> throws java.io.IOException: Table not found: EmailKeySpace.Emails
>
> Here is the stack trace.
>
>
>
> scala> import com.datastax.spark.connector._
>
> import com.datastax.spark.connector._
>
>
>
> scala> val rdd = sc.cassandraTable("EmailKeySpace", "Emails")
>
> 14/09/11 23:06:51 WARN FrameCompressor: Cannot find LZ4 class, you should
> make sure the LZ4 library is in the classpath if you intend to use it. LZ4
> compression will not be available for the protocol.
>
> 14/09/11 23:06:51 INFO Cluster: New Cassandra host /172.23.1.68:9042 added
>
> 14/09/11 23:06:51 INFO CassandraConnector: Connected to Cassandra cluster:
> AWCluster
>
> 14/09/11 23:06:52 INFO CassandraConnector: Disconnected from Cassandra
> cluster: AWCluster
>
> java.io.IOException: Table not found: EmailKeySpace.Emails
>
> at
> com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:208)
>
> at
> com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:205)
>
> at
> com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:212)
>
> at
> com.datastax.spark.connector.SparkContextFunctions.cassandraTable(SparkContextFunctions.scala:48)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)
>
> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
>
> at $iwC$$iwC$$iwC.<init>(<console>:24)
>
> at $iwC$$iwC.<init>(<console>:26)
>
> at $iwC.<init>(<console>:28)
>
> at <init>(<console>:30)
>
> at .<init>(<console>:34)
>
> at .<clinit>(<console>)
>
> at .<init>(<console>:7)
>
> at .<clinit>(<console>)
>
> at $print(<console>)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
>
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
>
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
>
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
>
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
>
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
>
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
>
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
>
> at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
>
> at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
>
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
>
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
>
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
>
> at org.apache.spark.repl.Main$.main(Main.scala:31)
>
> at org.apache.spark.repl.Main.main(Main.