Re: [Question] Bundle finalization callback

2023-10-15 Thread Johanna Öjeling via dev
Got it. The ack deadline is configurable on the consumer but this gave me
some new ideas. Thanks!

Johanna


Re: [Question] Bundle finalization callback

2023-10-15 Thread Robert Burke
I would recommend avoiding overfitting to a specific runner, but given the
constraints, I'd suggest being eager about self-checkpointing and ensuring
fine-grained splits. This will allow runners to schedule more bundles in
parallel if they are able, and provide independence between them.

Part of the issue is that the downstream transforms will eat into that ack
deadline time as well. Those 30s have to cover pulling the message,
processing it, and running any children downstream of the Read.

Dataflow biases towards small bundles during streaming execution, but
setting short Process Continuation suggestions should allow for low latency.
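
A minimal sketch of the eager self-checkpointing idea above, in plain Go
with no Beam dependency. The names readBatch, stepClock, readBudget and
pollInterval are illustrative assumptions, not SDK identifiers, and the
one-third budget is an arbitrary choice. In a real splittable DoFn the
returned delay would be expressed as a process continuation.

```go
package main

import (
	"fmt"
	"time"
)

const (
	ackDeadline  = 30 * time.Second // the deadline discussed in this thread
	readBudget   = ackDeadline / 3  // assumption: leave the rest for downstream transforms
	pollInterval = time.Second      // assumption: back-off when the source is idle
)

// stepClock is a fake clock that advances by step on every call to Now,
// so the sketch runs instantly and deterministically.
type stepClock struct {
	t    time.Time
	step time.Duration
}

func (c *stepClock) Now() time.Time {
	c.t = c.t.Add(c.step)
	return c.t
}

// readBatch pulls messages until the time budget is spent or the source has
// nothing to offer. It returns how many messages were handled and the
// suggested delay before processing should resume.
func readBatch(pull func() (string, bool), budget time.Duration, now func() time.Time) (int, time.Duration) {
	deadline := now().Add(budget)
	handled := 0
	for now().Before(deadline) {
		if _, ok := pull(); !ok {
			return handled, pollInterval // idle: checkpoint and poll again later
		}
		handled++ // a real reader would emit the message downstream here
	}
	return handled, 0 // budget spent: checkpoint now so the acks can go out
}

func main() {
	clock := &stepClock{step: 4 * time.Second}
	endless := func() (string, bool) { return "msg", true }
	n, delay := readBatch(endless, readBudget, clock.Now)
	fmt.Println("handled:", n, "resume after:", delay)
}
```

Injecting the clock keeps the loop testable; a production reader would pass
time.Now instead.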

All that said, 30s sounds fairly short for an ack timeout (knowing little
about the specific source you're adding). I know that Google Cloud PubSub
auto-extends ack deadlines as long as the client connection remains open.
This is done automatically by the client itself. That's an alternative
possibility as well, if the data source supports it: manually extending the
ack deadline until the bundle completes normally, and then allowing
finalization to happen (balanced against how much state stays in memory,
and so on).
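
The manual extension described above could be sketched roughly as follows,
using only the standard library. keepAlive, demoInterval and the extend
callback are hypothetical names; how a deadline is actually extended depends
on the client library of the source in question.

```go
package main

import (
	"fmt"
	"time"
)

// demoInterval is an assumption chosen so the demo finishes quickly; a real
// interval would be some fraction of the ack deadline.
const demoInterval = 5 * time.Millisecond

// keepAlive calls extend once per interval until the returned stop function
// is called. stop blocks until the background goroutine has exited, so no
// extension can race with the final ack. stop must be called exactly once.
func keepAlive(interval time.Duration, extend func()) (stop func()) {
	done := make(chan struct{})
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				extend()
			case <-done:
				return
			}
		}
	}()
	return func() {
		close(done)
		<-exited
	}
}

func main() {
	extended := make(chan struct{}, 1024)
	stop := keepAlive(demoInterval, func() { extended <- struct{}{} })

	// Stand-in for bundle processing: wait until three extensions happened.
	for i := 0; i < 3; i++ {
		<-extended
	}
	stop() // bundle done: stop extending, then ack in the finalization callback
	fmt.Println("extensions before finalization:", 3+len(extended))
}
```

The messages being kept alive stay in memory for the whole bundle, which is
the trade-off mentioned above.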


Re: [Question] Bundle finalization callback

2023-10-15 Thread Johanna Öjeling via dev
Okay I see, thank you for your quick reply! I'll have a look into that file.

Do you have an idea of the interval at which I could expect the Dataflow
runner to initiate the finalization? I'm thinking of the case where I have a
message ack deadline of e.g. 30s and a continuous stream of messages that
keeps the ProcessElement active. Then I will want to interrupt processing of
new messages and self-checkpoint before those 30s have passed, if the runner
hasn't initiated it within that time frame.

Johanna


Re: [Question] Bundle finalization callback

2023-10-15 Thread Robert Burke
Hi! Answers inline.

On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev wrote:

> Hi,
>
> I'm working on a native streaming IO connector for the Go SDK to enable
> reads and writes from/to NATS (#29000) and would like to better
> understand how bundle finalization works.
>
> For this use case I need to register a callback function which
> acknowledges the processed messages.
>
> More concretely, I wonder:
>
> - When, or at what interval, is the callback function invoked? Is it
>   supposed to happen at every runner and SDF initiated checkpoint?
>

A runner that supports it should be calling back to the SDK for
finalization after a bundle is completed, regardless of how the bundle
terminates successfully, e.g. via splits, completed data, or process
continuation resume/stop.


> - Is it correct that the registered callback is called and retried on
>   a best effort? What is the estimated success rate for this best effort?
>

I don't believe the callback would be retried if it failed, since
technically the point is to call back after the runner has committed the
bundle's output successfully. A failure in finalization would require
retractions, essentially.

It is best effort, since a callback could expire before bundle execution
completes.

>
> When running a WIP example pipeline on Dataflow, the callback function is
> only ever called once but ignored after subsequent checkpoints, hence my
> questions.
>

That is very odd. My understanding is that it should be called after every
complete bundle, if the stage contains a registered callback, prior to its
expiration time.

The SDK code has a lot of time.Now() checks, so depending on whether the
expiration time is set relative to processing time, it could be mistakenly
dropping the valid callback.

It does look like failed callbacks are re-queued after being called
(exec/plan.go:188), but only if they haven't yet expired. Expired callbacks
are never run.
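
To make the expiry and re-queue behaviour described above concrete, here is
a small standalone model. It is not the SDK's code: finalizer, fakeClock and
errTransient are made-up names, and only the shape of register (a validity
duration plus a func() error) is loosely modelled on the Go SDK's
RegisterCallback.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const ackDeadline = 30 * time.Second // the deadline discussed in this thread

var errTransient = errors.New("transient ack failure")

// fakeClock lets the sketch control time explicitly.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) advance(d time.Duration) { c.t = c.t.Add(d) }

type callback struct {
	validUntil time.Time
	fn         func() error
}

// finalizer is a toy model of a per-bundle callback queue.
type finalizer struct {
	now     func() time.Time
	pending []callback
}

// register queues fn and records when it expires.
func (f *finalizer) register(validFor time.Duration, fn func() error) {
	f.pending = append(f.pending, callback{validUntil: f.now().Add(validFor), fn: fn})
}

// finalizeBundle runs every unexpired callback once and returns how many
// ran. Failed callbacks stay queued for a later attempt; expired callbacks
// are dropped without ever being run.
func (f *finalizer) finalizeBundle() int {
	now := f.now()
	var retry []callback
	ran := 0
	for _, cb := range f.pending {
		if now.After(cb.validUntil) {
			continue // expired: never run
		}
		ran++
		if err := cb.fn(); err != nil {
			retry = append(retry, cb)
		}
	}
	f.pending = retry
	return ran
}

func main() {
	clock := &fakeClock{}
	f := &finalizer{now: clock.Now}

	attempts := 0
	f.register(ackDeadline, func() error {
		attempts++
		if attempts == 1 {
			return errTransient // first try fails, so it is kept for a retry
		}
		return nil
	})
	fmt.Println("ran:", f.finalizeBundle(), "still queued:", len(f.pending))
	fmt.Println("ran:", f.finalizeBundle(), "still queued:", len(f.pending))

	f.register(ackDeadline, func() error { return nil })
	clock.advance(2 * ackDeadline) // the bundle outlived the callback's validity
	fmt.Println("ran:", f.finalizeBundle(), "still queued:", len(f.pending))
}
```

In this model the last callback is silently dropped because the clock moved
past its validity window before finalization, which is the kind of outcome
the time.Now() checks could produce.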

I'm not sure what the correct runner side behavior is though.


> If anyone could help clarify the above or point me towards some
> documentation that would be much appreciated!
>
> Thanks,
> Johanna
>
>


[Question] Bundle finalization callback

2023-10-15 Thread Johanna Öjeling via dev
Hi,

I'm working on a native streaming IO connector for the Go SDK to enable
reads and writes from/to NATS (#29000) and would like to better
understand how bundle finalization works.

For this use case I need to register a callback function which acknowledges
the processed messages.
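
As a rough illustration of such a callback, the sketch below builds one
function that acknowledges every message handled in a bundle. The message
type and its Ack method are hypothetical stand-ins rather than the NATS
client API, and errors.Join needs Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a broker message that can be acked.
type message struct {
	id    string
	acked bool
	fail  bool // makes Ack return an error, for demonstration only
}

func (m *message) Ack() error {
	if m.fail {
		return fmt.Errorf("ack %s: broker unavailable", m.id)
	}
	m.acked = true
	return nil
}

// ackAll returns a callback that acknowledges every message processed in
// the bundle and reports all failures together. It returns nil when every
// ack succeeds.
func ackAll(processed []*message) func() error {
	return func() error {
		var errs []error
		for _, m := range processed {
			if err := m.Ack(); err != nil {
				errs = append(errs, err)
			}
		}
		return errors.Join(errs...)
	}
}

func main() {
	batch := []*message{{id: "a"}, {id: "b", fail: true}, {id: "c"}}
	cb := ackAll(batch)
	fmt.Println("finalization error:", cb())
	fmt.Println("acked:", batch[0].acked, batch[1].acked, batch[2].acked)
}
```

Attempting every ack before returning means one failing message does not
hold back the acknowledgement of the others.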

More concretely, I wonder:

   - When, or at what interval, is the callback function invoked? Is it
   supposed to happen at every runner and SDF initiated checkpoint?
   - Is it correct that the registered callback is called and retried on a
   best effort? What is the estimated success rate for this best effort?


When running a WIP example pipeline on Dataflow, the callback function is
only ever called once but ignored after subsequent checkpoints, hence my
questions.

If anyone could help clarify the above or point me towards some
documentation that would be much appreciated!

Thanks,
Johanna