Code with the SQL broadcast hint. This worked and I was able to see that a
broadcast join was performed.
val testDF = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  // broadcast() is the hint from org.apache.spark.sql.functions
  val resultDF = recordDF.join(broadcast(testDF), "Age")
  resultDF.write.format("com.databricks.spark.csv")
    .save("file:///shared/data/output/streaming/" + timestamp)
})
But for every batch this file was read and the broadcast was performed again.
I guess the entire DAG is being re-evaluated for each micro batch:
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
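
One thing I haven't tried yet that might at least avoid the repeated file
scan: caching the lookup DataFrame. The per-batch broadcast exchange may
still happen, but the read and parse of the CSV should be skipped after the
first batch. A minimal sketch (same schema and path as above; the count() is
only there to force materialization before streaming starts):

val testDF = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt")
  .cache()      // keep the parsed rows in executor memory across batches
testDF.count()  // materialize the cache up front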
Then I changed the code to broadcast the DataFrame itself. This didn't work
either. Not sure if this is what you meant by broadcasting a DataFrame.
val testDF = sc.broadcast(
  sqlContext.read.format("com.databricks.spark.csv")
    .schema(schema).load("file:///shared/data/test-data.txt")
)

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  // testDF.value only returns the lazy DataFrame plan, so the file is
  // still re-read and re-broadcast on every batch
  val resultDF = recordDF.join(testDF.value, "Age")
  resultDF.write.format("com.databricks.spark.csv")
    .save("file:///shared/data/output/streaming/" + timestamp)
})
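
Perhaps the only way to truly broadcast once is to materialize the rows
first, since a DataFrame is just a lazy plan. A sketch of what I have in
mind, reusing the lines stream from above and assuming the lookup file is
small enough to collect to the driver and that Age is the first column (the
lookup map and column positions here are my assumptions):

// Collect the small table once, broadcast the materialized rows, and
// join locally inside each batch with no per-batch broadcast exchange.
val smallDF = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt")
val lookupBC = sc.broadcast(
  smallDF.rdd.map(r => r.getInt(0) -> r.getString(1)).collect().toMap
)

lines.foreachRDD((rdd, timestamp) => {
  val joined = rdd.map(_.split(","))
    .map(l => (l(0).toInt, l(1)))
    .flatMap { case (age, name) =>
      // local hash lookup on each executor via the broadcast value
      lookupBC.value.get(age).map(other => (age, name, other))
    }
  joined.saveAsTextFile("file:///shared/data/output/streaming/" + timestamp)
})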
On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <[email protected]>
wrote:
> Can you paste the code where you use sc.broadcast?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <[email protected]> wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using the SQL broadcast hint. The question is how
>> to prevent this broadcast from being repeated for each RDD.
>> Is there a way for it to be broadcast once and used locally for each RDD?
>> Right now, for every batch, the metadata file is read and the DF is
>> broadcast again.
>> I tried sc.broadcast and that did not provide this behavior.
>>
>> Srikanth
>>
>>
>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <[email protected]>
>> wrote:
>>
>>> You should be able to broadcast that data frame using sc.broadcast and
>>> join against it.
>>>
>>> On Wed, 17 Feb 2016, 21:13 Srikanth <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a streaming use case where I plan to keep a dataset broadcast and
>>>> cached on each executor.
>>>> Every micro batch in streaming will create a DF out of the RDD and join it
>>>> with this dataset.
>>>> The code below performs the broadcast operation for each RDD. Is
>>>> there a way to broadcast it just once?
>>>>
>>>> Alternate approaches are also welcome.
>>>>
>>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>
>>>> val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>   .join(DF1, "id")
>>>> metaDF.cache()
>>>>
>>>>
>>>> val lines = streamingcontext.textFileStream(path)
>>>>
>>>> lines.foreachRDD(rdd => {
>>>>   val recordDF = rdd.map(r => Record(r)).toDF()
>>>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>
>>>>   joinedDF.rdd.foreachPartition(partition => {
>>>>     partition.foreach(row => {
>>>>       ...
>>>>       ...
>>>>     })
>>>>   })
>>>> })
>>>>
>>>> streamingcontext.start
>>>>
>>>> On a similar note, if metaDF is too big to broadcast, can I partition it
>>>> (df.repartition($"col")) and also partition each streaming RDD the same
>>>> way? That way I can avoid shuffling metaDF each time.
>>>>
>>>> Let me know your thoughts.
>>>>
>>>> Thanks
>>>>
>>>>
>>