Re: Watermark on late data only

2023-10-10 Thread Raghu Angadi
I like the idea of some way to expose watermarks to the user. The watermark
does affect how records are processed, so it is relevant for users.
`current_watermark()` is a good option.
The implementation might be engine specific, but it is a very relevant
concept for authors of streaming pipelines.
Ideally I would like the engine to drop late records (or write them to a
side output) even for stateless pipelines, for consistency.

On Tue, Oct 10, 2023 at 2:27 AM Bartosz Konieczny 
wrote:

> Thank you for the clarification, Jungtaek. Indeed, it doesn't sound like
> a highly demanded feature from the end users; I haven't seen it requested
> much on StackOverflow or the mailing lists. I was just curious about the
> reasons.
>
> Using arbitrary stateful processing could indeed be a workaround! But IMHO
> it would be easier to expose the watermark value from a function like
> current_watermark() and let users do whatever they want with the data. And
> it wouldn't require dealing with the state store overhead. The function
> could simplify implementing the *side output pattern*, where we could
> process the on-time data differently from the late data, e.g. write late
> data to a dedicated space in the lake and facilitate backfilling for the
> batch pipelines?
>
> With the current_watermark function it could be expressed as simply:
>
> streamDataset.foreachBatch((dataframe, batchVersion) => {
>   dataframe.cache()
>   dataframe.filter(current_watermark() > event_time_from_dataframe)
>     .writeTo("late_data")
>   dataframe.filter(current_watermark() <= event_time_from_dataframe)
>     .writeTo("on_time_data")
> })
>
> A little like what you can do with Apache Flink, in fact:
>
> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>
> WDYT?
>
> Best,
> Bartosz.
>
> PS. I'll be happy to contribute to that if the feature makes sense ;)
>
> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim 
> wrote:
>
>> Technically speaking, "late data" represents data which cannot be
>> processed because the engine has already thrown out the state associated
>> with it.
>>
>> That said, the only reason the watermark exists for streaming is to
>> handle stateful operators. From the engine's point of view, there is no
>> concept of "late data" for a stateless query. It's something users have
>> to handle with "filter" by themselves, without relying on the value of
>> the watermark. I guess someone may see some benefit in automatically
>> tracking the trend of event time and want to define late data based on
>> the watermark even in a stateless query, but personally I haven't heard
>> such a request so far.
>>
>> As a workaround you can leverage flatMapGroupsWithState, which provides
>> the watermark value for you, but I'd agree it's too heavyweight just for
>> this. If we see consistent demand, we could probably look into it and
>> maybe introduce a new SQL function (which would work only on streaming -
>> that's probably a major blocker to introducing it).
>>
>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>> bartkoniec...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've been analyzing the watermark propagation added in 3.5.0 recently
>>> and had to return to the basics of watermarks. One question is still
>>> unanswered in my head.
>>>
>>> Why are the watermarks reserved for stateful queries? Can't they apply
>>> to filtering the late data out only?
>>>
>>> Is the reason only historical, since the initial design doc mentions
>>> aggregated queries exclusively? Or are there any technical limitations
>>> explaining why jobs like the one below don't drop late data
>>> automatically?
>>>
>>> import sparkSession.implicits._
>>> implicit val sparkContext = sparkSession.sqlContext
>>> val clicksStream = MemoryStream[Click]
>>> val clicksWithWatermark = clicksStream.toDF
>>>   .withWatermark("clickTime", "10 minutes")
>>> val query =
>>> clicksWithWatermark.writeStream.format("console").option("truncate", false)
>>>   .start()
>>>
>>> clicksStream.addData(Seq(
>>>   Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>>   Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>>   Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>> ))
>>>
>>>
>>> query.processAllAvailable()
>>>
>>> clicksStream.addData(Seq(
>>>   Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>>   Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>>   Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>>   Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>> ))
>>> query.processAllAvailable()
>>>
>>> One quick implementation could be adding a new physical plan rule to the
>>> IncrementalExecution
>>> for the 

Re: Watermark on late data only

2023-10-10 Thread Jungtaek Lim
Slight correction/clarification: we now take the "previous" watermark to
determine late records because they are valid inputs for non-first stateful
operators; dropping records based on the same criteria as state eviction
would drop valid outputs of previous (upstream) stateful operators. Please
look back at the criteria we use for evicting state, since evicted state
can become the outputs of the operator.

On Tue, Oct 10, 2023 at 8:10 PM Jungtaek Lim 
wrote:

> We wouldn't like to expose the internal mechanism to the public.
>
> As you are a very detail-oriented engineer tracking major changes, you
> might have noticed that we "changed" the definition of a late record while
> fixing late record handling. Previously, a late record was defined as a
> record whose event time timestamp is earlier than the "current" watermark.
> How has it changed? We now take the "previous" watermark to determine late
> records, because such records are valid inputs for non-first stateful
> operators. If we had exposed a current_watermark() function providing the
> current watermark, and users had somehow built a side output based on it,
> it would have broken when we introduced the fix on late record filtering.
> Or even worse, we might have decided not to fix the issue, worrying too
> much about existing workloads, and given up on multiple stateful
> operators.
>
> The change is arguably not a breaking change, because we never guarantee
> that we won't process data which is earlier than the watermark. The
> guarantee is one way: we guarantee that a record is processed if its event
> time is later than the watermark. The opposite direction is not
> guaranteed, and we actually documented this in the guide doc.
>
> So the workaround I mentioned cannot be used for capturing dropped late
> records - it does not work as expected. We would need to apply exactly the
> same criteria (probably the same predicate) when capturing them. We are
> aware of the demand for a side output of dropped late records, and I also
> agree that just having the number of dropped records is never ideal.
>
> Let's see whether we have an opportunity to prioritize this. If you have
> an idea (sketched design) for implementing this, that would be awesome!
>
> On Tue, Oct 10, 2023 at 6:27 PM Bartosz Konieczny 
> wrote:
>
>> Thank you for the clarification, Jungtaek. Indeed, it doesn't sound
>> like a highly demanded feature from the end users; I haven't seen it
>> requested much on StackOverflow or the mailing lists. I was just curious
>> about the reasons.
>>
>> Using arbitrary stateful processing could indeed be a workaround! But
>> IMHO it would be easier to expose the watermark value from a function
>> like current_watermark() and let users do whatever they want with the
>> data. And it wouldn't require dealing with the state store overhead. The
>> function could simplify implementing the *side output pattern*, where we
>> could process the on-time data differently from the late data, e.g.
>> write late data to a dedicated space in the lake and facilitate
>> backfilling for the batch pipelines?
>>
>> With the current_watermark function it could be expressed as simply:
>>
>> streamDataset.foreachBatch((dataframe, batchVersion) => {
>>   dataframe.cache()
>>   dataframe.filter(current_watermark() > event_time_from_dataframe)
>>     .writeTo("late_data")
>>   dataframe.filter(current_watermark() <= event_time_from_dataframe)
>>     .writeTo("on_time_data")
>> })
>>
>> A little like what you can do with Apache Flink, in fact:
>>
>> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>>
>> WDYT?
>>
>> Best,
>> Bartosz.
>>
>> PS. I'll be happy to contribute to that if the feature makes sense ;)
>>
>> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Technically speaking, "late data" represents data which cannot be
>>> processed because the engine has already thrown out the state
>>> associated with it.
>>>
>>> That said, the only reason the watermark exists for streaming is to
>>> handle stateful operators. From the engine's point of view, there is no
>>> concept of "late data" for a stateless query. It's something users have
>>> to handle with "filter" by themselves, without relying on the value of
>>> the watermark. I guess someone may see some benefit in automatically
>>> tracking the trend of event time and want to define late data based on
>>> the watermark even in a stateless query, but personally I haven't heard
>>> such a request so far.
>>>
>>> As a workaround you can leverage flatMapGroupsWithState, which provides
>>> the watermark value for you, but I'd agree it's too heavyweight just
>>> for this. If we see consistent demand, we could probably look into it
>>> and maybe introduce a new SQL function (which would work only on
>>> streaming - that's probably a major blocker to introducing it).
>>>
>>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>>> bartkoniec...@gmail.com> wrote:

Re: Watermark on late data only

2023-10-10 Thread Jungtaek Lim
We wouldn't like to expose the internal mechanism to the public.

As you are a very detail-oriented engineer tracking major changes, you
might have noticed that we "changed" the definition of a late record while
fixing late record handling. Previously, a late record was defined as a
record whose event time timestamp is earlier than the "current" watermark.
How has it changed? We now take the "previous" watermark to determine late
records, because such records are valid inputs for non-first stateful
operators. If we had exposed a current_watermark() function providing the
current watermark, and users had somehow built a side output based on it,
it would have broken when we introduced the fix on late record filtering.
Or even worse, we might have decided not to fix the issue, worrying too
much about existing workloads, and given up on multiple stateful operators.

The change is arguably not a breaking change, because we never guarantee
that we won't process data which is earlier than the watermark. The
guarantee is one way: we guarantee that a record is processed if its event
time is later than the watermark. The opposite direction is not guaranteed,
and we actually documented this in the guide doc.

So the workaround I mentioned cannot be used for capturing dropped late
records - it does not work as expected. We would need to apply exactly the
same criteria (probably the same predicate) when capturing them. We are
aware of the demand for a side output of dropped late records, and I also
agree that just having the number of dropped records is never ideal.
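(Those numbers are what the progress API already exposes today; a minimal
sketch of reading them, assuming a running query handle named query:

val droppedByWatermark =
  query.lastProgress.stateOperators.map(_.numRowsDroppedByWatermark).sum
println(s"rows dropped by watermark in the last batch: $droppedByWatermark")

The dropped rows themselves are not accessible this way - only the counts.)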

Let's see whether we have an opportunity to prioritize this. If you have an
idea (sketched design) for implementing this, that would be awesome!

On Tue, Oct 10, 2023 at 6:27 PM Bartosz Konieczny 
wrote:

> Thank you for the clarification, Jungtaek. Indeed, it doesn't sound like
> a highly demanded feature from the end users; I haven't seen it requested
> much on StackOverflow or the mailing lists. I was just curious about the
> reasons.
>
> Using arbitrary stateful processing could indeed be a workaround! But IMHO
> it would be easier to expose the watermark value from a function like
> current_watermark() and let users do whatever they want with the data. And
> it wouldn't require dealing with the state store overhead. The function
> could simplify implementing the *side output pattern*, where we could
> process the on-time data differently from the late data, e.g. write late
> data to a dedicated space in the lake and facilitate backfilling for the
> batch pipelines?
>
> With the current_watermark function it could be expressed as simply:
>
> streamDataset.foreachBatch((dataframe, batchVersion) => {
>   dataframe.cache()
>   dataframe.filter(current_watermark() > event_time_from_dataframe)
>     .writeTo("late_data")
>   dataframe.filter(current_watermark() <= event_time_from_dataframe)
>     .writeTo("on_time_data")
> })
>
> A little like what you can do with Apache Flink, in fact:
>
> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>
> WDYT?
>
> Best,
> Bartosz.
>
> PS. I'll be happy to contribute to that if the feature makes sense ;)
>
> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim 
> wrote:
>
>> Technically speaking, "late data" represents data which cannot be
>> processed because the engine has already thrown out the state associated
>> with it.
>>
>> That said, the only reason the watermark exists for streaming is to
>> handle stateful operators. From the engine's point of view, there is no
>> concept of "late data" for a stateless query. It's something users have
>> to handle with "filter" by themselves, without relying on the value of
>> the watermark. I guess someone may see some benefit in automatically
>> tracking the trend of event time and want to define late data based on
>> the watermark even in a stateless query, but personally I haven't heard
>> such a request so far.
>>
>> As a workaround you can leverage flatMapGroupsWithState, which provides
>> the watermark value for you, but I'd agree it's too heavyweight just for
>> this. If we see consistent demand, we could probably look into it and
>> maybe introduce a new SQL function (which would work only on streaming -
>> that's probably a major blocker to introducing it).
>>
>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>> bartkoniec...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've been analyzing the watermark propagation added in 3.5.0 recently
>>> and had to return to the basics of watermarks. One question is still
>>> unanswered in my head.
>>>
>>> Why are the watermarks reserved for stateful queries? Can't they apply
>>> to filtering the late data out only?
>>>
>>> Is the reason only historical, since the initial design doc mentions
>>> aggregated queries exclusively? Or are there any technical
>>> limitations why writing the jobs 

Re: Watermark on late data only

2023-10-10 Thread Bartosz Konieczny
Thank you for the clarification, Jungtaek. Indeed, it doesn't sound like a
highly demanded feature from the end users; I haven't seen it requested
much on StackOverflow or the mailing lists. I was just curious about the
reasons.

Using arbitrary stateful processing could indeed be a workaround! But IMHO
it would be easier to expose the watermark value from a function like
current_watermark() and let users do whatever they want with the data. And
it wouldn't require dealing with the state store overhead. The function
could simplify implementing the *side output pattern*, where we could
process the on-time data differently from the late data, e.g. write late
data to a dedicated space in the lake and facilitate backfilling for the
batch pipelines?

With the current_watermark function it could be expressed as simply:

streamDataset.foreachBatch((dataframe, batchVersion) => {
  dataframe.cache()
  dataframe.filter(current_watermark() > event_time_from_dataframe)
    .writeTo("late_data")
  dataframe.filter(current_watermark() <= event_time_from_dataframe)
    .writeTo("on_time_data")
})

A little like what you can do with Apache Flink, in fact:
https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
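
Until something like current_watermark() exists, the closest observable
value is the per-batch watermark surfaced by the progress reporting; a
sketch using the existing listener API (observation only - it cannot be
used to filter rows inside the batch):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

sparkSession.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // eventTime carries a "watermark" entry as an ISO-8601 string
    // whenever the query defines a watermark
    Option(event.progress.eventTime.get("watermark")).foreach { wm =>
      println(s"batch=${event.progress.batchId} watermark=$wm")
    }
  }
})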

WDYT?

Best,
Bartosz.

PS. I'll be happy to contribute to that if the feature makes sense ;)

On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim 
wrote:

> Technically speaking, "late data" represents data which cannot be
> processed because the engine has already thrown out the state associated
> with it.
>
> That said, the only reason the watermark exists for streaming is to
> handle stateful operators. From the engine's point of view, there is no
> concept of "late data" for a stateless query. It's something users have
> to handle with "filter" by themselves, without relying on the value of
> the watermark. I guess someone may see some benefit in automatically
> tracking the trend of event time and want to define late data based on
> the watermark even in a stateless query, but personally I haven't heard
> such a request so far.
>
> As a workaround you can leverage flatMapGroupsWithState, which provides
> the watermark value for you, but I'd agree it's too heavyweight just for
> this. If we see consistent demand, we could probably look into it and
> maybe introduce a new SQL function (which would work only on streaming -
> that's probably a major blocker to introducing it).
>
> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny 
> wrote:
>
>> Hi,
>>
>> I've been analyzing the watermark propagation added in 3.5.0 recently
>> and had to return to the basics of watermarks. One question is still
>> unanswered in my head.
>>
>> Why are the watermarks reserved for stateful queries? Can't they apply
>> to filtering the late data out only?
>>
>> Is the reason only historical, since the initial design doc mentions
>> aggregated queries exclusively? Or are there any technical limitations
>> explaining why jobs like the one below don't drop late data
>> automatically?
>>
>> import sparkSession.implicits._
>> implicit val sparkContext = sparkSession.sqlContext
>> val clicksStream = MemoryStream[Click]
>> val clicksWithWatermark = clicksStream.toDF
>>   .withWatermark("clickTime", "10 minutes")
>> val query =
>> clicksWithWatermark.writeStream.format("console").option("truncate", false)
>>   .start()
>>
>> clicksStream.addData(Seq(
>>   Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>   Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>   Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>> ))
>>
>>
>> query.processAllAvailable()
>>
>> clicksStream.addData(Seq(
>>   Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>   Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>   Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>   Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>> ))
>> query.processAllAvailable()
>>
>> One quick implementation could be adding a new physical plan rule to the
>> IncrementalExecution
>> for the EventTimeWatermark node. That's a first thought; maybe it's too
>> simplistic and hides some pitfalls?
>>
>> Best,
>> Bartosz.
>> --
>> freelance data engineer
>> https://www.waitingforcode.com
>> https://github.com/bartosz25/
>> https://twitter.com/waitingforcode
>>
>>

-- 
Bartosz Konieczny
freelance data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode


Re: Watermark on late data only

2023-10-09 Thread Jungtaek Lim
Technically speaking, "late data" represents data which cannot be processed
because the engine has already thrown out the state associated with it.

That said, the only reason the watermark exists for streaming is to handle
stateful operators. From the engine's point of view, there is no concept of
"late data" for a stateless query. It's something users have to handle with
"filter" by themselves, without relying on the value of the watermark. I
guess someone may see some benefit in automatically tracking the trend of
event time and want to define late data based on the watermark even in a
stateless query, but personally I haven't heard such a request so far.

As a workaround you can leverage flatMapGroupsWithState, which provides the
watermark value for you, but I'd agree it's too heavyweight just for this.
If we see consistent demand, we could probably look into it and maybe
introduce a new SQL function (which would work only on streaming - that's
probably a major blocker to introducing it).
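
A rough sketch of that workaround, assuming the Click case class and the
clicksWithWatermark stream from the quoted message below (no state is
actually kept - the callback is only used to read the watermark and tag
each record):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

val taggedClicks = clicksWithWatermark.as[Click]
  .groupByKey(_.id)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
    (id: Int, clicks: Iterator[Click], state: GroupState[Int]) =>
      // watermark of the current micro-batch, in epoch milliseconds;
      // the Int state is never written
      val watermarkMs = state.getCurrentWatermarkMs()
      clicks.map(click => (click, click.clickTime.getTime < watermarkMs))
  }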

On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny 
wrote:

> Hi,
>
> I've been analyzing the watermark propagation added in 3.5.0 recently
> and had to return to the basics of watermarks. One question is still
> unanswered in my head.
>
> Why are the watermarks reserved for stateful queries? Can't they apply
> to filtering the late data out only?
>
> Is the reason only historical, since the initial design doc mentions
> aggregated queries exclusively? Or are there any technical limitations
> explaining why jobs like the one below don't drop late data
> automatically?
>
> import sparkSession.implicits._
> implicit val sparkContext = sparkSession.sqlContext
> val clicksStream = MemoryStream[Click]
> val clicksWithWatermark = clicksStream.toDF
>   .withWatermark("clickTime", "10 minutes")
> val query =
> clicksWithWatermark.writeStream.format("console").option("truncate", false)
>   .start()
>
> clicksStream.addData(Seq(
>   Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>   Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>   Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
> ))
>
>
> query.processAllAvailable()
>
> clicksStream.addData(Seq(
>   Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>   Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>   Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>   Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
> ))
> query.processAllAvailable()
>
> One quick implementation could be adding a new physical plan rule to the
> IncrementalExecution
> for the EventTimeWatermark node. That's a first thought; maybe it's too
> simplistic and hides some pitfalls?
>
> Best,
> Bartosz.
> --
> freelance data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>


Watermark on late data only

2023-10-08 Thread Bartosz Konieczny
Hi,

I've been analyzing the watermark propagation added in 3.5.0 recently
and had to return to the basics of watermarks. One question is still
unanswered in my head.

Why are the watermarks reserved for stateful queries? Can't they apply to
filtering the late data out only?

Is the reason only historical, since the initial design doc mentions
aggregated queries exclusively? Or are there any technical limitations
explaining why jobs like the one below don't drop late data automatically?

import java.sql.Timestamp

import org.apache.spark.sql.execution.streaming.MemoryStream

import sparkSession.implicits._
implicit val sqlContext = sparkSession.sqlContext

case class Click(id: Int, clickTime: Timestamp)

val clicksStream = MemoryStream[Click]
val clicksWithWatermark = clicksStream.toDF
  .withWatermark("clickTime", "10 minutes")
val query =
clicksWithWatermark.writeStream.format("console").option("truncate", false)
  .start()

clicksStream.addData(Seq(
  Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
  Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
  Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
))


query.processAllAvailable()

clicksStream.addData(Seq(
  Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
  Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
  Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
  Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
))
query.processAllAvailable()
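
For reference, the watermark arithmetic the two batches above walk through,
with the 10-minute delay:

// batch 1: max(clickTime) = 10:14:00, so the watermark advances to 10:04:00
// batch 2: Click(10) at 10:00:10 sits behind that watermark, i.e. it is
//          "late", yet this stateless query still prints it to the console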

One quick implementation could be adding a new physical plan rule to the
IncrementalExecution
for the EventTimeWatermark node. That's a first thought; maybe it's too
simplistic and hides some pitfalls?

Best,
Bartosz.
-- 
freelance data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode