Hey friends,

We're trying to make some batched computations against an OLAP DB run closer to "real-time". One of our more complex computations triggers when event A occurs but event B does not occur within a given time period. Our experience with Spark is limited, but since Spark 2.3.0 just introduced Stream-Stream Joins <https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html> and our data is already in Kafka, we thought we'd try it out.
That said, in our exploration, we've been running into an issue where Spark optimizes the Kafka *watermark* to be applied *after* the *filter* in our query. This means the watermark won't move forward unless there's data within the filtered results, and thus the trigger for "event A but not event B" won't occur until another "event B" arrives, which can be problematic depending on how granular the filter is. See the quick isolated example I set up in *spark-shell* below.

```
scala> :paste
// Entering paste mode (ctrl-D to finish)

val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", *"<host>:<port>"*)
  .option("subscribe", "*<topic>*")
  .option("startingOffsets", "latest")
  .load()

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("message", StructType(Seq(
    StructField("event", StringType),
    StructField("timestamp", TimestampType)
  )))
))

val parsed = kafka
  .select(from_json($"value".cast(StringType), schema) as 'data)
  .select($"data.*", $"data.message.timestamp" as 'ts)
  .withWatermark("ts", "10 seconds")

// Exiting paste mode, now interpreting.

scala> parsed.filter("message.event = 'Item Added'").as('a)
  .join(
    parsed.filter("message.event = 'Item Purchased'") as 'b,
    expr("a.id = b.id AND a.ts < b.ts AND b.ts < a.ts + interval 5 seconds"),
    joinType = "left")
  .explain()

== Physical Plan ==
StreamingSymmetricHashJoin [id#24], [id#37], LeftOuter, condition = [ leftOnly = null, rightOnly = null, both = ((ts#23-T10000ms < ts#39-T10000ms) && (ts#39-T10000ms < ts#23-T10000ms + interval 5 seconds)), full = ((ts#23-T10000ms < ts#39-T10000ms) && (ts#39-T10000ms < ts#23-T10000ms + interval 5 seconds)) ], state info [ checkpoint = <unknown>, runId = 52d0e4a5-150c-4136-8542-c2c5e4bb59c2, opId = 0, ver = 0, numPartitions = 4], 0, state cleanup [ left value predicate: (ts#23-T10000ms <= -5000000), right value predicate: (ts#39-T10000ms <= 0) ]
:- Exchange hashpartitioning(id#24, 4)
:  +- EventTimeWatermark ts#23: timestamp, interval 10 seconds
:     +- Project [jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).id AS id#24, jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message AS message#25, jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message.timestamp AS ts#23]
:        +- Filter (jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message.event = Item Added)
:           +- StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
+- Exchange hashpartitioning(id#37, 4)
   +- *(1) Filter isnotnull(ts#39-T10000ms)
      +- EventTimeWatermark ts#39: timestamp, interval 10 seconds
         +- Project [jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).id AS id#37, jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message AS message#38, jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message.timestamp AS ts#39]
            +- Filter ((jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message.event = Item Purchased) && isnotnull(jsontostructs(StructField(id,StringType,true), StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).id))
               +- StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
```

(The parts that need to be replaced with environment-specific values are *bolded* in case you'd like to reproduce locally.)

As you can see in the plan above, Spark's optimizer has decided to make two separate *filtered* streams, each with its own watermark, and then join them together.
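To make concrete why that ordering stalls the trigger: a watermark is (roughly) the maximum event time observed on the stream minus the delay threshold, so if it's computed only over rows that survive a filter, it stops advancing the moment matching events stop arriving. Here's a minimal plain-Scala sketch of that model — no Spark involved, and the `watermark` helper is our own illustration, not a Spark API:

```scala
object WatermarkModel {
  // Simplified model of Spark's event-time watermark:
  // max event time observed so far, minus the delay threshold.
  def watermark(eventTimesSeconds: Seq[Long], delaySeconds: Long): Option[Long] =
    eventTimesSeconds.reduceOption(_ max _).map(_ - delaySeconds)

  def main(args: Array[String]): Unit = {
    // Events as (eventType, timestampSeconds).
    val events = Seq(
      ("Item Added", 100L),
      ("Item Purchased", 103L),
      ("Item Added", 130L)
    )

    // Watermark over the whole stream keeps advancing: 130 - 10 = 120.
    val overAll = watermark(events.map(_._2), 10L)

    // Watermark over only the filtered "Item Purchased" rows is stuck at
    // 103 - 10 = 93, even though the stream itself has reached t = 130.
    val overFiltered =
      watermark(events.filter(_._1 == "Item Purchased").map(_._2), 10L)

    println(s"watermark over all events:      $overAll")      // Some(120)
    println(s"watermark over filtered events: $overFiltered") // Some(93)
  }
}
```

In this toy model, the filtered branch's watermark only catches up when another "Item Purchased" row arrives — which is exactly the behavior we're seeing from the plan above.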
However, we'd like the plan to look more like:

```
-> Kafka stream
   -> Watermark
      -> JOIN between
         -> Kafka substream using filter A
         -> Kafka substream using filter B
```

or at least the following:

```
-> JOIN between A and B
   -> A) Kafka substream
      -> Filter A
   -> B) Kafka substream
      -> Filter B
```

This reordering — the watermark ending up *after* the filter even though our query filters after `withWatermark` — seems to happen with or without the JOIN, as seen below.

```
scala> :paste
// Entering paste mode (ctrl-D to finish)

val parsed = kafka
  .select(from_json($"value".cast(StringType), schema) as 'data)
  .select($"data.*", $"data.message.timestamp" as 'ts)
  .withWatermark("ts", "10 seconds")

parsed.filter("message.event = 'Item Added'").explain()

// Exiting paste mode, now interpreting.

== Physical Plan ==
EventTimeWatermark ts#23: timestamp, interval 10 seconds
+- Project [jsontostructs(StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message AS message#24, jsontostructs(StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message.timestamp AS ts#23]
   +- Filter (jsontostructs(StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true), cast(value#8 as string), Some(Etc/UTC), true).message.event = Item Added)
      +- StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

kafka: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(message,StructType(StructField(event,StringType,true), StructField(timestamp,TimestampType,true)),true))
parsed: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [message: struct<event: string, timestamp: timestamp>, ts: timestamp]
```

That said, I think the example with the JOIN helps illustrate *why* the placement of the watermark relative to the filter matters. Anyway, is there another way to accurately express this query? Thanks in advance.

--
Best regards,
Tejas Manohar
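P.S. In case it helps clarify the semantics we're after, here's a small plain-Scala model of the trigger we want the left outer join to implement — no Spark involved, and `Event` / `findTriggers` are our own illustrative names. An "A" event should fire only once the watermark has passed the end of its 5-second window with no matching "B" inside it:

```scala
// ts in seconds since some epoch.
case class Event(id: String, event: String, ts: Long)

object TriggerModel {
  // For each "Item Added" (A), fire iff no "Item Purchased" (B) with the
  // same id lands in (a.ts, a.ts + 5). An A is only safe to emit once the
  // watermark has passed the end of its window, i.e. watermark >= a.ts + 5.
  def findTriggers(events: Seq[Event], watermark: Long): Seq[Event] = {
    val added     = events.filter(_.event == "Item Added")
    val purchased = events.filter(_.event == "Item Purchased")
    added.filter { a =>
      val windowClosed = watermark >= a.ts + 5
      val matched = purchased.exists(b =>
        b.id == a.id && a.ts < b.ts && b.ts < a.ts + 5)
      windowClosed && !matched
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("1", "Item Added", 100L),
      Event("1", "Item Purchased", 103L), // within 5s: no trigger for id 1
      Event("2", "Item Added", 101L)      // no purchase: fires once watermark >= 106
    )
    println(findTriggers(events, 106L).map(_.id)) // List(2)
    println(findTriggers(events, 104L).map(_.id)) // List() -- windows not yet closed
  }
}
```

With a watermark of 106, only id 2 fires; the catch in the plan above is that if the watermark is computed only from the filtered "Item Purchased" rows, it may never reach 106 in the first place.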