All,

I'm new to Spark and I'm having a hard time doing a simple join of two DFs.

Intent:
- I'm receiving data from Kafka via a direct stream and would like to enrich the messages with data from Cassandra. The Kafka messages (Protobufs) are decoded into DataFrames and then joined with a (supposedly pre-filtered) DF from Cassandra. The ratio of (Kafka) streaming batch size to raw C* data is [several streaming messages to millions of C* rows], BUT the join always yields exactly ONE result [1:1] per message. After the join, the resulting DF is eventually stored in another C* table.

Problem:
- Even though I'm joining the two DFs on the full Cassandra primary key and pushing the corresponding filter to C*, Spark seems to load the whole C* data set into memory before actually joining (which is exactly what I'd like to prevent by using the filter/predicate pushdown). This leads to a lot of shuffling and a huge number of tasks being spawned, so the "simple" join takes forever... (See the sketch just below for the kind of pushdown I'm expecting.)
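
For illustration, this is roughly the kind of server-side filtering I'm hoping the join gets translated into. Just a sketch: the keyspace/table names ("ks", "base_data") and the key values (somePrim1Id, somePrim2Id) are placeholders, not my real schema:

// Sketch only: load the C* table through the connector's DataFrame source
// ("ks" and "base_data" are placeholder names).
val cassandraDf = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "base_data"))
  .load()

// With literal filters on the full primary key I'd expect the predicates to
// show up under "PushedFilters" for the Cassandra source in the physical plan.
cassandraDf
  .filter(cassandraDf("prim1_id") === somePrim1Id &&
          cassandraDf("prim2_id") === somePrim2Id)
  .explain()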

Could anyone shed some light on this? To my mind this should be a prime example for DataFrames and Spark Streaming.

Environment:
- Spark 1.6
- Cassandra 2.1.12
- Spark-Cassandra-Connector 1.5-RC1
- Kafka 0.8.2.2

Code:

// MyMsg, MyMsgDecoder, SQLContextSingleton and base_data_df are my own classes
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("test")
      .set("spark.cassandra.connection.host", "xxx")
      .set("spark.cassandra.connection.keep_alive_ms", "30000")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("INFO")

    // Initialise Kafka
    val kafkaTopics = Set[String]("xxx")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
      "auto.offset.reset" -> "smallest")

    // Kafka stream
    val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](
      ssc, kafkaParams, kafkaTopics)

    // Executed on the driver
    messages.foreachRDD { rdd =>

      // Create an instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Extract the MyMsg payloads from the (key, message) tuples
      val MyMsgRdd = rdd.map { case (_, msg) => msg }

      // Convert RDD[MyMsg] to DataFrame
      val MyMsgDf = MyMsgRdd.toDF()
        .select(
          $"prim1Id" as 'prim1_id,
          $"prim2Id" as 'prim2_id
          // ... further columns elided ...
        )

      // Load DataFrame from C* data-source
      val base_data = base_data_df.getInstance(sqlContext)

      // Join (left outer) on prim1_id and prim2_id, i.e. the full C* primary key;
      // the extra filter re-states the join condition in the hope of getting it
      // pushed down to the Cassandra source
      val joinedDf = MyMsgDf.join(base_data,
          MyMsgDf("prim1_id") === base_data("prim1_id") &&
          MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
        .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
          && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

      joinedDf.show()
      joinedDf.printSchema()

      // Select relevant fields

      // Persist

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
}
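
For comparison, here is roughly the per-key lookup I'm effectively after, expressed with the connector's RDD API (joinWithCassandraTable) instead of a DataFrame join. This is only a sketch to clarify the intent: the keyspace/table names ("ks", "base_data") are placeholders and I haven't wired it into the foreachRDD block above.

import com.datastax.spark.connector._

// Key the decoded messages by the full C* primary key ...
val keyedMsgs = MyMsgRdd.map(msg => (msg.prim1Id, msg.prim2Id))

// ... and let the connector fetch only the rows matching each key
// instead of scanning the whole table.
val enriched = keyedMsgs.joinWithCassandraTable(
  "ks", "base_data",
  joinColumns = SomeColumns("prim1_id", "prim2_id"))
// enriched: RDD[((prim1Id, prim2Id), CassandraRow)]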

SO: http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p


