Re: [VOTE] SPIP: Structured Streaming - Arbitrary State API v2

2024-01-10 Thread Bartosz Konieczny
+1 :)

On Wed, Jan 10, 2024 at 9:57 AM Shixiong Zhu  wrote:

> +1 (binding)
>
> Best Regards,
> Shixiong Zhu
>
>
> On Tue, Jan 9, 2024 at 6:47 PM 刘唯  wrote:
>
>> This is a good addition! +1
>>
>>> Raghu Angadi  wrote on Tue, Jan 9, 2024 at 13:17:
>>
>>> +1. This is a major improvement to the state API.
>>>
>>> Raghu.
>>>
>>> On Tue, Jan 9, 2024 at 1:42 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> +1 for me as well
>>>>
>>>>
>>>> Mich Talebzadeh,
>>>> Dad | Technologist | Solutions Architect | Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 9 Jan 2024 at 03:24, Anish Shrigondekar
>>>>  wrote:
>>>>
>>>>> Thanks Jungtaek for creating the Vote thread.
>>>>>
>>>>> +1 (non-binding) from my side too.
>>>>>
>>>>> Thanks,
>>>>> Anish
>>>>>
>>>>> On Tue, Jan 9, 2024 at 6:09 AM Jungtaek Lim <
>>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Starting with my +1 (non-binding). Thanks!
>>>>>>
>>>>>> On Tue, Jan 9, 2024 at 9:37 AM Jungtaek Lim <
>>>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'd like to start the vote for SPIP: Structured Streaming -
>>>>>>> Arbitrary State API v2.
>>>>>>>
>>>>>>> References:
>>>>>>>
>>>>>>>- JIRA ticket <https://issues.apache.org/jira/browse/SPARK-45939>
>>>>>>>- SPIP doc
>>>>>>>
>>>>>>> <https://docs.google.com/document/d/1QtC5qd4WQEia9kl1Qv74WE0TiXYy3x6zeTykygwPWig/edit?usp=sharing>
>>>>>>>- Discussion thread
>>>>>>><https://lists.apache.org/thread/3jyjdgk1m5zyqfmrocnt6t415703nc8l>
>>>>>>>
>>>>>>> Please vote on the SPIP for the next 72 hours:
>>>>>>>
>>>>>>> [ ] +1: Accept the proposal as an official SPIP
>>>>>>> [ ] +0
>>>>>>> [ ] -1: I don’t think this is a good idea because …
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>

-- 
Bartosz Konieczny
freelance data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode


Re: [VOTE] SPIP: State Data Source - Reader

2023-10-24 Thread Bartosz Konieczny
+1

On Tuesday, October 24, 2023, Jia Fan  wrote:

> +1
>
> L. C. Hsieh  wrote on Tue, Oct 24, 2023 at 13:23:
>
>> +1
>>
>> On Mon, Oct 23, 2023 at 6:31 PM Anish Shrigondekar
>>  wrote:
>> >
>> > +1 (non-binding)
>> >
>> > Thanks,
>> > Anish
>> >
>> > On Mon, Oct 23, 2023 at 5:01 PM Wenchen Fan 
>> wrote:
>> >>
>> >> +1
>> >>
>> >> On Mon, Oct 23, 2023 at 4:03 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>> >>>
>> >>> Starting with my +1 (non-binding). Thanks!
>> >>>
>> >>> On Mon, Oct 23, 2023 at 1:23 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>> >>>>
>> >>>> Hi all,
>> >>>>
>> >>>> I'd like to start the vote for SPIP: State Data Source - Reader.
>> >>>>
>> >>>> The high level summary of the SPIP is that we propose a new data
>> source which enables reading the state store in the checkpoint via a
>> batch query. This would enable two major use cases: 1) constructing tests
>> that verify the state store, and 2) inspecting state store values when
>> investigating an incident.
>> >>>>
>> >>>> References:
>> >>>>
>> >>>> JIRA ticket
>> >>>> SPIP doc
>> >>>> Discussion thread
>> >>>>
>> >>>> Please vote on the SPIP for the next 72 hours:
>> >>>>
>> >>>> [ ] +1: Accept the proposal as an official SPIP
>> >>>> [ ] +0
>> >>>> [ ] -1: I don’t think this is a good idea because …
>> >>>>
>> >>>> Thanks!
>> >>>> Jungtaek Lim (HeartSaVioR)
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>



Re: [DISCUSS] SPIP: State Data Source - Reader

2023-10-16 Thread Bartosz Konieczny
Thank you, Jungtaek, for your answers! It's clear now.

+1 for me. It seems like a prerequisite for further ops-related
improvements to state store management. I'm thinking especially of state
rebalancing, which could rely on this read+write state store API. I don't
mean dynamic state rebalancing, which could probably be implemented with
lower latency directly in the stateful API. Instead, I'm thinking of an
offline job that rebalances the state so that the stateful pipeline can
later be restarted with a changed number of shuffle partitions.
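To make the offline rebalancing idea concrete: state is bound to shuffle
partitions through key hashing, so changing the partition count re-assigns
most keys, which is why the state has to be rewritten rather than just
re-read. A toy, Spark-free sketch of that re-assignment (the modulo
partitioner only mirrors the spirit of Spark's HashPartitioner; all names
here are illustrative, not Spark's actual API):

```scala
// Toy model of why state must be rebalanced when the number of shuffle
// partitions changes: a key's partition is a function of the partition count.
object RebalanceSketch {
  // Non-negative modulo of the key's hashCode, in the spirit of
  // Spark's HashPartitioner (illustrative, not the real implementation).
  def partitionFor(key: String, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Re-bucket every state key into the new partition layout.
  def rebalance(stateKeys: Seq[String], newPartitions: Int): Map[Int, Seq[String]] =
    stateKeys.groupBy(partitionFor(_, newPartitions))

  def main(args: Array[String]): Unit = {
    val keys = (1 to 100).map(i => s"user-$i")
    val moved = keys.count(k => partitionFor(k, 10) != partitionFor(k, 20))
    println(s"$moved of ${keys.size} keys change partition when going from 10 to 20 partitions")
  }
}
```

Most keys land in a different partition after the change, so an offline job
would have to move the corresponding state entries before the pipeline can
restart with the new partition count.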

Best,
Bartosz.

On Mon, Oct 16, 2023 at 6:19 PM Jungtaek Lim 
wrote:

> bump for better reach
>
> On Thu, Oct 12, 2023 at 4:26 PM Jungtaek Lim 
> wrote:
>
>> Sorry, please use this link instead for SPIP doc:
>> https://docs.google.com/document/d/1_iVf_CIu2RZd3yWWF6KoRNlBiz5NbSIK0yThqG0EvPY/edit?usp=sharing
>>
>>
>> On Thu, Oct 12, 2023 at 3:58 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Hi dev,
>>>
>>> I'd like to start a discussion on "State Data Source - Reader".
>>>
>>> This proposal aims to introduce a new data source, "statestore", which
>>> enables reading the state rows from an existing checkpoint via an offline
>>> (batch) query. This will enable users to 1) create unit tests against a
>>> stateful query verifying the state values (especially for
>>> flatMapGroupsWithState), and 2) gather more context when an incident
>>> occurs, especially for incorrect output.
>>>
>>> *SPIP*:
>>> https://docs.google.com/document/d/1HjEupRv8TRFeULtJuxRq_tEG1Wq-9UNu-ctGgCYRke0/edit?usp=sharing
>>> *JIRA*: https://issues.apache.org/jira/browse/SPARK-45511
>>>
>>> Looking forward to your feedback!
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> ps. The scope of the project is narrowed to the reader in this SPIP,
>>> since the writer requires us to consider more cases. We are planning on it.
>>>
>>



Re: Watermark on late data only

2023-10-10 Thread Bartosz Konieczny
Thank you for the clarification, Jungtaek! Indeed, it doesn't sound like
a highly demanded feature from the end users; I haven't seen it a lot on
StackOverflow or the mailing lists. I was just curious about the reasons.

Using arbitrary stateful processing could indeed be a workaround! But
IMHO it would be easier to expose this watermark value from a function like
current_watermark() and let the users do anything they want with the data.
And it wouldn't require dealing with the state store overhead. The function
could simplify implementing the *side output pattern*, where we could
process the on-time data differently from the late data, e.g. write late
data to a dedicated space in the lake and facilitate backfilling for the
batch pipelines.

With the current_watermark function, it could be expressed as simply as:

streamDataset.foreachBatch((dataframe, batchVersion) => {
  dataframe.cache()
  dataframe.filter(current_watermark() > event_time_from_dataframe)
    .writeTo("late_data").append()
  dataframe.filter(current_watermark() <= event_time_from_dataframe)
    .writeTo("on_time_data").append()
})

A little bit as you can do with Apache Flink in fact:
https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
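For intuition, stripped of Spark entirely, the side output split is just a
partition of the events on event time versus the watermark value. A minimal
Spark-free sketch (Click and all the names are made up for illustration,
following the same threshold convention as the sketch above: event time
strictly below the watermark counts as late):

```scala
// Spark-free illustration of the side output pattern: events whose event
// time is strictly below the watermark are "late", the rest are "on time".
object SideOutputSketch {
  case class Click(id: Int, eventTimeMs: Long)

  // Returns (late, onTime); partition preserves the input order.
  def splitByWatermark(events: Seq[Click], watermarkMs: Long): (Seq[Click], Seq[Click]) =
    events.partition(_.eventTimeMs < watermarkMs)

  def main(args: Array[String]): Unit = {
    val (late, onTime) = splitByWatermark(
      Seq(Click(1, 500L), Click(2, 1500L), Click(3, 999L)), watermarkMs = 1000L)
    // late contains ids 1 and 3; onTime contains id 2
    println(s"late=${late.map(_.id)} onTime=${onTime.map(_.id)}")
  }
}
```

The hard part in Spark is not the split itself but getting the engine's
current watermark value into the query, which is exactly what a
current_watermark() function would provide.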

WDYT?

Best,
Bartosz.

PS. Will be happy to contribute on that if the feature does make sense ;)

On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim 
wrote:

> Technically speaking, "late data" represents data which cannot be
> processed because the engine has already thrown out the state associated
> with it.
>
> That said, the only reason watermark exists for streaming is to handle
> stateful operators. From the engine's point of view, there is no concept
> of "late data" for a stateless query. It's something users have to handle
> with "filter" by themselves, without relying on the value of the watermark.
> I guess someone may see some benefit in automatically tracking the trend of
> event time and defining late data based on the watermark even in a
> stateless query, but personally I haven't heard such a request so far.
>
> As a workaround you can leverage flatMapGroupsWithState, which provides
> the value of the watermark for you, but I'd agree it's too heavyweight just
> for this. If we see consistent demand, we could probably look into it and
> maybe introduce a new SQL function (one which works only on streaming -
> that's probably a major blocker to its introduction).
>
> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny 
> wrote:
>
>> Hi,
>>
>> I've been analyzing the watermark propagation added in 3.5.0 recently
>> and had to return to the basics of watermarks. One question is still
>> unanswered in my head.
>>
>> Why are watermarks reserved to stateful queries? Can't they apply to
>> filtering the late data out only?
>>
>> Is the reason only historical, as the initial design doc
>> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
>> mentions the aggregated queries exclusively? Or are there any technical
>> limitations why writing jobs like the one below doesn't drop late data
>> automatically?
>>
>> import sparkSession.implicits._
>> implicit val sparkContext = sparkSession.sqlContext
>> val clicksStream = MemoryStream[Click]
>> val clicksWithWatermark = clicksStream.toDF
>>   .withWatermark("clickTime", "10 minutes")
>> val query =
>> clicksWithWatermark.writeStream.format("console").option("truncate", false)
>>   .start()
>>
>> clicksStream.addData(Seq(
>>   Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>   Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>   Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>> ))
>>
>>
>> query.processAllAvailable()
>>
>> clicksStream.addData(Seq(
>>   Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>   Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>   Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>   Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>> ))
>> query.processAllAvailable()
>>
>> One quick implementation could be adding a new physical plan rule to the
>> IncrementalExecution
>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
>> for the EventTimeWatermark node. That's a first thought, maybe too
>> simplistic and hiding some pitfalls?
>>
>> Best,
>> Bartosz.
>> --
>> freelance data engineer
>> https://www.waitingforcode.com
>> https://github.com/bartosz25/
>> https://twitter.com/waitingforcode
>>
>>



Watermark on late data only

2023-10-08 Thread Bartosz Konieczny
Hi,

I've been analyzing the watermark propagation added in 3.5.0 recently
and had to return to the basics of watermarks. One question is still
unanswered in my head.

Why are watermarks reserved to stateful queries? Can't they apply to
filtering the late data out only?

Is the reason only historical, as the initial design doc
mentions the aggregated queries exclusively? Or are there any technical
limitations why writing jobs like the one below doesn't drop late data
automatically?

import sparkSession.implicits._
implicit val sparkContext = sparkSession.sqlContext
val clicksStream = MemoryStream[Click]
val clicksWithWatermark = clicksStream.toDF
  .withWatermark("clickTime", "10 minutes")
val query =
clicksWithWatermark.writeStream.format("console").option("truncate", false)
  .start()

clicksStream.addData(Seq(
  Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
  Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
  Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
))


query.processAllAvailable()

clicksStream.addData(Seq(
  Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
  Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
  Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
  Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
))
query.processAllAvailable()

One quick implementation could be adding a new physical plan rule to
IncrementalExecution for the EventTimeWatermark node. That's a first
thought, maybe too simplistic and hiding some pitfalls?

Best,
Bartosz.