Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-09-06 Thread Sagar
Hey All,

I had an offline discussion with Yash on this, and while so far there didn't
seem to be a pressing need to introduce an offset deletion mechanism via
the updateOffsets method, Yash brought up an interesting point: if we don't
introduce offset deletion in this KIP but do it in a future version, then
connector developers and users would see different behaviour for tombstone
offsets depending on the runtime version being run. This could lead to
confusion.

Considering this, I have updated the KIP to also allow deleting offsets via
tombstone records. Thanks Yash for closing out on this one!

Hopefully all open questions have now been addressed.

Thanks!
Sagar.

On Tue, Aug 29, 2023 at 3:33 PM Yash Mayya  wrote:


2023-08-29 Thread Yash Mayya
Hi Sagar,

> The size of offsets topic can be controlled by
> setting appropriate topic retention values and
> that is a standard practice in Kafka

Kafka Connect enforces the `cleanup.policy` configuration for the offsets
topic to be "compact" only (references: [1], [2]), so the topic
retention-related configurations wouldn't be relevant, right?
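Yash's point rests on how a compacted topic behaves. As a rough illustration, here is a hypothetical, heavily simplified in-memory model (not Kafka's log cleaner): after compaction only the latest value per key survives, and a key disappears only once a tombstone (null value) is written for it, so time-based retention settings do not bound how many source partitions stay in the offsets topic. The class and field names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a compacted topic such as connect-offsets.
// It ignores segments, cleaner scheduling and delete.retention.ms, and only
// computes the end state once compaction and tombstone removal have run.
final class CompactedTopicModel {

    // One record: key = serialized source partition, value = serialized offset.
    // A null value is a tombstone.
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // The latest value per key survives; a key whose latest value is a
    // tombstone is dropped entirely. Time-based retention plays no part here.
    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                state.remove(e.key);        // tombstone: the key disappears
            } else {
                state.put(e.key, e.value);  // newer value replaces older one
            }
        }
        return state;
    }
}
```

With a log of tableA=1, tableB=5, tableA=2 followed by a tombstone for tableB, the model ends with only tableA=2; without that tombstone, tableB would remain however retention is configured.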

> Deleting offsets is not something which should
> be done very frequently and should be handled
> with care

> Agreed this involves some toil but it's not something
> that should be done on a very regular basis.

I'm not sure I follow how we came to this conclusion. Could you please
expand on the pitfalls of allowing connector plugins to wipe the offsets
for source partitions that they no longer care about?

> The use cases you highlighted are edge cases at
> best. As I have been saying, if it is needed we can
> always add it in the future but that doesn't look like
> a problem we need to solve upfront.

I agree that these cases might not be too common, but I'm just trying to
understand the reasoning behind preventing this use case since null offsets
don't require any separate handling from the Connect runtime's point of
view (and wouldn't need any additional implementation work in this KIP).

Thanks,
Yash

[1] - https://kafka.apache.org/documentation/#connect_running
[2] - https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java#L47

On Mon, Aug 28, 2023 at 1:38 PM Sagar  wrote:




2023-08-28 Thread Sagar
Hey Yash,

Thanks for your further comments. Here are my responses:

1) Deleting offsets via updateOffsets.

Hmm, I am not sure this really needs to be part of the KIP at this point,
and we can always add it later on if needed. I say this for the following
reasons:


   - The size of the offsets topic can be controlled by setting appropriate
   topic retention values, and that is a standard practice in Kafka. Sure,
   it's not always possible to get the right values, but as I said, it is a
   standard practice. For Connect specifically, there is also a KIP (KIP-943,
   https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073470)
   that is trying to solve the problem of a large connect-offsets topic. So,
   if that is really the motivation, it is being addressed separately anyway.
   - Deleting offsets is not something that should be done very frequently,
   and it should be handled with care. That is why KIP-875's mechanism of
   having users / cluster admins do this externally is the right thing to do.
   Agreed, this involves some toil, but it's not something that should be
   done on a very regular basis.
   - Nothing stops connector implementations from sending tombstone records
   as offsets, but in practice, how many connectors actually do it? Maybe 1
   or 2, from what we discussed.
   - The use cases you highlighted are edge cases at best. As I have been
   saying, if it is needed, we can always add it in the future, but that
   doesn't look like a problem we need to solve upfront.

For these reasons, I don't think this is a point that we need to stress so
much. Purging / cleaning up the offsets topic can be handled either via
standard Kafka techniques (point #1 above) or via Connect runtime techniques
(point #2 above). IMO, the problem we are trying to solve via this KIP has so
far been solved by connectors using techniques that have been described as
having a higher maintenance cost or a high cognitive load (i.e. a separate
topic), and that is what needs to be addressed upfront. And since you
yourself termed it a nice-to-have feature, we can leave it at that and take
it up as Future Work. Hope that's OK with you and other community members.

2) Purpose of offsets parameter in updateOffsets

The main purpose is to give the task visibility into which partitions are
getting their offsets committed. A task doesn't have to update offsets
every time it sees that a given source partition is missing from the
about-to-be-committed offsets. Maybe it chooses to wait for X iterations or
X amount of time, and sends out an updated offset for a partition only when
such thresholds are breached. Even here, we could argue that since the task
is sending the partitions / offsets, it can do the tracking on its own, but
IMO that is too much work given that the information is already available
via the offsets to be committed.
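The threshold idea in point 2 could look roughly like this. It is a hypothetical sketch: the class, the missThreshold parameter and the offsetFor callback (which decides what offset to emit, for example a heartbeat position) are illustrative and not part of the KIP; it only assumes that offsets are keyed by source partition maps, as elsewhere in Connect.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper for an updateOffsets-style hook: it uses the offsets that are
// about to be committed to spot partitions that have gone quiet, and only emits an
// offset for such a partition after it has been missing for missThreshold calls.
final class IdlePartitionTracker {
    private final Set<Map<String, Object>> ownedPartitions;
    private final int missThreshold;
    private final Function<Map<String, Object>, Map<String, Object>> offsetFor;
    private final Map<Map<String, Object>, Integer> misses = new HashMap<>();

    IdlePartitionTracker(Set<Map<String, Object>> ownedPartitions, int missThreshold,
                         Function<Map<String, Object>, Map<String, Object>> offsetFor) {
        if (missThreshold < 1) {
            throw new IllegalArgumentException("missThreshold must be >= 1");
        }
        this.ownedPartitions = Set.copyOf(ownedPartitions);
        this.missThreshold = missThreshold;
        this.offsetFor = offsetFor;
    }

    // Returns offsets to add for partitions absent from aboutToBeCommitted for
    // missThreshold consecutive calls; the argument itself is never modified.
    Map<Map<String, Object>, Map<String, Object>> offsetsToAdd(
            Map<Map<String, Object>, Map<String, Object>> aboutToBeCommitted) {
        Map<Map<String, Object>, Map<String, Object>> toAdd = new HashMap<>();
        for (Map<String, Object> partition : ownedPartitions) {
            if (aboutToBeCommitted.containsKey(partition)) {
                misses.remove(partition); // records flowed for it: reset the counter
                continue;
            }
            int count = misses.merge(partition, 1, Integer::sum);
            if (count >= missThreshold) {
                toAdd.put(partition, offsetFor.apply(partition));
                misses.remove(partition); // start counting again after emitting
            }
        }
        return toAdd;
    }
}
```

Because the about-to-be-committed offsets are passed in, the task only keeps a small miss counter per partition instead of remembering every offset it has emitted.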

Thanks!
Sagar.



2023-08-22 Thread Yash Mayya
Hi Sagar,

Thanks for the updates and apologies for the delayed response.

> Hmm the question is how do you qualify a
> partition as stale or old? Let's say a connector
> has implemented updateOffsets and for a certain
> partition for which no records are received then it
> will update its offsets. So technically that offset can't
> be termed as stale anymore

The "staleness" was not from the point of view of the offsets, but of the
source partition itself. For instance, if a database source connector is
monitoring a number of tables (each modelled as a source partition) and
detects that a table has been dropped, it might be nice to allow the
connector to wipe the offset for that source partition. Similarly, a
file-based source connector that is reading from multiple files in a
directory might want to wipe the offsets for a source file that has been
deleted.
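A minimal sketch of the dropped-table case, assuming (as Sagar's later update to the KIP allows) that a task can request deletion by returning a tombstone, i.e. a null offset value, for a source partition. The class, the method and the "table" partition key are hypothetical, and how the task learns which partitions currently have offsets is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: offsets a task could return from an updateOffsets-style
// hook to wipe the offsets of tables that no longer exist. A null value stands
// for a tombstone; the "table" partition key is an illustrative assumption.
final class DroppedTableOffsets {

    static Map<Map<String, Object>, Map<String, Object>> tombstonesFor(
            Set<Map<String, Object>> partitionsWithOffsets,
            Set<String> existingTables) {
        // HashMap permits null values (Map.of does not), which tombstones need.
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        for (Map<String, Object> partition : partitionsWithOffsets) {
            Object table = partition.get("table");
            // Skip partitions that don't follow the assumed {"table": name} shape.
            if (table != null && !existingTables.contains(table)) {
                result.put(partition, null); // tombstone: delete this partition's offset
            }
        }
        return result;
    }
}
```

A file-based connector could apply the same idea with a hypothetical "file" key and the set of files still present in the directory.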

> Even though I can't think of a side effect at this
> point to disallow offset deletion via this method,
> my opinion is to use a proper mechanism like the
> ones introduced in KIP-875 to delete offsets. Moreover,
> if I also consider the option presented in point #2, for
> simplicity's sake it seems better to not add this feature at
> this point

The KIP-875 APIs would allow users / cluster administrators to manually
wipe offsets externally. However, for the cases that I've outlined above,
that would be additional toil for the operator, and it is something better
suited to being done by the connector itself. Also, I may be missing
something here, but I don't see why allowing tombstone offsets would add
any complexity.

> I get the point now. I can't think of cases where
> updating offsets would be needed.

Given that we're disallowing updating offsets for source partitions whose
offsets are about to be committed (or removing such source partitions
altogether), I'm wondering what purpose the "offsets" parameter in the
newly proposed SourceTask::updateOffsets method serves.

Thanks,
Yash

On Fri, Jul 28, 2023 at 1:41 PM Sagar  wrote:



2023-07-28 Thread Sagar
Hey Yash,

Thanks for your comments.

1) Hmm, the question is: how do you qualify a partition as stale or old?
Let's say a connector has implemented updateOffsets; for a certain
partition for which no records are received, it will update its offsets.
So technically, that offset can't be termed stale anymore. Even though I
can't think of a side effect at this point that would justify disallowing
offset deletion via this method, my opinion is to use a proper mechanism,
like the ones introduced in KIP-875, to delete offsets. Moreover, if I also
consider the option presented in point #2, for simplicity's sake it seems
better not to add this feature at this point. If we feel it's really needed
and users are requesting it, we can add support for it later on.

2) I get the point now. I can't think of cases where updating offsets would
be needed. As with point #1, we can always add it back if needed later on.
For now, I have removed that part from the KIP.

3) Yes, because the offset commit happens on a different thread, ordering
guarantees might be harder to ensure if we do it from that thread. The
currently proposed mechanism, even though it gets invoked multiple times,
keeps things simpler to reason about.

Let me know how things look now. If it all looks OK, I will go ahead and
create a Vote thread for it.

Thanks!
Sagar.

On Tue, Jul 25, 2023 at 5:15 PM Yash Mayya  wrote:



2023-07-25 Thread Yash Mayya
Hi Sagar,

Thanks for the updates. I had a few more follow-up questions:

> I have added that a better way of doing that would be
> via KIP-875. Also, I didn't want to include any mechanisms
> for users to meddle with the offsets topic. Allowing tombstone
> records via this method would be akin to publishing tombstone
> records directly to the offsets topic which is not recommended
> generally.

KIP-875 would give cluster administrators and / or users a way to do so
manually and externally, whereas allowing tombstones in
SourceTask::updateOffsets would enable connectors to clean up offsets for
old / stale partitions without user intervention, right? I'm not sure I
follow what you mean by "I didn't want to include any mechanisms for users
to meddle with the offsets topic" here. Furthermore, I'm not sure why
publishing tombstone records directly to the offsets topic would not be
recommended. Isn't that currently the only way to manually clean up offsets
for a source connector?

> It could be useful in a scenario where the offset of a partition
> doesn't update for some period of time. In such cases, the
> connector can do some kind of state tracking and update the
> offsets after the time period elapses.

I'm not sure I follow. In this case, won't the offsets argument passed
to SourceTask::updateOffsets *not* contain the source partition that
hasn't had an update for a long period of time? Wouldn't it make more sense
to reduce the surface of the API, as Chris suggested, and only allow adding
new partition-offset pairs to the about-to-be-committed offsets (since
there don't seem to be any use cases outlined for allowing connectors to
update offsets for source partitions that already have an offset about to
be committed)?
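The narrower contract Yash and Chris describe (the task may only add offsets for partitions that are not already about to be committed) could be enforced on the runtime side roughly as follows. This is a hypothetical sketch, not Connect's code; whether the runtime should silently ignore, log or reject conflicting entries is an open choice, and here they are simply ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical runtime-side merge for the narrower contract: task-supplied entries
// are only added for partitions absent from the about-to-be-committed offsets;
// entries for partitions already present are ignored (one possible policy).
final class OffsetMerge {

    static Map<Map<String, Object>, Map<String, Object>> merge(
            Map<Map<String, Object>, Map<String, Object>> aboutToBeCommitted,
            Map<Map<String, Object>, Map<String, Object>> fromTask) {
        Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(aboutToBeCommitted);
        if (fromTask == null) {
            return merged; // the task supplied nothing
        }
        for (Map.Entry<Map<String, Object>, Map<String, Object>> e : fromTask.entrySet()) {
            // containsKey rather than putIfAbsent: putIfAbsent would overwrite an existing null value.
            if (!merged.containsKey(e.getKey())) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        return merged;
    }
}
```

Under this rule, a task's entry for a partition that already has a "regular" offset pending never changes what gets committed for it, which is what shrinks the API surface.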

> All the records returned by the previous poll invocation
>  got processed successfully

Thanks for this clarification in the KIP; it looks like it does address the
offsets ordering issue. As to Chris' point about invoking
SourceTask::updateOffsets less frequently, by calling it before offsets are
committed rather than in every poll loop iteration: I guess that would
make it a lot trickier to address the ordering issue?
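To make the ordering concern concrete, here is a hypothetical single-threaded model (not the runtime's actual offset tracking code, and with partitions reduced to strings for brevity): offsets become committable only over the contiguous prefix of acknowledged records per partition, so an older record still being retried holds back newer ones, and task-supplied offsets are accepted only once nothing from the previous poll is in flight, which mirrors the condition quoted above. All names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded model of the ordering rule (not the Connect runtime's
// code). Per partition, only the contiguous prefix of acknowledged records becomes
// committable, and task-supplied offsets are accepted only when nothing is in flight.
final class OrderedOffsetTracker {

    static final class Submitted {
        final String partition;
        final long offset;
        boolean acked;

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Map<String, Deque<Submitted>> inFlight = new HashMap<>();
    private final Map<String, Long> committable = new HashMap<>();

    // Called for each record handed to the producer, in poll order.
    Submitted submit(String partition, long offset) {
        Submitted s = new Submitted(partition, offset);
        inFlight.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(s);
        return s;
    }

    // Called when the producer acknowledges a record (possibly out of order after retries).
    void ack(Submitted s) {
        s.acked = true;
        Deque<Submitted> queue = inFlight.get(s.partition);
        // Advance only over the acked prefix: an older record still being retried
        // keeps newer, already-acked offsets from becoming committable.
        while (queue != null && !queue.isEmpty() && queue.peekFirst().acked) {
            committable.put(s.partition, queue.pollFirst().offset);
        }
    }

    boolean nothingInFlight() {
        return inFlight.values().stream().allMatch(Deque::isEmpty);
    }

    // Task-supplied offsets are merged only once every earlier record is acked,
    // so no older offset can later be committed on top of them.
    boolean tryApplyTaskOffsets(Map<String, Long> fromTask) {
        if (!nothingInFlight()) {
            return false;
        }
        committable.putAll(fromTask);
        return true;
    }

    Map<String, Long> committableOffsets() {
        return Map.copyOf(committable);
    }
}
```

If the hook ran only right before a commit instead, the runtime would need some other way to know that no earlier record is still being retried, which is the difficulty Yash raises.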


Thanks,
Yash

On Thu, Jul 20, 2023 at 9:50 PM Sagar  wrote:



2023-07-20 Thread Sagar
Hey All,

Please let me know how the KIP looks now. Is it at a stage where I can
start with the Voting phase? Of course, I am still open to
feedback / suggestions, but I am planning to start the Vote for it.

Thanks!
Sagar.

On Tue, Jul 11, 2023 at 10:00 PM Sagar  wrote:

> Hi Yash/Chris,
>
> Thanks for the feedback! I have updated the KIP with the suggestions
> provided. I will also update the PR with the suggestions.
>
> Also, I was hoping that this could make it into the 3.6 release, given that
> it would benefit source connectors which have some of the problems listed
> in the Motivation Section.
>
> Responses Inline:
>
> Yash:
>
> 1) In the proposed changes section where you talk about modifying the
>> offsets, could you please clarify that tasks shouldn't modify the offsets
>> map that is passed as an argument? Currently, the distinction between the
>> offsets map passed as an argument and the offsets map that is returned is
>> not very clear in numerous places.
>
>
>
> Added
>
> 2) The default return value of Optional.empty() seems to be fairly
>> non-intuitive considering that the return value is supposed to be the
>> offsets that are to be committed. Can we consider simply returning the
>> offsets argument itself by default instead?
>
>
>
> Chris is suggesting returning null for the default case. I am thinking of
> making null the default return value. If the returned map is null, there
> won't be any further processing; otherwise, we will continue with the
> existing logic.
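The null-default behaviour described here might look like the following. OffsetUpdatingTask and CommitPreparer are hypothetical stand-ins rather than the real SourceTask or worker classes; the read-only view reflects point 1 (tasks should not modify the map passed as an argument), and the final merge is a deliberately simplified placeholder for "the existing logic".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a task exposing the discussed hook (not the real SourceTask).
interface OffsetUpdatingTask {
    // Default: return null, meaning "this task doesn't use the feature".
    default Map<Map<String, Object>, Map<String, Object>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsets) {
        return null;
    }
}

// Hypothetical stand-in for the caller that prepares offsets for commit.
final class CommitPreparer {

    static Map<Map<String, Object>, Map<String, Object>> prepare(
            OffsetUpdatingTask task,
            Map<Map<String, Object>, Map<String, Object>> aboutToBeCommitted) {
        // Hand the task a read-only view so it cannot modify the argument (point 1).
        Map<Map<String, Object>, Map<String, Object>> returned =
                task.updateOffsets(Collections.unmodifiableMap(aboutToBeCommitted));
        if (returned == null) {
            return aboutToBeCommitted; // null: no further processing
        }
        // Simplified placeholder for "the existing logic": union, task entries winning.
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>(aboutToBeCommitted);
        result.putAll(returned);
        return result;
    }
}
```

A plain Map with a null default keeps tasks that never override the method on exactly the current code path, which is the motivation for dropping Optional in point 4.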
>
> 3) The KIP states that "It is also possible that a task might choose to
>> send a tombstone record as an offset. This is not recommended and to
>> prevent connectors shooting themselves in the foot due to this" - could
>> you please clarify why this is not recommended / supported?
>
>
>
> I have added that a better way of doing that would be via KIP-875. Also, I
> didn't want to include any mechanisms for users to meddle with the offsets
> topic. Allowing tombstone records via this method would be akin to
> publishing tombstone records directly to the offsets topic, which is not
> generally recommended.
>
> 4) The KIP states that "If a task returns an Optional of a null object or
>> an Optional of an empty map, even for such cases the behaviour would
>> be disabled." - since this is an optional API that source task
>> implementations don't necessarily need to implement, I don't think I fully
>> follow why the return type of the proposed "updateOffsets" method is an
>> Optional? Can we not simply use the Map as the return type instead?
>
>
>
> Yeah, I updated the return type to be a Map.
>
>
> 5) The KIP states that "The offsets passed to the updateOffsets  method
>> would be the offset from the latest source record amongst all source
>> records per partition. This way, if the source offset for a given source
>> partition is updated, that offset is the one that gets committed for the
>> source partition." - we should clarify that the "latest" offset refers to
>> the offsets that are about to be committed, and not the latest offsets
>> returned from SourceTask::poll so far (see related discussion in
>> https://issues.apache.org/jira/browse/KAFKA-15091 and
>> https://issues.apache.org/jira/browse/KAFKA-5716).
>
>
>
> Done
>
>
> 6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
>> Connect since the framework itself does not (and cannot) make any
>> guarantees on the delivery semantics. Depending on the source connector
>> and the source system, both at-least once and at-most once semantics (for
>> example - a source system where reads are destructive) are possible. We
>> should avoid introducing this terminology in the KIP and instead refer to
>> this scenario as exactly-once support being disabled.
>
>
>
> Done
>
>
> 7) Similar to the above point, we should remove the use of the term
>> "Exactly Once Semantics" and instead refer to exactly-once support being
>> enabled since the framework can't guarantee exactly-once semantics for all
>> possible source connectors (for example - a message queue source connector
>> where offsets are essentially managed in the source system via an ack
>> mechanism).
>
>
> Done
>
> 8) In a previous attempt to fix this gap in functionality, a significant
>> concern was raised on offsets ordering guarantees when we retry sending a
>> batch of records (ref -
>> https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
>> look like this KIP addresses that concern either? In the case where
>> exactly-once support is disabled - if we update the committableOffsets with
>> the offsets provided by the task through the new updateOffsets method,
>> these offsets could be committed before older "regular" offsets are
>> committed due to producer retries which could then lead to an inconsistency
>> if the send operation eventually succeeds.
>
>
>
>
> Thanks for bringing this up. I went through the comment shared above. If
> you see the implementation that I 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-11 Thread Sagar
Hi Yash/Chris,

Thanks for the feedback! I have updated the KIP with the suggestions
provided. I will also update the PR with the suggestions.

Also, I was hoping that this could make it into the 3.6 release, given that
it would benefit source connectors which have some of the problems listed in
the Motivation section.

Responses Inline:

Yash:

1) In the proposed changes section where you talk about modifying the
> offsets, could you please clarify that tasks shouldn't modify the offsets
> map that is passed as an argument? Currently, the distinction between the
> offsets map passed as an argument and the offsets map that is returned is
> not very clear in numerous places.



Added
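
As an illustration of point 1, here is a minimal sketch of the
copy-then-modify pattern, assuming offsets are modelled as a map from source
partition to source offset (both Map<String, Object>, as in SourceRecord).
The IdlePartitionOffsets class and its extra parameters are hypothetical and
are not the signature proposed in the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical task-side helper (not the KIP's API): the offsets map supplied
// by the framework is treated as read-only, and any change goes into a copy.
class IdlePartitionOffsets {

    static Map<Map<String, Object>, Map<String, Object>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsets,
            Map<String, Object> idlePartition,
            Map<String, Object> idleOffset) {
        // Copy first so the caller's map is never mutated.
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>(offsets);
        // Record an offset for a partition that produced no records since the last commit.
        result.put(idlePartition, idleOffset);
        return result;
    }
}
```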

2) The default return value of Optional.empty() seems to be fairly
> non-intuitive considering that the return value is supposed to be the
> offsets that are to be committed. Can we consider simply returning the
> offsets argument itself by default instead?



Chris suggested returning null for the default case, and I am planning to
make null the default return value. If the returned map is null, there
won't be any further processing; otherwise, we will continue with the
existing logic.
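
A minimal sketch of the null-default behaviour described above.
OffsetUpdatingTask and OffsetUpdateHandler are hypothetical stand-ins, not
Kafka Connect classes, and overlaying the returned offsets on the pending ones
is an assumption about what "continue with the existing logic" could mean.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a source task exposing the proposed hook; this is
// not the real org.apache.kafka.connect.source.SourceTask.
abstract class OffsetUpdatingTask {
    // Default: null means "nothing to change", so tasks that don't override
    // the hook keep the existing behaviour.
    Map<Map<String, Object>, Map<String, Object>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsets) {
        return null;
    }
}

// Hypothetical runtime-side handling of the hook's return value.
class OffsetUpdateHandler {
    static Map<Map<String, Object>, Map<String, Object>> offsetsToCommit(
            OffsetUpdatingTask task,
            Map<Map<String, Object>, Map<String, Object>> pending) {
        // Pass a read-only view so a task cannot mutate the runtime's own map.
        Map<Map<String, Object>, Map<String, Object>> returned =
                task.updateOffsets(Collections.unmodifiableMap(pending));
        if (returned == null) {
            // No further processing: commit exactly what was already pending.
            return pending;
        }
        // Otherwise overlay the returned offsets, keeping pending partitions
        // that the task did not mention.
        Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(pending);
        merged.putAll(returned);
        return merged;
    }
}
```

Passing an unmodifiable view is one way to enforce that tasks return a new map
instead of mutating the argument; a task that tries to mutate it gets an
UnsupportedOperationException.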

3) The KIP states that "It is also possible that a task might choose to
> send a tombstone record as an offset. This is not recommended and to
> prevent connectors shooting themselves in the foot due to this" - could you
> please clarify why this is not recommended / supported?



I have added that a better way of doing that would be via KIP-875. Also, I
didn't want to include any mechanisms for users to meddle with the offsets
topic. Allowing tombstone records via this method would be akin to
publishing tombstone records directly to the offsets topic, which is not
recommended generally.

4) The KIP states that "If a task returns an Optional of a null object or
> an Optional of an empty map, even for such cases the behaviour would would
> be disabled." - since this is an optional API that source task
> implementations don't necessarily need to implement, I don't think I fully
> follow why the return type of the proposed "updateOffsets" method is an
> Optional? Can we not simply use the Map as the return type instead?



Yeah, I updated the return type to be a Map.


5) The KIP states that "The offsets passed to the updateOffsets  method
> would be the offset from the latest source record amongst all source
> records per partition. This way, if the source offset for a given source
> partition is updated, that offset is the one that gets committed for the
> source partition." - we should clarify that the "latest" offset refers to
> the offsets that are about to be committed, and not the latest offsets
> returned from SourceTask::poll so far (see related discussion in
> https://issues.apache.org/jira/browse/KAFKA-15091 and
> https://issues.apache.org/jira/browse/KAFKA-5716).



Done
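
To make the distinction concrete, a simplified sketch in which the offsets
"about to be committed" for a source partition stop at that partition's first
unacknowledged record, so they can lag behind the latest offset returned from
SourceTask::poll. PolledRecord and CommittableOffsets are hypothetical, and
the model ignores everything except acknowledgement order.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a record returned from poll() and whether the
// producer has acknowledged it yet.
final class PolledRecord {
    final Map<String, Object> partition;
    final Map<String, Object> offset;
    final boolean acked;

    PolledRecord(Map<String, Object> partition, Map<String, Object> offset, boolean acked) {
        this.partition = partition;
        this.offset = offset;
        this.acked = acked;
    }
}

class CommittableOffsets {
    // Walks records in poll order; per source partition, the committable offset
    // is the last one before the first unacknowledged record of that partition.
    static Map<Map<String, Object>, Map<String, Object>> compute(List<PolledRecord> inPollOrder) {
        Map<Map<String, Object>, Map<String, Object>> committable = new HashMap<>();
        Set<Map<String, Object>> blocked = new HashSet<>();
        for (PolledRecord r : inPollOrder) {
            if (blocked.contains(r.partition)) {
                continue; // an earlier record of this partition is still in flight
            }
            if (!r.acked) {
                blocked.add(r.partition);
                continue;
            }
            committable.put(r.partition, r.offset);
        }
        return committable;
    }
}
```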


6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
> Connect since the framework itself does not (and cannot) make any
> guarantees on the delivery semantics. Depending on the source connector and
> the source system, both at-least once and at-most once semantics (for
> example - a source system where reads are destructive) are possible. We
> should avoid introducing this terminology in the KIP and instead refer to
> this scenario as exactly-once support being disabled.



Done


7) Similar to the above point, we should remove the use of the term
> "Exactly Once Semantics" and instead refer to exactly-once support being
> enabled since the framework can't guarantee exactly-once semantics for all
> possible source connectors (for example - a message queue source connector
> where offsets are essentially managed in the source system via an ack
> mechanism).


Done

8) In a previous attempt to fix this gap in functionality, a significant
> concern was raised on offsets ordering guarantees when we retry sending a
> batch of records (ref -
> https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
> look like this KIP addresses that concern either? In the case where
> exactly-once support is disabled - if we update the committableOffsets with
> the offsets provided by the task through the new updateOffsets method,
> these offsets could be committed before older "regular" offsets are
> committed due to producer retries which could then lead to an inconsistency
> if the send operation eventually succeeds.




Thanks for bringing this up. I went through the comment shared above. If
you look at the implementation that I have in the PR, in the EOS-disabled
case, updateOffsets is invoked only when toSend is null. Refer here:
https://github.com/apache/kafka/pull/13899/files#diff-a3107b56382b6ec950dc9d19d21f188c21d4bf41853e0505d60d3bf87adab6a9R324-R330

This means that we invoke updateOffsets only when either:
1) the last poll invocation didn't return any records, or
2) all the records returned by the previous poll invocation were processed
successfully
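
A rough, single-threaded sketch of that condition, assuming the hook is
checked at the start of each loop iteration. WorkerLoopSketch is hypothetical
and greatly simplified; it is not the code in the linked PR.

```java
import java.util.List;

// Hypothetical, single-threaded simplification of the worker loop with
// exactly-once support disabled; not the actual WorkerSourceTask code.
class WorkerLoopSketch {
    private List<String> toSend = null; // records from the last poll not yet dispatched
    int updateOffsetsCalls = 0;

    // 'polled' stands for what poll() would return in this iteration (ignored
    // while an older batch is still pending); 'sendSucceeds' simulates the producer.
    void iterate(List<String> polled, boolean sendSucceeds) {
        if (toSend == null) {
            // Nothing pending: the previous batch was fully dispatched, or was empty.
            updateOffsetsCalls++; // stand-in for invoking updateOffsets
            toSend = polled.isEmpty() ? null : polled;
        }
        if (toSend != null && sendSucceeds) {
            toSend = null; // whole batch handed to the producer
        }
        // On failure toSend stays set and is retried next iteration, so the
        // hook is not invoked while a retry is outstanding.
    }
}
```

The intent, as described above, is that while a batch is being retried toSend
stays non-null, so offsets supplied through updateOffsets are not picked up
ahead of the offsets of records that are still being retried.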

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-05 Thread Chris Egerton
Hi Sagar,

Thanks for updating the KIP! The latest draft seems simpler and more
focused, which I think is a win for users and developers alike. Here are my
thoughts on the current draft:

1. (Nit) Can we move the "Public Interfaces" section before the "Proposed
Changes" section? It's nice to have a summary of the user/developer-facing
changes first since that answers many of the questions that I had while
reading the "Proposed Changes" section. I'd bet that this is also why we
use that ordering in the KIP template.

2. Why are we invoking SourceTask::updateOffsets so frequently when
exactly-once support is disabled? Wouldn't it be simpler both for our
implementation and for connector developers if we only invoked it directly
before committing offsets, instead of potentially several times between
offset commits, especially since that would also mirror the behavior with
exactly-once support enabled?

3. Building off of point 2, we wouldn't need to specify any more detail
than that "SourceTask::updateOffsets will be invoked directly before
committing offsets, with the to-be-committed offsets". There would be no
need to distinguish between when exactly-once support is enabled or
disabled.
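
A minimal simulation of the behaviour proposed in points 2 and 3: the worker
loop may iterate many times between commits, but the hook runs only
immediately before each commit. FlushScheduleSketch is hypothetical and
single-threaded for simplicity; the interval stands in for the worker's
offset.flush.interval.ms setting.

```java
// Hypothetical, single-threaded simulation: many loop iterations, one hook
// invocation per offset commit.
class FlushScheduleSketch {
    private final long flushIntervalMs;
    private long lastFlushMs;
    int loopIterations = 0;
    int updateOffsetsCalls = 0;

    FlushScheduleSketch(long flushIntervalMs, long startMs) {
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    void onLoopIteration(long nowMs) {
        loopIterations++;
        // poll() and record dispatch would happen here (omitted).
        if (nowMs - lastFlushMs >= flushIntervalMs) {
            updateOffsetsCalls++; // stand-in for invoking the hook with the to-be-committed offsets
            lastFlushMs = nowMs;  // the (possibly updated) offsets would be committed here
        }
    }
}
```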

4. Some general stylistic feedback: we shouldn't mention the names of
internal classes or methods in KIPs. KIPs are for discussing high-level
design proposals. Internal names and APIs may change over time, and are not
very helpful to readers who are not already familiar with the code base.
Instead, we should describe changes in behavior, not code.

5. Why return a complete map of to-be-committed offsets instead of a map of
just the offsets that the connector wants to change? This seems especially
intuitive since we automatically re-insert source partitions that have been
removed by the connector.
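
A small sketch of why the two return styles can be equivalent, assuming the
framework overlays whatever the task returns onto the to-be-committed offsets.
OffsetMerge is hypothetical: with that merge rule, returning the complete map
and returning only the changed entries commit the same offsets.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetMerge {
    // Hypothetical merge rule: start from the to-be-committed offsets and
    // overlay whatever the task returned. Partitions the task omitted are kept,
    // which is equivalent to "re-inserting" them.
    static Map<Map<String, Object>, Map<String, Object>> merge(
            Map<Map<String, Object>, Map<String, Object>> toBeCommitted,
            Map<Map<String, Object>, Map<String, Object>> returned) {
        Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(toBeCommitted);
        merged.putAll(returned);
        return merged;
    }
}
```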

6. I don't think we need to return an Optional from
SourceTask::updateOffsets. Developers can return null instead of
Optional.empty(), and since the framework will have to handle null return
values either way, this would reduce the number of cases for us to handle
from three (Optional.of(...), Optional.empty(), null) to two (null,
non-null).

7. Why disallow tombstone records? If an upstream resource disappears, then
wouldn't a task want to emit a tombstone record without having to also emit
an accompanying source record? This could help prevent an
infinitely-growing offsets topic, although with KIP-875 coming out in the
next release, perhaps we can leave this out for now and let Connect users
and cluster administrators do this work manually instead of letting
connector developers automate it.
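
To illustrate the growth concern, a minimal in-memory model of a compacted
offsets topic. CompactedOffsetsLog is hypothetical; it assumes writes keyed by
source partition and treats a null offset as a tombstone. Real compaction is
asynchronous and retains tombstones for delete.retention.ms, which this model
ignores.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a compacted offsets topic: compaction keeps the latest
// value per source partition, and a null value (tombstone) removes the key.
class CompactedOffsetsLog {
    private final List<Map.Entry<Map<String, Object>, Map<String, Object>>> log = new ArrayList<>();

    void append(Map<String, Object> partition, Map<String, Object> offset) {
        // SimpleEntry permits a null value, which is how a tombstone is modelled here.
        log.add(new AbstractMap.SimpleEntry<>(partition, offset));
    }

    int size() {
        return log.size();
    }

    Map<Map<String, Object>, Map<String, Object>> compacted() {
        Map<Map<String, Object>, Map<String, Object>> latest = new LinkedHashMap<>();
        for (Map.Entry<Map<String, Object>, Map<String, Object>> e : log) {
            if (e.getValue() == null) {
                latest.remove(e.getKey());
            } else {
                latest.put(e.getKey(), e.getValue());
            }
        }
        return latest;
    }
}
```

Without a tombstone, the entry for a source partition whose upstream resource
has disappeared survives every compaction, which is the infinite-growth case
described in point 7.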

8. Is the information on multiple offsets topics for exactly-once
connectors relevant to this KIP? If not, we should remove it.

9. It seems like most of the use cases that motivate this KIP only require
being able to add a new source partition/source offset pair to the
to-be-committed offsets. Do we need to allow connector developers to modify
source offsets for already-present source partitions at all? If we reduce
the surface of the API, then the worst case is still just that the offsets
we commit are at most one commit out-of-date.
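
A sketch of the narrower contract suggested in point 9, where returned offsets
are accepted only for source partitions that are not already present.
AddOnlyOffsetUpdates is hypothetical and only shows the filtering rule.

```java
import java.util.HashMap;
import java.util.Map;

class AddOnlyOffsetUpdates {
    // Hypothetical restricted contract: returned entries are accepted only for
    // source partitions with no to-be-committed offset yet; offsets for
    // partitions already present are left untouched.
    static Map<Map<String, Object>, Map<String, Object>> apply(
            Map<Map<String, Object>, Map<String, Object>> toBeCommitted,
            Map<Map<String, Object>, Map<String, Object>> returned) {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>(toBeCommitted);
        for (Map.Entry<Map<String, Object>, Map<String, Object>> e : returned.entrySet()) {
            result.putIfAbsent(e.getKey(), e.getValue());
        }
        return result;
    }
}
```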

10. (Nit) The "Motivation" section states that "offsets are written
periodically by the connect framework to an offsets topic". This is only
true in distributed mode; in standalone mode, we write offsets to a local
file.

Cheers,

Chris

On Tue, Jul 4, 2023 at 8:42 AM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for your continued work on this KIP! Here are my thoughts on your
> updated proposal:
>
> 1) In the proposed changes section where you talk about modifying the
> offsets, could you please clarify that tasks shouldn't modify the offsets
> map that is passed as an argument? Currently, the distinction between the
> offsets map passed as an argument and the offsets map that is returned is
> not very clear in numerous places.
>
> 2) The default return value of Optional.empty() seems to be fairly
> non-intuitive considering that the return value is supposed to be the
> offsets that are to be committed. Can we consider simply returning the
> offsets argument itself by default instead?
>
> 3) The KIP states that "It is also possible that a task might choose to
> send a tombstone record as an offset. This is not recommended and to
> prevent connectors shooting themselves in the foot due to this" - could you
> please clarify why this is not recommended / supported?
>
> 4) The KIP states that "If a task returns an Optional of a null object or
> an Optional of an empty map, even for such cases the behaviour would would
> be disabled." - since this is an optional API that source task
> implementations don't necessarily need to implement, I don't think I fully
> follow why the return type of the proposed "updateOffsets" method is an
> Optional? Can we not simply use the Map as the return type instead?
>
> 5) The KIP states that "The offsets passed to the updateOffsets  method
> would be the offset from the late

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-04 Thread Yash Mayya
Hi Sagar,

Thanks for your continued work on this KIP! Here are my thoughts on your
updated proposal:

1) In the proposed changes section where you talk about modifying the
offsets, could you please clarify that tasks shouldn't modify the offsets
map that is passed as an argument? Currently, the distinction between the
offsets map passed as an argument and the offsets map that is returned is
not very clear in numerous places.

2) The default return value of Optional.empty() seems to be fairly
non-intuitive considering that the return value is supposed to be the
offsets that are to be committed. Can we consider simply returning the
offsets argument itself by default instead?

3) The KIP states that "It is also possible that a task might choose to
send a tombstone record as an offset. This is not recommended and to
prevent connectors shooting themselves in the foot due to this" - could you
please clarify why this is not recommended / supported?

4) The KIP states that "If a task returns an Optional of a null object or
an Optional of an empty map, even for such cases the behaviour would would
be disabled." - since this is an optional API that source task
implementations don't necessarily need to implement, I don't think I fully
follow why the return type of the proposed "updateOffsets" method is an
Optional? Can we not simply use the Map as the return type instead?

5) The KIP states that "The offsets passed to the updateOffsets  method
would be the offset from the latest source record amongst all source
records per partition. This way, if the source offset for a given source
partition is updated, that offset is the one that gets committed for the
source partition." - we should clarify that the "latest" offset refers to
the offsets that are about to be committed, and not the latest offsets
returned from SourceTask::poll so far (see related discussion in
https://issues.apache.org/jira/browse/KAFKA-15091 and
https://issues.apache.org/jira/browse/KAFKA-5716).

6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
Connect since the framework itself does not (and cannot) make any
guarantees on the delivery semantics. Depending on the source connector and
the source system, both at-least once and at-most once semantics (for
example - a source system where reads are destructive) are possible. We
should avoid introducing this terminology in the KIP and instead refer to
this scenario as exactly-once support being disabled.

7) Similar to the above point, we should remove the use of the term
"Exactly Once Semantics" and instead refer to exactly-once support being
enabled since the framework can't guarantee exactly-once semantics for all
possible source connectors (for example - a message queue source connector
where offsets are essentially managed in the source system via an ack
mechanism).

8) In a previous attempt to fix this gap in functionality, a significant
concern was raised on offsets ordering guarantees when we retry sending a
batch of records (ref -
https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
look like this KIP addresses that concern either? In the case where
exactly-once support is disabled - if we update the committableOffsets with
the offsets provided by the task through the new updateOffsets method,
these offsets could be committed before older "regular" offsets are
committed due to producer retries which could then lead to an inconsistency
if the send operation eventually succeeds.

9) The KIP states that when exactly-once support is enabled, the new
SourceTask::updateOffsets method will be invoked only when an offset flush
is attempted. If the connector is configured to use a connector-specified
transaction boundary rather than a poll- or interval-based boundary, isn't
it possible that we don't call SourceTask::updateOffsets until there are
actual records that are also being returned through poll (which would
defeat the primary motivation of the KIP)? Or are we making the assumption
that the connector-defined transaction boundary should handle this case
appropriately if needed (i.e. source tasks should occasionally request
a transaction commit via their transaction context if they want offsets to
be committed without producing records)? If so, I think we should
explicitly call that out in the KIP.
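
A sketch of what "source tasks should occasionally request a transaction
commit" could look like. It assumes a minimal stand-in for Connect's
TransactionContext exposing only commitTransaction() (the real interface has
other methods), and that no context is available when the connector does not
define its own transaction boundaries. IdleCommitRequester and the idle
threshold are hypothetical.

```java
// Minimal stand-in mirroring the one TransactionContext method used here.
interface TxnContext {
    void commitTransaction();
}

// Hypothetical task-side logic for connector-defined transaction boundaries:
// after a stretch with no upstream data, request a transaction commit so that
// offsets can be committed even though no records were produced.
class IdleCommitRequester {
    private final TxnContext txn; // assumed null when boundaries are not connector-defined
    private final long idleThresholdMs;
    private long lastActivityMs;

    IdleCommitRequester(TxnContext txn, long idleThresholdMs, long nowMs) {
        this.txn = txn;
        this.idleThresholdMs = idleThresholdMs;
        this.lastActivityMs = nowMs;
    }

    // Called from poll(); returns true if a commit was requested this time.
    boolean onPoll(int recordsFound, long nowMs) {
        if (recordsFound > 0) {
            lastActivityMs = nowMs; // data arrived; reset the idle clock
            return false;
        }
        if (txn == null || nowMs - lastActivityMs < idleThresholdMs) {
            return false;
        }
        txn.commitTransaction();
        lastActivityMs = nowMs; // restart the idle clock after requesting a commit
        return true;
    }
}
```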

10) The Javadoc for SourceTask::updateOffsets in the section on public
interfaces also has the same issue with the definition of latest offsets
that I've mentioned above (latest offsets from poll versus latest offsets
that are about to be committed).

11) The Javadoc for SourceTask::updateOffsets also introduces the same
confusion w.r.t. updating offsets that I've mentioned above (modifying the
offsets map argument versus returning a modified copy of the offsets map).

12) In the section on compatibility, we should explicitly mention that
connectors which implement the new method will still be compatible with
older Connect runtimes where the method will simp

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-06-21 Thread Sagar
Hi All,

I have created this PR: https://github.com/apache/kafka/pull/13899 which
implements the approach outlined in the latest version of the KIP. I
thought I could use this to validate the approach based on my understanding
while the KIP itself gets reviewed. I can always change the implementation
once we reach a final decision on the KIP.

Thanks!
Sagar.


On Wed, Jun 14, 2023 at 4:59 PM Sagar  wrote:

> Hey All,
>
> Bumping this discussion thread again to get feedback on the modified KIP.
>
> Thanks!
> Sagar.
>
> On Mon, May 29, 2023 at 8:12 PM Sagar  wrote:
>
>> Hi,
>>
>> Bumping this thread again for further reviews.
>>
>> Thanks!
>> Sagar.
>>
>> On Fri, May 12, 2023 at 3:38 PM Sagar  wrote:
>>
>>> Hi All,
>>>
>>> Thanks for the comments/reviews. I have updated the KIP
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>> with a newer approach which shelves the need for an explicit topic.
>>>
>>> Please review again and let me know what you think.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>>
>>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:
>>>
 Hi Sagar,

 Thanks for the KIP! I have a few questions and comments:

 1) I agree with Chris' point about the separation of a connector heartbeat
 mechanism and allowing source connectors to generate offsets without
 producing data. What is the purpose of the heartbeat topic here and are
 there any concrete use cases for downstream consumers on this topic? Why
 can't we instead simply introduce a mechanism to retrieve a list of source
 partition / source offset pairs from the source tasks?

 2) With the currently described mechanism, the new
 "SourceTask::produceHeartbeatRecords" method returns a "List<SourceRecord>"
 - what happens with the topic in each of these source records? Chris
 pointed this out above, but it doesn't seem to have been addressed? The
 "SourceRecord" class also has a bunch of other fields which will be
 irrelevant here (partition, key / value schema, key / value data,
 timestamp, headers). In fact, it seems like only the source partition and
 source offset are relevant here, so we should either introduce a new
 abstraction or simply use a data structure like a mapping from source
 partitions to source offsets (adds to the above point)?

 3) I'm not sure I fully follow why the heartbeat timer / interval is
 needed? What are the downsides of
 calling "SourceTask::produceHeartbeatRecords" in every execution loop
 (similar to the existing "SourceTask::poll" method)? Is this only to
 prevent the generation of a lot of offset records? Since Connect's offsets
 topics are log compacted (and source partitions are used as keys for each
 source offset), I'm not sure if such concerns are valid and such a
 heartbeat timer / interval mechanism is required?

 4) The first couple of rejected alternatives state that the use of a null
 topic / key / value are preferably avoided - but the current proposal would
 also likely require connectors to use such workarounds (null topic when the
 heartbeat topic is configured at a worker level and always for the key /
 value)?

 5) The third rejected alternative talks about subclassing the
 "SourceRecord" class - this presumably means allowing connectors to pass
 special offset only records via the existing poll mechanism? Why was this
 considered a more invasive option? Was it because of the backward
 compatibility issues that would be introduced for plugins using the new
 public API class that still need to be deployed onto older Connect
 workers?

 Thanks,
 Yash

 On Fri, Apr 14, 2023 at 6:45 PM Sagar  wrote:

 > One thing I forgot to mention in my previous email was that the reason I
 > chose to include the opt-in behaviour via configs was that the users of the
 > connector know their workload patterns. If the workload is such that the
 >  connector would receive regular valid updates then there’s ideally no need
 > for moving offsets since it would update automatically.
 >
 > This way they aren’t forced to use this feature and can use it only when
 > the workload is expected to be batchy or not frequent.
 >
 > Thanks!
 > Sagar.
 >
 >
 > On Fri, 14 Apr 2023 at 5:32 PM, Sagar  wrote:
 >
 > > Hi Chris,
 > >
 > > Thanks for following up on the response. Sharing my thoughts further:
 > >
 > > If we want to add support for connectors to emit offsets without
 > >> accompanying source records, we could (and IMO should) do that without
 > >> requiring users to manually enable that feature by adjusting worker or
 > >> connector configurations.
 > >
 > >

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-06-14 Thread Sagar
Hey All,

Bumping this discussion thread again to get feedback on the modified KIP.

Thanks!
Sagar.

On Mon, May 29, 2023 at 8:12 PM Sagar  wrote:

> Hi,
>
> Bumping this thread again for further reviews.
>
> Thanks!
> Sagar.
>
> On Fri, May 12, 2023 at 3:38 PM Sagar  wrote:
>
>> Hi All,
>>
>> Thanks for the comments/reviews. I have updated the KIP
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>> with a newer approach which shelves the need for an explicit topic.
>>
>> Please review again and let me know what you think.
>>
>> Thanks!
>> Sagar.
>>
>>
>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:
>>
>>> Hi Sagar,
>>>
>>> Thanks for the KIP! I have a few questions and comments:
>>>
>>> 1) I agree with Chris' point about the separation of a connector heartbeat
>>> mechanism and allowing source connectors to generate offsets without
>>> producing data. What is the purpose of the heartbeat topic here and are
>>> there any concrete use cases for downstream consumers on this topic? Why
>>> can't we instead simply introduce a mechanism to retrieve a list of source
>>> partition / source offset pairs from the source tasks?
>>>
>>> 2) With the currently described mechanism, the new
>>> "SourceTask::produceHeartbeatRecords" method returns a
>>> "List<SourceRecord>"
>>> - what happens with the topic in each of these source records? Chris
>>> pointed this out above, but it doesn't seem to have been addressed? The
>>> "SourceRecord" class also has a bunch of other fields which will be
>>> irrelevant here (partition, key / value schema, key / value data,
>>> timestamp, headers). In fact, it seems like only the source partition and
>>> source offset are relevant here, so we should either introduce a new
>>> abstraction or simply use a data structure like a mapping from source
>>> partitions to source offsets (adds to the above point)?
>>>
>>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
>>> needed? What are the downsides of
>>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
>>> (similar to the existing "SourceTask::poll" method)? Is this only to
>>> prevent the generation of a lot of offset records? Since Connect's offsets
>>> topics are log compacted (and source partitions are used as keys for each
>>> source offset), I'm not sure if such concerns are valid and such a
>>> heartbeat timer / interval mechanism is required?
>>>
>>> 4) The first couple of rejected alternatives state that the use of a null
>>> topic / key / value are preferably avoided - but the current proposal would
>>> also likely require connectors to use such workarounds (null topic when the
>>> heartbeat topic is configured at a worker level and always for the key /
>>> value)?
>>>
>>> 5) The third rejected alternative talks about subclassing the
>>> "SourceRecord" class - this presumably means allowing connectors to pass
>>> special offset only records via the existing poll mechanism? Why was this
>>> considered a more invasive option? Was it because of the backward
>>> compatibility issues that would be introduced for plugins using the new
>>> public API class that still need to be deployed onto older Connect
>>> workers?
>>>
>>> Thanks,
>>> Yash
>>>
>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar  wrote:
>>>
>>> > One thing I forgot to mention in my previous email was that the reason I
>>> > chose to include the opt-in behaviour via configs was that the users of the
>>> > connector know their workload patterns. If the workload is such that the
>>> >  connector would receive regular valid updates then there’s ideally no need
>>> > for moving offsets since it would update automatically.
>>> >
>>> > This way they aren’t forced to use this feature and can use it only when
>>> > the workload is expected to be batchy or not frequent.
>>> >
>>> > Thanks!
>>> > Sagar.
>>> >
>>> >
>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar  wrote:
>>> >
>>> > > Hi Chris,
>>> > >
>>> > > Thanks for following up on the response. Sharing my thoughts further:
>>> > >
>>> > > If we want to add support for connectors to emit offsets without
>>> > >> accompanying source records, we could (and IMO should) do that without
>>> > >> requiring users to manually enable that feature by adjusting worker or
>>> > >> connector configurations.
>>> > >
>>> > >
>>> > > With the current KIP design, I have tried to implement this in an opt-in
>>> > > manner via configs. I guess what you are trying to say is that this
>>> > > doesn't need a config of its own and instead could be part of the poll ->
>>> > > transform etc -> produce -> commit cycle. That way, the users don't need
>>> > > to set any config and if the connector supports moving offsets w/o
>>> > > producing SourceRecords, it should happen automatically. Is that correct?
>>> > > If that is the concern, then I can think of not exposing

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-05-29 Thread Sagar
Hi,

Bumping this thread again for further reviews.

Thanks!
Sagar.

On Fri, May 12, 2023 at 3:38 PM Sagar  wrote:

> Hi All,
>
> Thanks for the comments/reviews. I have updated the KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> with a newer approach which shelves the need for an explicit topic.
>
> Please review again and let me know what you think.
>
> Thanks!
> Sagar.
>
>
> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the KIP! I have a few questions and comments:
>>
>> 1) I agree with Chris' point about the separation of a connector heartbeat
>> mechanism and allowing source connectors to generate offsets without
>> producing data. What is the purpose of the heartbeat topic here and are
>> there any concrete use cases for downstream consumers on this topic? Why
>> can't we instead simply introduce a mechanism to retrieve a list of source
>> partition / source offset pairs from the source tasks?
>>
>> 2) With the currently described mechanism, the new
>> "SourceTask::produceHeartbeatRecords" method returns a
>> "List<SourceRecord>"
>> - what happens with the topic in each of these source records? Chris
>> pointed this out above, but it doesn't seem to have been addressed? The
>> "SourceRecord" class also has a bunch of other fields which will be
>> irrelevant here (partition, key / value schema, key / value data,
>> timestamp, headers). In fact, it seems like only the source partition and
>> source offset are relevant here, so we should either introduce a new
>> abstraction or simply use a data structure like a mapping from source
>> partitions to source offsets (adds to the above point)?
>>
>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
>> needed? What are the downsides of
>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
>> (similar to the existing "SourceTask::poll" method)? Is this only to
>> prevent the generation of a lot of offset records? Since Connect's offsets
>> topics are log compacted (and source partitions are used as keys for each
>> source offset), I'm not sure if such concerns are valid and such a
>> heartbeat timer / interval mechanism is required?
>>
>> 4) The first couple of rejected alternatives state that the use of a null
>> topic / key / value are preferably avoided - but the current proposal would
>> also likely require connectors to use such workarounds (null topic when the
>> heartbeat topic is configured at a worker level and always for the key /
>> value)?
>>
>> 5) The third rejected alternative talks about subclassing the
>> "SourceRecord" class - this presumably means allowing connectors to pass
>> special offset only records via the existing poll mechanism? Why was this
>> considered a more invasive option? Was it because of the backward
>> compatibility issues that would be introduced for plugins using the new
>> public API class that still need to be deployed onto older Connect
>> workers?
>>
>> Thanks,
>> Yash
>>
>> On Fri, Apr 14, 2023 at 6:45 PM Sagar  wrote:
>>
>> > One thing I forgot to mention in my previous email was that the reason I
>> > chose to include the opt-in behaviour via configs was that the users of the
>> > connector know their workload patterns. If the workload is such that the
>> >  connector would receive regular valid updates then there’s ideally no need
>> > for moving offsets since it would update automatically.
>> >
>> > This way they aren’t forced to use this feature and can use it only when
>> > the workload is expected to be batchy or not frequent.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> >
>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar  wrote:
>> >
>> > > Hi Chris,
>> > >
>> > > Thanks for following up on the response. Sharing my thoughts further:
>> > >
>> > > If we want to add support for connectors to emit offsets without
>> > >> accompanying source records, we could (and IMO should) do that without
>> > >> requiring users to manually enable that feature by adjusting worker or
>> > >> connector configurations.
>> > >
>> > >
>> > > With the current KIP design, I have tried to implement this in an opt-in
>> > > manner via configs. I guess what you are trying to say is that this
>> > > doesn't need a config of its own and instead could be part of the poll ->
>> > > transform etc -> produce -> commit cycle. That way, the users don't need
>> > > to set any config and if the connector supports moving offsets w/o
>> > > producing SourceRecords, it should happen automatically. Is that correct?
>> > > If that is the concern, then I can think of not exposing a config and try
>> > > to make this process automatic. That should ease the load on connector
>> > > users, but your point about cognitive load on Connector developers, I am
>> > > still not sure how to address that. The offsets are privy to a connector
>> > > and the framework 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-05-12 Thread Sagar
Hi All,

Thanks for the comments/reviews. I have updated the KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
with a newer approach which shelves the need for an explicit topic.

Please review again and let me know what you think.

Thanks!
Sagar.


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-24 Thread Yash Mayya
Hi Sagar,

Thanks for the KIP! I have a few questions and comments:

1) I agree with Chris' point about the separation of a connector heartbeat
mechanism and allowing source connectors to generate offsets without
producing data. What is the purpose of the heartbeat topic here and are
there any concrete use cases for downstream consumers on this topic? Why
can't we instead simply introduce a mechanism to retrieve a list of source
partition / source offset pairs from the source tasks?

2) With the currently described mechanism, the new
"SourceTask::produceHeartbeatRecords" method returns a "List<SourceRecord>".
What happens with the topic in each of these source records? Chris
pointed this out above, but it doesn't seem to have been addressed. The
"SourceRecord" class also has a bunch of other fields which will be
irrelevant here (partition, key / value schema, key / value data,
timestamp, headers). In fact, it seems like only the source partition and
source offset are relevant here, so should we either introduce a new
abstraction or simply use a data structure like a mapping from source
partitions to source offsets (this ties in with the point above)?

3) I'm not sure I fully follow why the heartbeat timer / interval is
needed. What are the downsides of calling
"SourceTask::produceHeartbeatRecords" in every execution loop (similar to
the existing "SourceTask::poll" method)? Is this only to prevent the
generation of a lot of offset records? Since Connect's offsets topics are
log compacted (and source partitions are used as keys for each source
offset), I'm not sure such concerns are valid or that a heartbeat timer /
interval mechanism is required.

4) The first couple of rejected alternatives state that the use of a null
topic / key / value is preferably avoided, but the current proposal would
also likely require connectors to use such workarounds (a null topic when the
heartbeat topic is configured at the worker level, and always for the key /
value)?

5) The third rejected alternative talks about subclassing the
"SourceRecord" class - this presumably means allowing connectors to pass
special offset-only records via the existing poll mechanism? Why was this
considered a more invasive option? Was it because of the backward
compatibility issues that would be introduced for plugins using the new
public API class that still need to be deployed onto older Connect workers?
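Points 2 and 3 can be made concrete with a minimal sketch. It assumes a hypothetical `OffsetOnlySource` interface whose `offsetsOnly()` method returns a map from source partition to source offset (this is not an existing Kafka Connect API), and it models the offsets topic as a compacted store keyed by source partition. Because log compaction eventually keeps only the latest value per key, invoking such a hook on every loop iteration should leave one retained entry per partition once the cleaner has run.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical hook (NOT part of the Kafka Connect API): a task reports only
// source partition -> source offset pairs, with no topic, key, value or headers.
interface OffsetOnlySource {
    Map<Map<String, Object>, Map<String, Object>> offsetsOnly();
}

// Simplified model of a log-compacted offsets topic keyed by source partition.
// Real compaction runs asynchronously; here it is applied immediately.
class CompactedOffsetStore {
    private int appended = 0;
    private final Map<Map<String, Object>, Map<String, Object>> latest = new LinkedHashMap<>();

    void append(Map<String, Object> partition, Map<String, Object> offset) {
        appended++;                     // every write lands in the log first
        latest.put(partition, offset);  // compaction keeps only the last value per key
    }

    int appendedCount() { return appended; }

    int retainedAfterCompaction() { return latest.size(); }

    Map<String, Object> offsetFor(Map<String, Object> partition) { return latest.get(partition); }
}

class OffsetOnlyDemo {
    public static void main(String[] args) {
        Map<String, Object> partition = new HashMap<>();
        partition.put("table", "orders");  // hypothetical source partition
        CompactedOffsetStore store = new CompactedOffsetStore();

        // Call the hook on every "execution loop" with an advancing position.
        for (long pos = 1; pos <= 1000; pos++) {
            final long position = pos;
            OffsetOnlySource task = () -> {
                Map<String, Object> offset = new HashMap<>();
                offset.put("position", position);
                Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
                result.put(partition, offset);
                return result;
            };
            task.offsetsOnly().forEach(store::append);
        }
        System.out.println(store.appendedCount() + " appended, "
                + store.retainedAfterCompaction() + " retained");
    }
}
```

Running `OffsetOnlyDemo` prints `1000 appended, 1 retained`: the log receives one write per loop, but only the newest offset per source partition survives compaction. The model ignores the writes that still exist between cleaner runs and the produce overhead itself.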

Thanks,
Yash

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-14 Thread Sagar
One thing I forgot to mention in my previous email was that the reason I
chose to include the opt-in behaviour via configs was that the users of the
connector know their workload patterns. If the workload is such that the
connector would receive regular valid updates, then there’s ideally no need
to move offsets explicitly, since they would be updated automatically.

This way, users aren’t forced to use this feature and can use it only when
the workload is expected to be batchy or infrequent.

Thanks!
Sagar.


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-14 Thread Sagar
Hi Chris,

Thanks for following up on the response. Sharing my thoughts further:

> If we want to add support for connectors to emit offsets without
> accompanying source records, we could (and IMO should) do that without
> requiring users to manually enable that feature by adjusting worker or
> connector configurations.


With the current KIP design, I have tried to implement this in an opt-in
manner via configs. I guess what you are trying to say is that this doesn't
need a config of its own and could instead be part of the poll ->
transform etc -> produce -> commit cycle. That way, users don't need to
set any config, and if the connector supports moving offsets w/o producing
SourceRecords, it should happen automatically. Is that correct? If that
is the concern, then I can consider not exposing a config and making
this process automatic. That should ease the load on connector users,
but regarding your point about cognitive load on connector developers, I am
still not sure how to address that. The offsets are known only to the
connector, and the framework at best can provide hooks for tasks to update
their offsets. Connector developers would still have to consider all cases
before updating offsets. And if I ignore the heartbeat topic and heartbeat
interval ms configs, then what the KIP proposes currently isn't much
different in that regard. The only difference is that it produces a
List<SourceRecord>, which could be changed to a Map of source partitions to
their offsets if you think that would simplify things. Are there other cases
in your mind which need addressing?

Here's my take on the use cases:

   1. Regarding the example about SMTs with object-storage-based
   connectors, it was one of the scenarios identified. We have some connectors
   that rely on the offsets topic to check whether the next batch of files
   should be processed. Because the last record of a file is filtered out,
   the EOF is effectively never reached and the connector can't commit offsets
   for that source partition (file). If there were a mechanism to update
   offsets for such a source file, then with some moderately complex state
   tracking, the connector could mark that file as processed and proceed.
   2. There's another use case with the same class of connectors: if a
   file is malformed, the connector can't produce any offsets because
   the file can't be processed completely. To handle such cases, the
   connector developers have introduced a dev/null sort of topic: they
   produce a record to this corrupted-file topic just to move the offset.
   Ideally this topic isn't needed, and a mechanism to update offsets
   would have helped in this case as well.
   3. Coming to CDC-based connectors:
      1. We had a similar issue with the Oracle CDC source connector and
      needed to employ the same heartbeat mechanism to get around it.
      2. The MongoDB CDC source connector has employed the same heartbeat
      mechanism; see `heartbeat.interval.ms` here:
      https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
      3. Another CDC connector, for ScyllaDB, employs a similar mechanism:
      https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
      4. For CDC-based connectors, you could argue that since these connectors
      have already been able to solve this problem, framework-level support
      isn't needed. But the point I am trying to make is that this limitation
      of the framework is forcing CDC connector developers to implement
      per-connector solutions (at times, hacks). And there could always be
      more CDC connectors in the pipeline that are forced to take a similar
      route as well.
   4. There's also a case, at times, with CDC source connectors that are
   REST API / web service based (the Zendesk Source Connector, for example).
   These connectors typically use timestamps from the responses as offsets. If
   there's a long period of inactivity during which the API invocations don't
   return any data, then the offsets won't move and the connector keeps
   using the same timestamp it received from the last non-empty response.
   If this period of inactivity keeps growing, and the API imposes a limit
   on how far back the window start can go, then this could
   become a problem: even though the connector was caught up with all the
   responses, it may need to snapshot again. Here, updating offsets would
   easily help, since all the connector needs to do is move the timestamp
   forward, which moves the offset with it.
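Use case 4 can be illustrated with a small model. `TimestampCursorTask`, its methods and the seven-day look-back limit are hypothetical; the sketch only assumes what the use case describes: a task whose offset is a timestamp cursor, an API that cannot query further back than some window, and (optionally) a way to move the offset without producing a record.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical model of a REST/web-service source task whose offset is a
// timestamp cursor. Not a real connector or Kafka Connect API.
class TimestampCursorTask {
    private Instant committedCursor;          // value last written to the offsets topic
    private final boolean offsetOnlyUpdates;  // can the task move offsets without records?

    TimestampCursorTask(Instant start, boolean offsetOnlyUpdates) {
        this.committedCursor = start;
        this.offsetOnlyUpdates = offsetOnlyUpdates;
    }

    // One poll over the window [committedCursor, windowEnd); 'results' stands in
    // for the timestamps of records returned by the API, in ascending order.
    void poll(List<Instant> results, Instant windowEnd) {
        if (!results.isEmpty()) {
            // Normal path: the offset travels with the last produced record.
            committedCursor = results.get(results.size() - 1);
        } else if (offsetOnlyUpdates) {
            // Caught up and nothing returned: move the cursor without a record.
            committedCursor = windowEnd;
        }
        // Otherwise the cursor stays at the last non-empty response.
    }

    Instant cursor() { return committedCursor; }

    // True if the (assumed) API look-back limit no longer covers the cursor,
    // i.e. the task would have to snapshot again.
    boolean needsResnapshot(Instant now, Duration maxLookBack) {
        return committedCursor.isBefore(now.minus(maxLookBack));
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2023-01-01T00:00:00Z");
        Instant lastData = t0.plus(Duration.ofHours(1));
        Instant thirtyDaysLater = t0.plus(Duration.ofDays(30));
        Duration lookBack = Duration.ofDays(7);  // hypothetical API limit

        for (boolean enabled : new boolean[] {false, true}) {
            TimestampCursorTask task = new TimestampCursorTask(t0, enabled);
            task.poll(List.of(lastData), lastData);  // one non-empty response
            task.poll(List.of(), thirtyDaysLater);   // then a long quiet period
            System.out.println("offset-only updates=" + enabled
                    + " cursor=" + task.cursor()
                    + " resnapshot=" + task.needsResnapshot(thirtyDaysLater, lookBack));
        }
    }
}
```

Without offset-only updates, the cursor stays at the last non-empty response and falls outside the look-back window, so the task would have to snapshot again; with them, the cursor tracks the end of the last queried window.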

I still believe that this is something the framework should support out of
the box, irrespective of whether connectors have been able to get around this
restriction or not.

Lastly, about your comments here:

> I'm also not sure that it's worth preserving the current behavior that
> offsets for records that have been filtered out via SMT are not committed.


Let me know if 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-12 Thread Chris Egerton
Hi Sagar,

I'm sorry, I'm still not convinced that this design solves the problem(s)
it sets out to solve in the best way possible. I tried to highlight this in
my last email:

> In general, it seems like we're trying to solve two completely different
> problems with this single KIP: adding framework-level support for emitting
> heartbeat records for source connectors, and allowing source connectors to
> emit offsets without also emitting source records. I don't mind addressing
> the two at the same time if the result is elegant and doesn't compromise on
> the solution for either problem, but that doesn't seem to be the case here.
> Of the two problems, could we describe one as the primary and one as the
> secondary? If so, we might consider dropping the secondary problem from
> this KIP and addressing it separately.

If we wanted to add support for heartbeat records, we could (and IMO
should) do that without requiring connectors to implement any new methods
and only require adjustments to worker or connector configurations by users
in order to enable that feature.

If we want to add support for connectors to emit offsets without
accompanying source records, we could (and IMO should) do that without
requiring users to manually enable that feature by adjusting worker or
connector configurations.


I'm also not sure that it's worth preserving the current behavior that
offsets for records that have been filtered out via SMT are not committed.
I can't think of a case where this would be useful and there are obviously
plenty where it isn't. There's also a slight discrepancy in how these kinds
of records are treated by the Connect runtime now; if a record is dropped
because of an SMT, then its offset isn't committed, but if it's dropped
because exactly-once support is enabled and the connector chose to abort
the batch containing the record, then its offset is still committed. After
thinking carefully about the aborted transaction behavior, we realized that
it was fine to commit the offsets for those records, and I believe that the
same logic can be applied to any record that we're done trying to send to
Kafka (regardless of whether it was sent correctly, dropped due to producer
error, filtered via SMT, etc.).
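The suggestion to commit offsets for every record the runtime is done with can be sketched as a small model. The `OffsetCommitTracker` class, the `Outcome` enum and the `commitFilteredRecords` switch are hypothetical names for illustration. The model only reflects the two behaviors described above (SMT-filtered records are skipped today, records in aborted transactions are still committed) and ignores everything else the runtime does, such as waiting for producer acknowledgements.

```java
import java.util.HashMap;
import java.util.Map;

// What happened to a record once the runtime is done with it (simplified).
enum Outcome { SENT, FILTERED_BY_SMT, ABORTED_IN_TRANSACTION }

// Hypothetical model of the two policies discussed above; not Connect runtime code.
class OffsetCommitTracker {
    private final boolean commitFilteredRecords;
    private final Map<String, Long> committable = new HashMap<>();

    OffsetCommitTracker(boolean commitFilteredRecords) {
        this.commitFilteredRecords = commitFilteredRecords;
    }

    // Called once per record, in the order the task returned the records.
    void finished(String sourcePartition, long sourceOffset, Outcome outcome) {
        if (outcome == Outcome.FILTERED_BY_SMT && !commitFilteredRecords) {
            return;  // current behavior: an SMT-filtered record contributes no offset
        }
        // Sent and aborted records (and, under the proposal, filtered ones) all count.
        committable.put(sourcePartition, sourceOffset);
    }

    Long offsetToCommit(String sourcePartition) {
        return committable.get(sourcePartition);
    }

    public static void main(String[] args) {
        for (boolean proposal : new boolean[] {false, true}) {
            OffsetCommitTracker tracker = new OffsetCommitTracker(proposal);
            tracker.finished("file-a", 1, Outcome.SENT);
            tracker.finished("file-a", 2, Outcome.ABORTED_IN_TRANSACTION);
            tracker.finished("file-a", 3, Outcome.FILTERED_BY_SMT);  // last record dropped
            System.out.println("commit filtered=" + proposal
                    + " -> offset " + tracker.offsetToCommit("file-a"));
        }
    }
}
```

Under the current rule the committed offset for `file-a` stops at 2, even though the runtime is finished with record 3; treating "filtered" like "aborted" lets it reach 3.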

I also find the file-based source connector example a little confusing.
What about that kind of connector causes the offset for the last record of
a file to be treated differently? Is there anything different about
filtering that record via SMT vs. dropping it altogether because of an
asynchronous producer error with "errors.tolerance" set to "all"? And
finally, how would such a connector use the design proposed here?

Finally, I don't disagree that if there are other legitimate use cases that
would be helped by addressing KAFKA-3821, we should try to solve that issue
in the Kafka Connect framework instead of requiring individual connectors
to implement their own solutions. But the cognitive load added by the
design proposed here, for connector developers and Connect cluster
administrators alike, costs too much to justify by pointing to an
already-solved problem encountered by a single group of connectors (i.e.,
Debezium). This is why I think it's crucial that we identify realistic
cases where this feature would actually be useful, and right now, I don't
think any have been provided (at least, not ones that have already been
addressed or could be addressed with much simpler changes).

Cheers,

Chris

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-11 Thread Sagar
Hi Chris,

Thanks for your detailed feedback!

nits: I have taken care of them now. Thanks for pointing those out.

non-nits:

> 6) It seems (based on both the KIP and discussion on KAFKA-3821) that the
> only use case for being able to emit offsets without also emitting source
> records that's been identified so far is for CDC source connectors like
> Debezium.


I am aware of at least one more case where the non-production of offsets
(because no records are produced) leads to connector failures when
the source purges the records of interest. This happens in file-based
source connectors (like S3/blob storage): if the last record from
a file is filtered out by an SMT, then that particular file is never
committed to the source partition, and eventually, when the file is deleted
from the source and the connector is restarted for some reason, it fails.
Moreover, I feel this support should be in the Kafka Connect framework
because this is a restriction of the framework, and today the framework
provides no way of getting around it.
Every connector has its own way of handling offsets, and having each
connector handle this restriction in its own way can make things complex.
Whether we choose to do it the way this KIP prescribes or some other way is
up for debate, but IMHO, the framework should provide a way of
getting around this limitation.
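The file-based failure described above can be modeled in a few lines. `FileSourceState`, its methods and the `a.csv` file are hypothetical; the sketch assumes a connector that treats a file as done only once the committed offset reaches the file's record count, and that tries to resume any unfinished file on restart.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical state of a file-based source connector (illustrative only).
class FileSourceState {
    private final Map<String, Long> committedOffsets = new HashMap<>(); // file -> last committed record index
    private final Map<String, Long> recordCounts = new HashMap<>();     // file -> records in the file

    void registerFile(String file, long records) {
        recordCounts.put(file, records);
    }

    // Offsets reach the offsets topic only for records that were produced,
    // or via an offset-only update if the framework offered one.
    void commit(String file, long recordIndex) {
        committedOffsets.put(file, recordIndex);
    }

    boolean isComplete(String file) {
        return committedOffsets.getOrDefault(file, 0L) >= recordCounts.get(file);
    }

    // On restart, every incomplete file is resumed; a purged file cannot be reopened.
    void restart(Set<String> filesStillInStorage) {
        for (String file : recordCounts.keySet()) {
            if (!isComplete(file) && !filesStillInStorage.contains(file)) {
                throw new IllegalStateException("Cannot resume deleted file " + file);
            }
        }
    }

    public static void main(String[] args) {
        FileSourceState state = new FileSourceState();
        state.registerFile("a.csv", 3);
        state.commit("a.csv", 1);
        state.commit("a.csv", 2);
        // Record 3 is filtered by an SMT, so no offset is committed for it.
        try {
            state.restart(Set.of());  // a.csv has since been purged from storage
        } catch (IllegalStateException e) {
            System.out.println("Restart failed: " + e.getMessage());
        }
        // With an offset-only update, the task could record that it reached EOF.
        state.commit("a.csv", 3);
        state.restart(Set.of());
        System.out.println("a.csv complete: " + state.isComplete("a.csv"));
    }
}
```

The first restart fails because the file looks unfinished but no longer exists; once the EOF offset is recorded without a record, the same restart succeeds.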

> 7. If a task produces heartbeat records and source records that use the
> same source partition, which offset will ultimately be committed?


The idea is to add the records returned by `produceHeartbeatRecords`
to the same `toSend` list within `AbstractWorkerSourceTask#execute`.
`produceHeartbeatRecords` would be invoked before the `poll` call, so
the offsets would be committed in the same order in which the records are
written. Note that the onus is on the connector implementation not to
return records that can lead to data loss or data going out of order; the
framework would just commit based on whatever is supplied. Also, AFAIK, two
`normal` source records can also have the same source partition, and
they are committed in the order in which they are written.
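The ordering described above can be sketched as follows. `SimpleRecord`, `ExecuteLoopSketch` and its methods are hypothetical stand-ins, not the runtime's actual code; the sketch only assumes what is stated here, namely that heartbeat records are placed ahead of the `poll` results in one `toSend` list and that offsets are committed in write order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a SourceRecord: only the offset-related fields.
final class SimpleRecord {
    final String sourcePartition;
    final long sourceOffset;

    SimpleRecord(String sourcePartition, long sourceOffset) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
    }
}

// Hypothetical sketch of the described ordering; not AbstractWorkerSourceTask code.
class ExecuteLoopSketch {
    // Heartbeat records are fetched before poll() and go first in the same toSend list.
    static List<SimpleRecord> buildToSend(List<SimpleRecord> heartbeats, List<SimpleRecord> polled) {
        List<SimpleRecord> toSend = new ArrayList<>(heartbeats);
        toSend.addAll(polled);
        return toSend;
    }

    // Offsets are applied in write order, so the last record per source partition wins.
    static Map<String, Long> offsetsToCommit(List<SimpleRecord> toSend) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (SimpleRecord r : toSend) {
            offsets.put(r.sourcePartition, r.sourceOffset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<SimpleRecord> heartbeats = List.of(new SimpleRecord("db", 200));
        List<SimpleRecord> polled = List.of(new SimpleRecord("db", 105), new SimpleRecord("db", 110));
        // The polled record written last wins even though the heartbeat offset was higher:
        // the framework does not reorder, so consistency is the connector's responsibility.
        System.out.println(offsetsToCommit(buildToSend(heartbeats, polled)));
    }
}
```

With a heartbeat offset of 200 and polled offsets 105 and 110 for the same partition, the committed offset is 110. This is the situation behind question 7: the framework does not reconcile the two sources of offsets, so it is up to the connector not to emit a heartbeat offset that later records would move backwards.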

> 8. The SourceTask::produceHeartbeatRecords method returns a
> List<SourceRecord>, and users can control the heartbeat topic for a
> connector via the (connector- or worker-level) "heartbeat.records.topic"
> property. Since every constructor for the SourceRecord class [2] requires a
> topic to be supplied, what will happen to that topic? Will it be ignored?
> If so, I think we should look for a cleaner solution.


Sorry, I couldn't quite follow which topic will be ignored in this case.

> 9. A large concern raised in the discussion for KAFKA-3821 was allowing
> connectors to control the ordering of these special "offsets-only"
> emissions and the regular source records returned from SourceTask::poll.
> Are we choosing to ignore that concern? If so, can you add this to the
> rejected alternatives section along with a rationale?


One thing to note is that, for every connector, the condition for emitting
a heartbeat record is entirely up to the connector. For example, a connector
that is tracking transactions for an ordered log might not need to emit
heartbeat records when the timer expires if there are open transactions,
while a file-based connector that keeps processing the same file again and
again due to an SMT or other reasons can choose to emit that partition. The
overarching point here is that every connector has its own requirements, and
the framework can't really make assumptions about them. What the KIP is
trying to do is to provide a mechanism for the connector to commit new
offsets. With this approach, as far as I can tell, there doesn't seem to be a
case of out-of-order processing. If you have other concerns/thoughts, I would
be happy to hear them.

> 10. If, sometime in the future, we wanted to add framework-level support
> for sending heartbeat records that doesn't require connectors to implement
> any new APIs...


The main purpose of producing heartbeat records is to be able to emit
offsets w/o any new records. We are using heartbeat records to solve the
primary concern of offsets getting stalled. The reason for doing it this way
is that once we have SourceRecords, the rest of the code is already in place
to write them to a topic of interest and commit offsets, and that seemed the
least invasive option in terms of framework-level changes. If in the future we
want to add framework-only heartbeat record support, then this would create
confusion, as you pointed out. Do you think the choice of the name "heartbeat
records" is creating confusion in this case? Maybe we can call these special
records something else (not sure what at this point), which would then
decouple the two, both logically and implementation-wise?

Thanks!
Sagar.

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-28 Thread Chris Egerton
Hi Sagar,

Thanks for the KIP! I have some thoughts.

Nits:

1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on the KIP? Or is
there a different ticket that should be associated with it?
2. The current state is listed as "Draft". Considering it's been brought up
for discussion, maybe the KIP should be updated to "Discussion"?
3. Can you add a link for the discussion thread to the KIP?
4. The KIP states that "In this process, offsets are written at regular
intervals(driven by `offset.flush.interval.ms`)". This isn't strictly
accurate since, when exactly-once support is enabled, offset commits can
also be performed for each record batch (which is the default) or when
explicitly requested by the task instance (if the connector implements the
API to define its own transactions and the user has configured it to do
so). Maybe better to just say "Offsets are written periodically"?
5. The description for the (per-connector) "heartbeat.records.topic"
property states that it is "Only applicable in distributed mode; in
standalone mode, setting this property will have no effect". Is this
correct?

Non-nits:

6. It seems (based on both the KIP and discussion on KAFKA-3821) that the
only use case for being able to emit offsets without also emitting source
records that's been identified so far is for CDC source connectors like
Debezium. But Debezium already has support for this exact feature (emitting
heartbeat records that include offsets that cannot be associated with
other, "regular" source records). Why should we add this feature to Kafka
Connect when the problem it addresses is already solved in the set of
connectors that (it seems) would have any need for it, and the size of that
set is extremely small? If there are other practical use cases for
connectors that would benefit from this feature, please let me know.

7. If a task produces heartbeat records and source records that use the
same source partition, which offset will ultimately be committed?

8. The SourceTask::produceHeartbeatRecords method returns a
List<SourceRecord>, and users can control the heartbeat topic for a
connector via the (connector- or worker-level) "heartbeat.records.topic"
property. Since every constructor for the SourceRecord class [2] requires a
topic to be supplied, what will happen to that topic? Will it be ignored?
If so, I think we should look for a cleaner solution.

9. A large concern raised in the discussion for KAFKA-3821 was allowing
connectors to control the ordering of these special "offsets-only"
emissions and the regular source records returned from SourceTask::poll.
Are we choosing to ignore that concern? If so, can you add this to the
rejected alternatives section along with a rationale?

10. If, sometime in the future, we wanted to add framework-level support
for sending heartbeat records that doesn't require connectors to implement
any new APIs (e.g., SourceTask::produceHeartbeatRecords), a lot of this
would paint us into a corner design-wise. We'd have to think carefully
about which property names would be used, how to account for connectors
that have already implemented the SourceTask::produceHeartbeatRecords
method, etc. In general, it seems like we're trying to solve two completely
different problems with this single KIP: adding framework-level support for
emitting heartbeat records for source connectors, and allowing source
connectors to emit offsets without also emitting source records. I don't
mind addressing the two at the same time if the result is elegant and
doesn't compromise on the solution for either problem, but that doesn't
seem to be the case here. Of the two problems, could we describe one as the
primary and one as the secondary? If so, we might consider dropping the
secondary problem from this KIP and addressing it separately.
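Points 7 and 9 both come down to ordering. A minimal sketch, assuming offsets are buffered per source partition and the last write before a flush wins (loosely modeled on how the worker buffers offsets; `OffsetOrdering` and `OffsetUpdate` are hypothetical classes): the committed offset depends entirely on whether heartbeat offsets are applied before or after the batch returned from poll().

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: offsets buffered per source partition, last write before flush wins. */
final class OffsetOrdering {

    /** A source offset update; partition and position are simplified to a string and a long. */
    static final class OffsetUpdate {
        final String sourcePartition;
        final long position;

        OffsetUpdate(String sourcePartition, long position) {
            this.sourcePartition = sourcePartition;
            this.position = position;
        }
    }

    /** Applies updates in order; a later update for the same partition overwrites an earlier one. */
    static Map<String, Long> buffer(List<OffsetUpdate> updates) {
        Map<String, Long> pending = new LinkedHashMap<>();
        for (OffsetUpdate u : updates) {
            pending.put(u.sourcePartition, u.position);
        }
        return pending;
    }

    public static void main(String[] args) {
        // A regular record from poll() and a (lagging) heartbeat offset for the same partition.
        OffsetUpdate fromPoll = new OffsetUpdate("db.table-0", 120L);
        OffsetUpdate heartbeat = new OffsetUpdate("db.table-0", 100L);

        // Heartbeat applied after the batch: the older position wins and the offset moves backwards.
        System.out.println(buffer(List.of(fromPoll, heartbeat)));  // {db.table-0=100}
        // Heartbeat applied before the batch: the record's position wins.
        System.out.println(buffer(List.of(heartbeat, fromPoll)));  // {db.table-0=120}
    }
}
```

Unless the API lets the connector (or the framework, deterministically) fix that order, the same task could commit different offsets from run to run.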

[1] - https://issues.apache.org/jira/browse/KAFKA-3821
[2] -
https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html

Cheers,

Chris

On Sat, Mar 25, 2023 at 11:18 PM Sagar  wrote:

> Hi John,
>
> Thanks for taking a look at the KIP!
>
> The point about stream time not advancing in case of infrequent updates is
> an interesting one. I can imagine if the upstream producer to a Kafka
> Streams application is a Source Connector which isn't sending records
> frequently (due to the nature of the data ingestion, for example), then the
> downstream stream processing can run into the issues you described above.
>
> Which also brings me to the second point you made about how this would be
> used by downstream consumers. IIUC, you are referring to the consumers of
> the newly added topic, i.e., the heartbeat topic. In my mind, the heartbeat
> topic is an internal topic (similar to the offsets/config/status topics in
> Connect), the main purpose of which is to trick the framework into producing
> records to the offsets topic and advancing the offsets. Since every connector
> could have a different definition of offsets (LSN, BinLogID, etc., for
> example), that logic to determine what

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-25 Thread Sagar
Hi John,

Thanks for taking a look at the KIP!

The point about stream time not advancing in case of infrequent updates is
an interesting one. I can imagine if the upstream producer to a Kafka
Streams application is a Source Connector which isn't sending records
frequently (due to the nature of the data ingestion, for example), then the
downstream stream processing can run into the issues you described above.

Which also brings me to the second point you made about how this would be
used by downstream consumers. IIUC, you are referring to the consumers of
the newly added topic, i.e., the heartbeat topic. In my mind, the heartbeat
topic is an internal topic (similar to the offsets/config/status topics in
Connect), the main purpose of which is to trick the framework into producing
records to the offsets topic and advancing the offsets. Since every connector
could have a different definition of offsets (LSN, BinLogID, etc., for
example), that logic to determine what the heartbeat records should be
would have to reside in the actual connector.
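As a rough illustration of why this logic has to live in each connector, here is a hypothetical CDC-style task that tracks the latest log sequence number (LSN) it has read, even when none of the entries belonged to a captured table, and turns it into a connector-specific offset. The `LsnHeartbeat` class and the `"lsn"` offset key are assumptions for illustration only; a binlog-based connector would track something else entirely.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical CDC-style task state: only the connector knows what its offsets mean. */
final class LsnHeartbeat {

    private long lastEmittedLsn;  // highest LSN already carried by a record or a heartbeat
    private long latestReadLsn;   // highest LSN read from the source log so far

    LsnHeartbeat(long startLsn) {
        this.lastEmittedLsn = startLsn;
        this.latestReadLsn = startLsn;
    }

    /** Called for every log entry read, including entries for tables that are not captured. */
    void onLogEntry(long lsn, boolean capturedTable) {
        latestReadLsn = Math.max(latestReadLsn, lsn);
        if (capturedTable) {
            // A regular source record carries this offset, so no heartbeat is needed for it.
            lastEmittedLsn = latestReadLsn;
        }
    }

    /** Returns an offsets-only update if the log advanced without producing any records. */
    Optional<Map<String, Object>> heartbeatOffset() {
        if (latestReadLsn <= lastEmittedLsn) {
            return Optional.empty();
        }
        lastEmittedLsn = latestReadLsn;
        return Optional.of(Map.<String, Object>of("lsn", latestReadLsn));
    }

    public static void main(String[] args) {
        LsnHeartbeat task = new LsnHeartbeat(100L);
        task.onLogEntry(110L, true);   // change to a captured table: a normal record is emitted
        task.onLogEntry(150L, false);  // unrelated table: no record, but the log moved on
        System.out.println(task.heartbeatOffset());  // Optional[{lsn=150}]
        System.out.println(task.heartbeatOffset());  // Optional.empty
    }
}
```

Without some way to commit `{lsn=150}`, a restart would resume from 110 and re-read log entries that never produced anything.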

Now that I think of it, it could very well be consumed by downstream
consumers, such as Streams or Flink applications, and be further used for
some decision making. As a very crude example, if the heartbeat records
sent to the new heartbeat topic include timestamps, then a downstream
Streams application can use those timestamps to close any time
windows. Having said that, it still appears to me that it's outside the
scope of the Connect framework and is something which is difficult to
generalise because of the variety of Sources and the definitions of offsets.
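To make the timestamp example a little more concrete, here is a minimal sketch of a downstream consumer that tracks stream time as the maximum timestamp seen and closes tumbling windows once stream time reaches the window end. It assumes heartbeat records carry a timestamp and are read alongside regular records. `HeartbeatWindows` and its methods are hypothetical; this is not the Kafka Streams API, which additionally applies a grace period before closing a window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical downstream consumer: heartbeats advance stream time without adding data. */
final class HeartbeatWindows {

    private final long windowSizeMs;
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private final List<String> closed = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    HeartbeatWindows(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** A regular record (non-negative timestamp): counted in its tumbling window. */
    void onRecord(long timestampMs) {
        long start = timestampMs - (timestampMs % windowSizeMs);
        if (start + windowSizeMs <= streamTime) {
            return;  // late record: its window has already been closed, so it is dropped
        }
        countsByWindowStart.merge(start, 1L, Long::sum);
        advance(timestampMs);
    }

    /** A heartbeat carries only a timestamp, but it can still advance stream time. */
    void onHeartbeat(long timestampMs) {
        advance(timestampMs);
    }

    /** Stream time never moves backwards; windows ending at or before it are closed. */
    private void advance(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        while (!countsByWindowStart.isEmpty()
                && countsByWindowStart.firstKey() + windowSizeMs <= streamTime) {
            Map.Entry<Long, Long> w = countsByWindowStart.pollFirstEntry();
            closed.add("[" + w.getKey() + "," + (w.getKey() + windowSizeMs) + ")=" + w.getValue());
        }
    }

    List<String> closedWindows() {
        return closed;
    }

    public static void main(String[] args) {
        HeartbeatWindows windows = new HeartbeatWindows(1_000);
        windows.onRecord(100);
        windows.onRecord(900);
        System.out.println(windows.closedWindows());  // [] (no newer data, so [0,1000) stays open)
        windows.onHeartbeat(1_000);
        System.out.println(windows.closedWindows());  // [[0,1000)=2]
    }
}
```

Without the heartbeat, the window in this example would stay open until the next real record arrived, which is exactly the "hanging" behaviour described for infrequent upstream producers.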

But I would still be more than happy to add this example if you think it
can be useful in getting a better understanding of the idea and also its
utility beyond Connect. Please let me know!

Thanks!
Sagar.


On Fri, Mar 24, 2023 at 7:22 PM John Roesler  wrote:

> Thanks for the KIP, Sagar!
>
> At first glance, this seems like a very useful feature.
>
> A common pain point in Streams is when upstream producers don't send
> regular updates and stream time cannot advance. This causes
> stream-time-driven operations to appear to hang, like time windows not
> closing, suppressions not firing, etc.
>
> From your KIP, I have a good idea of how the feature would be integrated
> into connect, and it sounds good to me. I don't quite see how downstream
> clients, such as a downstream Streams or Flink application, or users of the
> Consumer would make use of this feature. Could you add some examples of
> that nature?
>
> Thank you,
> -John
>
> On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> > Hi All,
> >
> > Bumping the thread again.
> >
> > Sagar.
> >
> >
> > On Fri, Mar 10, 2023 at 4:42 PM Sagar  wrote:
> >
> >> Hi All,
> >>
> >> Bumping this discussion thread again.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Thu, Mar 2, 2023 at 3:44 PM Sagar  wrote:
> >>
> >>> Hi All,
> >>>
> >>> I wanted to create a discussion thread for KIP-910:
> >>>
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>
>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-24 Thread John Roesler
Thanks for the KIP, Sagar!

At first glance, this seems like a very useful feature.

A common pain point in Streams is when upstream producers don't send regular 
updates and stream time cannot advance. This causes stream-time-driven 
operations to appear to hang, like time windows not closing, suppressions not 
firing, etc.

From your KIP, I have a good idea of how the feature would be integrated into
connect, and it sounds good to me. I don't quite see how downstream clients,
such as a downstream Streams or Flink application, or users of the Consumer
would make use of this feature. Could you add some examples of that nature?

Thank you,
-John

On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> Hi All,
>
> Bumping the thread again.
>
> Sagar.
>
>
> On Fri, Mar 10, 2023 at 4:42 PM Sagar  wrote:
>
>> Hi All,
>>
>> Bumping this discussion thread again.
>>
>> Thanks!
>> Sagar.
>>
>> On Thu, Mar 2, 2023 at 3:44 PM Sagar  wrote:
>>
>>> Hi All,
>>>
>>> I wanted to create a discussion thread for KIP-910:
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>>
>>> Thanks!
>>> Sagar.
>>>
>>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-24 Thread Sagar
Hi All,

Bumping the thread again.

Sagar.


On Fri, Mar 10, 2023 at 4:42 PM Sagar  wrote:

> Hi All,
>
> Bumping this discussion thread again.
>
> Thanks!
> Sagar.
>
> On Thu, Mar 2, 2023 at 3:44 PM Sagar  wrote:
>
>> Hi All,
>>
>> I wanted to create a discussion thread for KIP-910:
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>
>> Thanks!
>> Sagar.
>>
>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-10 Thread Sagar
Hi All,

Bumping this discussion thread again.

Thanks!
Sagar.

On Thu, Mar 2, 2023 at 3:44 PM Sagar  wrote:

> Hi All,
>
> I wanted to create a discussion thread for KIP-910:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>
> Thanks!
> Sagar.
>


[DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-02 Thread Sagar
Hi All,

I wanted to create a discussion thread for KIP-910:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records

Thanks!
Sagar.