Re: Processing time watermarks in KinesisIO

2023-10-27 Thread Alexey Romanenko
Ahh, ok, I see.

Yes, it looks like a bug. So, I'd propose to deprecate the old "processing 
time" watermark policy, which we can remove later, and create a new fixed one.

PS: It’s recommended to use "org.apache.beam.sdk.io.aws2.kinesis.KinesisIO" 
instead of the deprecated "org.apache.beam.sdk.io.kinesis.KinesisIO" one.

—
Alexey

> On 27 Oct 2023, at 17:42, Jan Lukavský  wrote:
> 
> No, I'm referring to this [1] policy which has unexpected (and hardly 
> avoidable on the user-code side) data loss issues. The problem is that 
> assigning timestamps to elements and watermarks is completely decoupled and 
> unrelated, which I'd say is a bug.
> 
>  Jan
> 
> [1] 
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--
> 
> On 10/27/23 16:51, Alexey Romanenko wrote:
>> Why not just create a custom watermark policy for that? Or do you mean to 
>> make it the default policy?
>> 
>> —
>> Alexey
>> 
>>> On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:
>>> 
>>> 
>>> Hi, 
>>> 
>>> when discussing [1] we found out that the issue is actually caused 
>>> by processing time watermarks in KinesisIO. Enabling this watermark outputs 
>>> watermarks based on the current processing time, _but event timestamps are 
>>> derived from the ingestion timestamp_. This can cause unbounded lateness when 
>>> processing backlog. I think this setup is error-prone and will likely cause 
>>> data loss due to dropped elements. This can be solved in two ways: 
>>> 
>>>  a) deprecate processing time watermarks, or 
>>> 
>>>  b) modify KinesisIO's watermark policy so that it assigns event timestamps 
>>> as well (the processing-time watermark policy would have to derive event 
>>> timestamps from processing time). 
>>> 
>>> I'd prefer option b), but it might be a breaking change. Moreover, I'm not 
>>> sure I understand the purpose of the processing-time watermark policy; it 
>>> might have been essentially ill-defined from the beginning, so it might 
>>> really be better to remove it completely. There is also a related issue [2]. 
>>> 
>>> Any thoughts on this? 
>>> 
>>>  Jan 
>>> 
>>> [1] https://github.com/apache/beam/issues/25975 
>>> 
>>> [2] https://github.com/apache/beam/issues/28760 
>>> 
>> 



Re: Streaming update compatibility

2023-10-27 Thread Robert Burke
On Fri, Oct 27, 2023, 9:09 AM Robert Bradshaw via dev 
wrote:

> On Fri, Oct 27, 2023 at 7:50 AM Kellen Dye via dev 
> wrote:
> >
> > > Auto is hard, because it would involve
> > > querying the runner before pipeline construction, and we may not even
> > > know what the runner is at this point
> >
> > At the point where pipeline construction will start, you should have
> access to the pipeline arguments and be able to determine the runner. What
> seems to be missing is a place to query the runner pre-construction. If
> that query could return metadata about the currently running version of the
> job, then that could be incorporated into graph construction as necessary.
>
> While this is the common case, it is not true in general. For example
> it's possible to cache the pipeline proto and submit it to a separate
> choice of runner later. We have Jobs API implementations that
> forward/proxy the job to other runners, and the Python interactive
> runner is another example where the runner is late-binding (e.g. one
> tries a sample locally, and if all looks good can execute remotely,
> and also in this case the graph that's submitted is often mutated
> before running).
>
> Also, in the spirit of the portability story, the pipeline definition
> itself should be runner-independent.
>
> That same hook could be a place to, for example, return the
> currently-running job graph for pre-submission compatibility checks.
>
> I suppose we could add something to the Jobs API to make "looking up a
> previous version of this pipeline" runner-agnostic, though that
> assumes it's available at construction time.


As I pointed out, we can access a given pipeline via the job management
API. It's already runner-agnostic, except for Dataflow.

Operationally though, we'd need to provide the option to "dry run" an
update locally, or validate update compatibility against a given pipeline
proto.

> And +1 as Kellen says we should define (and be able to check) what pipeline
> compatibility means via a graph-to-graph comparison at the Beam level. I'll
> defer both of these as future work as part of the "make update a portable
> Beam concept" project.
>

Big +1 to that. Hard to know what to check for without defining it. This
would also avoid needing to ask a given runner about dry-run updates.

It's on a longer-term plan, but I have been intending to add Pipeline Update
as a feature to Prism. As it becomes more fully featured, it becomes a great
test bed for developing the definitions.

>


Re: Streaming update compatibility

2023-10-27 Thread Robert Bradshaw via dev
On Fri, Oct 27, 2023 at 7:50 AM Kellen Dye via dev  wrote:
>
> > Auto is hard, because it would involve
> > querying the runner before pipeline construction, and we may not even
> > know what the runner is at this point
>
> At the point where pipeline construction will start, you should have access 
> to the pipeline arguments and be able to determine the runner. What seems to 
> be missing is a place to query the runner pre-construction. If that query 
> could return metadata about the currently running version of the job, then 
> that could be incorporated into graph construction as necessary.

While this is the common case, it is not true in general. For example
it's possible to cache the pipeline proto and submit it to a separate
choice of runner later. We have Jobs API implementations that
forward/proxy the job to other runners, and the Python interactive
runner is another example where the runner is late-binding (e.g. one
tries a sample locally, and if all looks good can execute remotely,
and also in this case the graph that's submitted is often mutated
before running).

Also, in the spirit of the portability story, the pipeline definition
itself should be runner-independent.

> That same hook could be a place to, for example, return the currently-running 
> job graph for pre-submission compatibility checks.

I suppose we could add something to the Jobs API to make "looking up a
previous version of this pipeline" runner-agnostic, though that
assumes it's available at construction time. And +1 as Kellen says we
should define (and be able to check) what pipeline compatibility means
via a graph-to-graph comparison at the Beam level. I'll defer both
of these as future work as part of the "make update a portable Beam
concept" project.
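
As a strawman for what a graph-to-graph check could look like over two
pipeline protos, here is a sketch; the rules are illustrative only, not a
proposed spec:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;

public class UpdateCompatibilitySketch {

  /** Returns human-readable incompatibilities between running and replacement pipelines. */
  public static List<String> check(RunnerApi.Pipeline running, RunnerApi.Pipeline replacement) {
    Map<String, RunnerApi.PTransform> oldByName = byUniqueName(running);
    Map<String, RunnerApi.PTransform> newByName = byUniqueName(replacement);

    List<String> problems = new ArrayList<>();
    for (Map.Entry<String, RunnerApi.PTransform> e : oldByName.entrySet()) {
      RunnerApi.PTransform replacementTransform = newByName.get(e.getKey());
      if (replacementTransform == null) {
        // Illustrative rule: a (possibly stateful) transform vanished without a mapping.
        problems.add("Transform removed or renamed: " + e.getKey());
      } else if (!e.getValue().getSpec().getUrn().equals(replacementTransform.getSpec().getUrn())) {
        // Illustrative rule: a transform keeping its name must keep its type.
        problems.add("URN changed for transform: " + e.getKey());
      }
      // A real definition would also compare coders on stateful steps, windowing
      // strategies, side inputs, etc.
    }
    return problems;
  }

  private static Map<String, RunnerApi.PTransform> byUniqueName(RunnerApi.Pipeline p) {
    return p.getComponents().getTransformsMap().values().stream()
        .collect(
            Collectors.toMap(
                RunnerApi.PTransform::getUniqueName, Function.identity(), (a, b) -> a));
  }
}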


Re: Processing time watermarks in KinesisIO

2023-10-27 Thread Jan Lukavský
No, I'm referring to this [1] policy which has unexpected (and hardly 
avoidable on the user-code side) data loss issues. The problem is that 
assigning timestamps to elements and watermarks is completely decoupled 
and unrelated, which I'd say is a bug.


 Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--
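
To make the mismatch concrete, here is a minimal sketch of the problematic
configuration (the stream name and surrounding setup are illustrative, and
the comments paraphrase the reader behaviour rather than quoting its code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ProcessingTimeWatermarkSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KinesisRecord> records =
        p.apply(
            KinesisIO.read()
                .withStreamName("my-stream") // illustrative; region/credentials omitted
                // The watermark advances with the reader's wall clock...
                .withProcessingTimeWatermarkPolicy());

    // ...while each element's timestamp is its approximate arrival (ingestion)
    // timestamp, independently of the watermark policy. When catching up on
    // backlog, arrival timestamps lag far behind the processing-time watermark,
    // so downstream windows may treat the records as late and drop them.

    p.run();
  }
}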


On 10/27/23 16:51, Alexey Romanenko wrote:
Why not just create a custom watermark policy for that? Or do you mean 
to make it the default policy?


—
Alexey


On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:


Hi,

when discussing [1] we found out that the issue is actually 
caused by processing time watermarks in KinesisIO. Enabling this 
watermark outputs watermarks based on the current processing time, _but 
event timestamps are derived from the ingestion timestamp_. This can 
cause unbounded lateness when processing backlog. I think this setup 
is error-prone and will likely cause data loss due to dropped 
elements. This can be solved in two ways:


 a) deprecate processing time watermarks, or

 b) modify KinesisIO's watermark policy so that it assigns event 
timestamps as well (the processing-time watermark policy would have 
to derive event timestamps from processing time).


I'd prefer option b), but it might be a breaking change. Moreover, 
I'm not sure I understand the purpose of the processing-time watermark 
policy; it might have been essentially ill-defined from the beginning, 
so it might really be better to remove it completely. There is also a 
related issue [2].


Any thoughts on this?

 Jan

[1] https://github.com/apache/beam/issues/25975

[2] https://github.com/apache/beam/issues/28760



Re: Streaming update compatibility

2023-10-27 Thread Kellen Dye via dev
In Spotify's case we deploy streaming jobs via CI and would ideally verify
compatibility as part of the build process before submitting to Dataflow.
Perhaps decoupled from the _running_ pipeline if we had a cache of previous
pipeline versions.

Currently the user experience is poor because any merge of a change ends up
being potentially distant in time from the job submission and failure due
to incompatibility. There's the additional friction of a few layers of
infrastructure between the developer and the job failure logs which means
we can't trivially deliver the failure reason to the developer.



On Fri, Oct 27, 2023 at 11:07 AM Robert Burke  wrote:

> You raise a very good point:
>
>
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto#L54
>
> The job management API does allow for the pipeline proto to be returned.
> So one could get the live job's pipeline, and the SDK could make reasonable
> decisions before sending to the runner.
>
> Dataflow does have a similar API that can be adapted.
>
> I am a touch concerned about spreading the update compatibility checks
> around between SDKs and Runners though. But in some cases it would be
> easier for the SDK, e.g. to ensure VersionA of a transform is used vs
> VersionB, based on the existing transforms used in the job being updated.
>
>
> On Fri, Oct 27, 2023, 7:50 AM Kellen Dye via dev 
> wrote:
>
>> > Auto is hard, because it would involve
>> > querying the runner before pipeline construction, and we may not even
>> > know what the runner is at this point
>>
>> At the point where pipeline construction will start, you should have
>> access to the pipeline arguments and be able to determine the runner. What
>> seems to be missing is a place to query the runner pre-construction. If
>> that query could return metadata about the currently running version of the
>> job, then that could be incorporated into graph construction as necessary.
>>
>> That same hook could be a place to, for example, return the
>> currently-running job graph for pre-submission compatibility checks.
>>
>>
>>


Re: Streaming update compatibility

2023-10-27 Thread Robert Burke
You raise a very good point:


https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto#L54

The job management API does allow for the pipeline proto to be returned. So
one could get the live job's pipeline, and the SDK could make reasonable
decisions before sending to the runner.
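
A rough sketch of that lookup against a job server endpoint (placeholder
endpoint and job id; Beam's gRPC vendoring and error handling are glossed
over):

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineResponse;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi;

public class FetchRunningPipeline {
  public static RunnerApi.Pipeline fetch(String jobServerEndpoint, String jobId) {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget(jobServerEndpoint).usePlaintext().build();
    try {
      GetJobPipelineResponse response =
          JobServiceGrpc.newBlockingStub(channel)
              .getPipeline(GetJobPipelineRequest.newBuilder().setJobId(jobId).build());
      // The running job's pipeline proto, available for compatibility checks
      // before the replacement graph is submitted.
      return response.getPipeline();
    } finally {
      channel.shutdownNow();
    }
  }
}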

Dataflow does have a similar API that can be adapted.

I am a touch concerned about spreading the update compatibility checks
around between SDKs and Runners though. But in some cases it would be
easier for the SDK, e.g. to ensure VersionA of a transform is used vs
VersionB, based on the existing transforms used in the job being updated.


On Fri, Oct 27, 2023, 7:50 AM Kellen Dye via dev 
wrote:

> > Auto is hard, because it would involve
> > querying the runner before pipeline construction, and we may not even
> > know what the runner is at this point
>
> At the point where pipeline construction will start, you should have
> access to the pipeline arguments and be able to determine the runner. What
> seems to be missing is a place to query the runner pre-construction. If
> that query could return metadata about the currently running version of the
> job, then that could be incorporated into graph construction as necessary.
>
> That same hook could be a place to, for example, return the
> currently-running job graph for pre-submission compatibility checks.
>
>
>


Re: Processing time watermarks in KinesisIO

2023-10-27 Thread Alexey Romanenko
Why not just create a custom watermark policy for that? Or do you mean to make 
it the default policy?
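
(For reference, wiring up a custom policy with the existing hooks looks
roughly like the sketch below, based on the WatermarkPolicyFactory /
WatermarkParameters API from the KinesisIO javadoc; the stream name and idle
threshold are illustrative. Note that a policy only controls the watermark,
not how element timestamps are assigned.)

import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.kinesis.WatermarkParameters;
import org.apache.beam.sdk.io.kinesis.WatermarkPolicyFactory;
import org.joda.time.Duration;

public class CustomWatermarkPolicySketch {
  public static KinesisIO.Read readWithCustomPolicy() {
    // Derive the watermark from each record's approximate arrival timestamp,
    // tolerating an idle source for up to two minutes.
    WatermarkParameters params =
        WatermarkParameters.create()
            .withTimestampFn(KinesisRecord::getApproximateArrivalTimestamp)
            .withWatermarkIdleDurationThreshold(Duration.standardMinutes(2));

    return KinesisIO.read()
        .withStreamName("my-stream") // illustrative
        .withCustomWatermarkPolicy(WatermarkPolicyFactory.withCustomWatermarkPolicy(params));
  }
}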

—
Alexey

> On 27 Oct 2023, at 10:25, Jan Lukavský  wrote:
> 
> 
> Hi, 
> 
> when discussing [1] we found out that the issue is actually caused by 
> processing time watermarks in KinesisIO. Enabling this watermark outputs 
> watermarks based on the current processing time, _but event timestamps are 
> derived from the ingestion timestamp_. This can cause unbounded lateness when 
> processing backlog. I think this setup is error-prone and will likely cause 
> data loss due to dropped elements. This can be solved in two ways: 
> 
>  a) deprecate processing time watermarks, or 
> 
>  b) modify KinesisIO's watermark policy so that it assigns event timestamps 
> as well (the processing-time watermark policy would have to derive event 
> timestamps from processing time). 
> 
> I'd prefer option b), but it might be a breaking change. Moreover, I'm not 
> sure I understand the purpose of the processing-time watermark policy; it 
> might have been essentially ill-defined from the beginning, so it might 
> really be better to remove it completely. There is also a related issue [2]. 
> 
> Any thoughts on this? 
> 
>  Jan 
> 
> [1] https://github.com/apache/beam/issues/25975 
> 
> [2] https://github.com/apache/beam/issues/28760 
> 



Re: Streaming update compatibility

2023-10-27 Thread Kellen Dye via dev
> Auto is hard, because it would involve
> querying the runner before pipeline construction, and we may not even
> know what the runner is at this point

At the point where pipeline construction will start, you should have access
to the pipeline arguments and be able to determine the runner. What seems
to be missing is a place to query the runner pre-construction. If that
query could return metadata about the currently running version of the job,
then that could be incorporated into graph construction as necessary.

That same hook could be a place to, for example, return the currently-running
job graph for pre-submission compatibility checks.
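
One possible shape for such a hook, purely as a strawman (none of these types
exist in Beam today):

import java.util.Optional;
import org.apache.beam.model.pipeline.v1.RunnerApi;

/**
 * Hypothetical pre-construction hook: given the job name from the pipeline
 * options, let the chosen runner describe the currently running version of
 * the job so that graph construction (and compatibility checks) can take it
 * into account.
 */
public interface RunningJobIntrospection {

  /** Metadata about the running job, if one exists under the same job name. */
  Optional<RunningJob> lookupRunningJob(String jobName);

  /** Hypothetical value type: version info plus the running pipeline proto. */
  interface RunningJob {
    String sdkVersion();

    RunnerApi.Pipeline pipeline();
  }
}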


Beam High Priority Issue Report (46)

2023-10-27 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/29099 [Bug]: FnAPI Java SDK Harness 
doesn't update user counters in OnTimer callback functions
https://github.com/apache/beam/issues/29076 [Failing Test]: Python ARM 
PostCommit failing after #28385
https://github.com/apache/beam/issues/29022 [Failing Test]: Python Github 
actions tests are failing due to update of pip 
https://github.com/apache/beam/issues/28760 [Bug]: EFO Kinesis IO reader 
provided by apache beam does not pick the event time for watermarking
https://github.com/apache/beam/issues/28703 [Failing Test]: Building a wheel 
for integration tests sometimes times out
https://github.com/apache/beam/issues/28383 [Failing Test]: 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testMaxThreadMetric
https://github.com/apache/beam/issues/28339 Fix failing 
"beam_PostCommit_XVR_GoUsingJava_Dataflow" job
https://github.com/apache/beam/issues/28326 Bug: 
apache_beam.io.gcp.pubsublite.ReadFromPubSubLite not working
https://github.com/apache/beam/issues/28142 [Bug]: [Go SDK] Memory seems to be 
leaking on 2.49.0 with Dataflow
https://github.com/apache/beam/issues/27892 [Bug]: ignoreUnknownValues not 
working when using CreateDisposition.CREATE_IF_NEEDED 
https://github.com/apache/beam/issues/27648 [Bug]: Python SDFs (e.g. 
PeriodicImpulse) running in Flink and polling using tracker.defer_remainder 
have checkpoint size growing indefinitely 
https://github.com/apache/beam/issues/27616 [Bug]: Unable to use 
applyRowMutations() in bigquery IO apache beam java
https://github.com/apache/beam/issues/27486 [Bug]: Read from datastore with 
inequality filters
https://github.com/apache/beam/issues/27314 [Failing Test]: 
bigquery.StorageApiSinkCreateIfNeededIT.testCreateManyTables[1]
https://github.com/apache/beam/issues/27238 [Bug]: Window trigger has lag when 
using Kafka and GroupByKey on Dataflow Runner
https://github.com/apache/beam/issues/26981 [Bug]: Getting an error related to 
SchemaCoder after upgrading to 2.48
https://github.com/apache/beam/issues/26911 [Bug]: UNNEST ARRAY with a nested 
ROW (described below)
https://github.com/apache/beam/issues/26343 [Bug]: 
apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests.test_read_queries is 
flaky
https://github.com/apache/beam/issues/26329 [Bug]: BigQuerySourceBase does not 
propagate a Coder to AvroSource
https://github.com/apache/beam/issues/26041 [Bug]: Unable to create 
exactly-once Flink pipeline with stream source and file sink
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21476 WriteToBigQuery Dynamic table 
destinations returns wrong tableId
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 

Processing time watermarks in KinesisIO

2023-10-27 Thread Jan Lukavský

Hi,

when discussing [1] we found out that the issue is actually 
caused by processing time watermarks in KinesisIO. Enabling this 
watermark outputs watermarks based on the current processing time, _but 
event timestamps are derived from the ingestion timestamp_. This can cause 
unbounded lateness when processing backlog. I think this setup is 
error-prone and will likely cause data loss due to dropped elements. 
This can be solved in two ways:


 a) deprecate processing time watermarks, or

 b) modify KinesisIO's watermark policy so that it assigns event 
timestamps as well (the processing-time watermark policy would have to 
derive event timestamps from processing time).


I'd prefer option b), but it might be a breaking change. Moreover, I'm 
not sure I understand the purpose of the processing-time watermark 
policy; it might have been essentially ill-defined from the beginning, 
so it might really be better to remove it completely. There is also a 
related issue [2].
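
To make option b) concrete, a rough sketch of the kind of policy it would
require; the interface and method names below are hypothetical and do not
exist in KinesisIO today:

import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.joda.time.Instant;

/**
 * Hypothetical sketch for option b): a watermark policy that also owns
 * event-time assignment, so the reader cannot emit element timestamps that
 * contradict the watermark it advertises.
 */
public interface TimestampingWatermarkPolicy {

  /** Event timestamp to assign to the record. */
  Instant getTimestamp(KinesisRecord record);

  /** Current output watermark. */
  Instant getWatermark();

  /** Called for every record read. */
  void update(KinesisRecord record);

  /** Processing-time variant: timestamps and watermark both follow the wall clock. */
  static TimestampingWatermarkPolicy processingTime() {
    return new TimestampingWatermarkPolicy() {
      @Override
      public Instant getTimestamp(KinesisRecord record) {
        return Instant.now();
      }

      @Override
      public Instant getWatermark() {
        return Instant.now();
      }

      @Override
      public void update(KinesisRecord record) {}
    };
  }
}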


Any thoughts on this?

 Jan

[1] https://github.com/apache/beam/issues/25975

[2] https://github.com/apache/beam/issues/28760