Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to Cassandra"
section on this page:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
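
As a rough sketch of what those examples boil down to (not taken from the page itself): a filter that compares key columns with literal values is the kind of predicate the data source can hand to Cassandra. The object and method names below are hypothetical, "ks" and "cf" are the placeholders used elsewhere in this thread, and the key columns are assumed to be of type bigint.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

object PushdownSketch {
  // Same data-source options as in the thread; "ks" and "cf" are placeholders.
  val cassandraOptions: Map[String, String] =
    Map("table" -> "cf", "keyspace" -> "ks")

  // Hypothetical helper. Assumes prim1_id and prim2_id are bigint columns
  // that together form the full primary key of the table.
  def lookup(sqlContext: SQLContext, prim1: Long, prim2: Long): DataFrame = {
    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(cassandraOptions)
      .load()

    // Equality predicates against literal values are candidates for pushdown.
    val filtered = df.filter(df("prim1_id") === prim1 && df("prim2_id") === prim2)

    // Print the physical plan so the pushed predicates can be inspected.
    filtered.explain()
    filtered
  }
}
```

Calling `PushdownSketch.lookup(sqlContext, 1L, 2L)` prints the physical plan. If the predicates were pushed, they should show up on the Cassandra scan node rather than only in a separate Spark filter step.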


Mohammed
Author: Big Data Analytics with Spark

-----Original Message-----
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch] 
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for the hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
     if (instance == null) {
       // Load DataFrame with C* data-source
       instance = sqlContext.read
         .format("org.apache.spark.sql.cassandra")
         .options(Map("table" -> "cf", "keyspace" -> "ks"))
         .load()
     }
     instance
   }
}
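
Since a DataFrame loaded this way leaves it to the query planner to decide what reaches Cassandra, one alternative worth trying is the connector's RDD-level `joinWithCassandraTable`, which looks up only the keys present in the left-hand RDD. A minimal sketch, assuming a hypothetical `Msg` case class, bigint key columns, and that `prim1_id` and `prim2_id` together make up the full primary key:

```scala
import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD

object JoinSketch {
  // Hypothetical stand-in for the decoded Kafka message.
  case class Msg(prim1Id: Long, prim2Id: Long, payload: String)

  // Extract the Cassandra key of a message as a tuple, in column order.
  def keyOf(msg: Msg): (Long, Long) = (msg.prim1Id, msg.prim2Id)

  // For each key in the batch, fetch the matching row from ks.cf.
  // Tuple elements are matched to the columns listed in on(...) by position.
  def enrich(msgs: RDD[Msg]): RDD[((Long, Long), CassandraRow)] =
    msgs.map(keyOf)
      .joinWithCassandraTable[CassandraRow]("ks", "cf")
      .on(SomeColumns("prim1_id", "prim2_id"))
}
```

Note that this behaves like an inner join: keys without a matching row in Cassandra produce no output.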

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

> You may have better luck with this question on the Spark Cassandra 
> Connector mailing list.
>
>
>
> One quick question about this code from your email:
>
>        // Load DataFrame from C* data-source
>
>        val base_data = base_data_df.getInstance(sqlContext)
>
>
>
> What exactly is base_data_df and how are you creating it?
>
> Mohammed
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> -----Original Message-----
> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
> Sent: Tuesday, February 9, 2016 6:58 AM
> To: user@spark.apache.org
> Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>
>
>
> All,
>
>
>
> I'm new to Spark and I'm having a hard time doing a simple join of two
> DFs.
>
>
>
> Intent:
>
> -  I'm receiving data from Kafka via a direct stream and would like to
> enrich the messages with data from Cassandra. The Kafka messages
> (Protobufs) are decoded into DataFrames and then joined with a
> (supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)
> streaming batch size to raw C* data is [several streaming messages to
> millions of C* rows], BUT the join always yields exactly ONE result
> [1:1] per message. After the join the resulting DF is eventually
> stored to another C* table.
>
>
>
> Problem:
>
> - Even though I'm joining the two DFs on the full Cassandra primary 
> key and pushing the corresponding filter to C*, it seems that Spark is 
> loading the whole C* data-set into memory before actually joining 
> (which I'd like to prevent by using the filter/predicate pushdown).
>
> This leads to a lot of shuffling and tasks being spawned, hence the 
> "simple" join takes forever...
>
>
>
> Could anyone shed some light on this? To my mind, this should be a
> prime example for DFs and Spark Streaming.
>
>
>
> Environment:
>
> - Spark 1.6
> - Cassandra 2.1.12
> - Cassandra-Spark-Connector 1.5-RC1
> - Kafka 0.8.2.2
>
>
>
> Code:
>
>
>
> // Imports needed by this excerpt (MyMsg, MyMsgDecoder and
> // SQLContextSingleton are defined elsewhere)
> import kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> def main(args: Array[String]) {
>      val conf = new SparkConf()
>        .setAppName("test")
>        .set("spark.cassandra.connection.host", "xxx")
>        .set("spark.cassandra.connection.keep_alive_ms", "30000")
>        .setMaster("local[*]")
>
>      val ssc = new StreamingContext(conf, Seconds(10))
>      ssc.sparkContext.setLogLevel("INFO")
>
>      // Initialise Kafka
>      val kafkaTopics = Set[String]("xxx")
>      val kafkaParams = Map[String, String](
>        "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
>        "auto.offset.reset" -> "smallest")
>
>      // Kafka stream
>      val messages = KafkaUtils.createDirectStream[String, MyMsg,
>        StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)
>
>      // Executed on the driver
>      messages.foreachRDD { rdd =>
>
>        // Create an instance of SQLContext
>        val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>        import sqlContext.implicits._
>
>        // Map the (key, message) pairs to the messages only.
>        // A lower-case pattern variable is used here: an upper-case name
>        // in a pattern is matched as a constant, not bound as a variable.
>        val MyMsgRdd = rdd.map { case (_, msg) => msg }
>
>        // Convert RDD[MyMsg] to DataFrame
>        val MyMsgDf = MyMsgRdd.toDF()
>         .select(
>              $"prim1Id" as 'prim1_id,
>              $"prim2Id" as 'prim2_id,
>              $...
>        )
>
>        // Load DataFrame from C* data-source
>        val base_data = base_data_df.getInstance(sqlContext)
>
>        // Join on prim1_id and prim2_id (join type "left"), then filter
>        // on the C* key columns
>        val joinedDf = MyMsgDf.join(base_data,
>              MyMsgDf("prim1_id") === base_data("prim1_id") &&
>              MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
>              .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
>                  && base_data("prim2_id").isin(MyMsgDf("prim2_id")))
>
>        joinedDf.show()
>        joinedDf.printSchema()
>
>        // Select relevant fields
>
>        // Persist
>
>      }
>
>      // Start the computation
>      ssc.start()
>      ssc.awaitTermination()
> }
>
>
>
> SO:
>
> http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p
>
>
>
>
>
>
>
> ---------------------------------------------------------------------
>
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org




