[jira] [Created] (KAFKA-16706) Refactor ReplicationQuotaManager/RLMQuotaManager to eliminate code duplication

2024-05-12 Thread Abhijeet Kumar (Jira)
Abhijeet Kumar created KAFKA-16706:
--

 Summary: Refactor ReplicationQuotaManager/RLMQuotaManager to 
eliminate code duplication
 Key: KAFKA-16706
 URL: https://issues.apache.org/jira/browse/KAFKA-16706
 Project: Kafka
  Issue Type: Task
Reporter: Abhijeet Kumar


ReplicationQuotaManager and RLMQuotaManager implementations are similar. We 
should explore ways to refactor them to remove code duplication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-1023: Follower fetch from tiered offset

2024-04-28 Thread Abhijeet Kumar
Hi All,

This KIP is accepted with 3 +1 binding votes (Luke, Christo, Jun, Satish)
and 2 +1 non-binding votes (Kamal, Omnia).

Thank you all for voting.

Abhijeet.



On Sat, Apr 27, 2024 at 5:50 AM Satish Duggana 
wrote:

> Thanks Abhijeet for the KIP.
> +1 from me.
>
> ~Satish
>
> On Fri, 26 Apr 2024 at 8:35 PM, Jun Rao  wrote:
>
> > Hi, Abhijeet,
> >
> > Thanks for the KIP. +1
> >
> > Jun
> >
> > On Thu, Apr 25, 2024 at 10:30 PM Abhijeet Kumar <
> > abhijeet.cse@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I would like to start the vote for KIP-1023 - Follower fetch from
> tiered
> > > offset
> > >
> > > The KIP is here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset
> > >
> > > Regards.
> > > Abhijeet.
> > >
> >
>


[VOTE] KIP-1023: Follower fetch from tiered offset

2024-04-25 Thread Abhijeet Kumar
Hi All,

I would like to start the vote for KIP-1023 - Follower fetch from tiered
offset

The KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset

Regards.
Abhijeet.


Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-25 Thread Abhijeet Kumar
Thank you all for your comments. As all the comments in the thread are
addressed, I am starting a Vote thread for the KIP. Please have a look.

Regards.
Abhijeet.



On Thu, Apr 25, 2024 at 6:08 PM Luke Chen  wrote:

> Hi, Abhijeet,
>
> Thanks for the update.
>
> I have no more comments.
>
> Luke
>
> On Thu, Apr 25, 2024 at 4:21 AM Jun Rao  wrote:
>
> > Hi, Abhijeet,
> >
> > Thanks for the updated KIP. It looks good to me.
> >
> > Jun
> >
> > On Mon, Apr 22, 2024 at 12:08 PM Abhijeet Kumar <
> > abhijeet.cse@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Please find my comments inline.
> > >
> > >
> > > On Thu, Apr 18, 2024 at 3:26 AM Jun Rao 
> > wrote:
> > >
> > > > Hi, Abhijeet,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 1. I am wondering if we could achieve the same result by just
> lowering
> > > > local.retention.ms and local.retention.bytes. This also allows the
> > newly
> > > > started follower to build up the local data before serving the
> consumer
> > > > traffic.
> > > >
> > >
> > > I am not sure I fully followed this. Do you mean we could lower the
> > > local.retention (by size and time)
> > > so that there is little data on the leader's local storage so that the
> > > follower can quickly catch up with the leader?
> > >
> > > In that case, we will need to set small local retention across brokers
> in
> > > the cluster. It will have the undesired
> > > effect where there will be increased remote log fetches for serving
> > consume
> > > requests, and this can cause
> > > degradations. Also, this behaviour (of increased remote fetches) will
> > > happen on all brokers at all times, whereas in
> > > the KIP we are restricting the behavior only to the newly bootstrapped
> > > brokers and only until the time it fully builds
> > > the local logs as per retention defined at the cluster level.
> > > (Deprioritization of the broker could help reduce the impact
> > >  even further)
> > >
> > >
> > > >
> > > > 2. Have you updated the KIP?
> > > >
> > >
> > > The KIP has been updated now.
> > >
> > >
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Apr 9, 2024 at 3:36 AM Satish Duggana <
> > satish.dugg...@gmail.com>
> > > > wrote:
> > > >
> > > > > +1 to Jun for adding the consumer fetching from a follower scenario
> > > > > also to the existing section that talked about the drawback when a
> > > > > node built with last-tiered-offset has become a leader. As Abhijeet
> > > > > mentioned, we plan to have a follow-up KIP that will address these by
> > > > > having a deprioritization of these brokers. The deprioritization of
> > > > > those brokers can be removed once they catch up to the local log
> > > > > retention.
> > > > >
> > > > > Thanks,
> > > > > Satish.
> > > > >
> > > > > On Tue, 9 Apr 2024 at 14:08, Luke Chen  wrote:
> > > > > >
> > > > > > Hi Abhijeet,
> > > > > >
> > > > > > Thanks for the KIP to improve the tiered storage feature!
> > > > > >
> > > > > > Questions:
> > > > > > 1. We could also get the "pending-upload-offset" and epoch via
> > remote
> > > > log
> > > > > > metadata, instead of adding a new API to fetch from the leader.
> > Could
> > > > you
> > > > > > explain why you chose the latter approach, instead of the former?
> > > > > > 2.
> > > > > > > We plan to have a follow-up KIP that will address both the
> > > > > > deprioritization
> > > > > > of these brokers from leadership, as well as
> > > > > > for consumption (when fetching from followers is allowed).
> > > > > >
> > > > > > I agree with Jun that we might need to make it clear all possible
> > > > > drawbacks
> > > > > > that could have. So, could we add the drawbacks that Jun
> mentioned
> > > > about
> > > > > > the performance issue when consumer fetch from fo

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-22 Thread Abhijeet Kumar
Hi Jun,

Please find my comments inline.


On Thu, Apr 18, 2024 at 3:26 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the reply.
>
> 1. I am wondering if we could achieve the same result by just lowering
> local.retention.ms and local.retention.bytes. This also allows the newly
> started follower to build up the local data before serving the consumer
> traffic.
>

I am not sure I fully followed this. Do you mean we could lower the
local.retention (by size and time) so that there is little data on the
leader's local storage, allowing the follower to quickly catch up with
the leader?

In that case, we will need to set small local retention across brokers in
the cluster. It will have the undesired effect of increased remote log
fetches for serving consume requests, and this can cause degradation.
Also, this behavior (of increased remote fetches) will happen on all
brokers at all times, whereas in the KIP we restrict the behavior only to
the newly bootstrapped brokers, and only until they fully build the local
logs as per the retention defined at the cluster level. (Deprioritization
of the broker could help reduce the impact even further.)
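
For reference, lowering local retention would just be a topic-level (or
broker-default) config change, which is exactly why it applies to every
broker and every consumer. A rough AdminClient sketch of what that change
looks like; the topic name, bootstrap address and retention values below
are made up for illustration:

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class LowerLocalRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Shrink only the local (hot) retention of one topic; total retention is untouched.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
            Collection<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("local.retention.ms", "3600000"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("local.retention.bytes", "1073741824"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}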


>
> 2. Have you updated the KIP?
>

The KIP has been updated now.


>
> Thanks,
>
> Jun
>
> On Tue, Apr 9, 2024 at 3:36 AM Satish Duggana 
> wrote:
>
> > +1 to Jun for adding the consumer fetching from a follower scenario
> > also to the existing section that talked about the drawback when a
> > node built with last-tiered-offset has become a leader. As Abhijeet
> > mentioned, we plan to have a follow-up KIP that will address these by
> > having a deprioritization of these brokers. The deprioritization of
> > those brokers can be removed once they catch up to the local log
> > retention.
> >
> > Thanks,
> > Satish.
> >
> > On Tue, 9 Apr 2024 at 14:08, Luke Chen  wrote:
> > >
> > > Hi Abhijeet,
> > >
> > > Thanks for the KIP to improve the tiered storage feature!
> > >
> > > Questions:
> > > 1. We could also get the "pending-upload-offset" and epoch via remote
> log
> > > metadata, instead of adding a new API to fetch from the leader. Could
> you
> > > explain why you chose the latter approach, instead of the former?
> > > 2.
> > > > We plan to have a follow-up KIP that will address both the
> > > deprioritization
> > > of these brokers from leadership, as well as
> > > for consumption (when fetching from followers is allowed).
> > >
> > > I agree with Jun that we might need to make it clear all possible
> > drawbacks
> > > that could have. So, could we add the drawbacks that Jun mentioned
> about
> > > the performance issue when consumer fetch from follower?
> > >
> > > 3. Could we add "Rejected Alternatives" section to the end of the KIP
> to
> > > add some of them?
> > > Like the "ListOffsetRequest" approach VS
> "Earliest-Pending-Upload-Offset"
> > > approach, or getting the "Earliest-Pending-Upload-Offset" from remote
> log
> > > metadata... etc.
> > >
> > > Thanks.
> > > Luke
> > >
> > >
> > > On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar <
> > abhijeet.cse@gmail.com>
> > > wrote:
> > >
> > > > Hi Christo,
> > > >
> > > > Please find my comments inline.
> > > >
> > > > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov <
> christolo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Abhijeet and Jun,
> > > > >
> > > > > I have been mulling this KIP over a bit more in recent days!
> > > > >
> > > > > re: Jun
> > > > >
> > > > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps - in
> > > > > retrospect it should have been fairly obvious. I would need to go and
> > > > > update KIP-1005 myself then, thank you for giving the useful reference!
> > > > >
> > > > > 4. I think Abhijeet wants to rebuild state from
> latest-tiered-offset
> > and
> > > > > fetch from latest-tiered-offset + 1 only for new replicas (or
> > replicas
> > > > > which experienced a disk failure) to decrease the time a partition
> > spends
> > > > > in under-replicated state. In other words, a follower which has
> just
> > > > fallen
> > > > > out of ISR, but has loca

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-22 Thread Abhijeet Kumar
Hi Luke,

Thanks for your comments. Please find my responses inline.

On Tue, Apr 9, 2024 at 2:08 PM Luke Chen  wrote:

> Hi Abhijeet,
>
> Thanks for the KIP to improve the tiered storage feature!
>
> Questions:
> 1. We could also get the "pending-upload-offset" and epoch via remote log
> metadata, instead of adding a new API to fetch from the leader. Could you
> explain why you chose the latter approach, instead of the former?
>

The remote log metadata could be tracking overlapping log segments. The
maximum offset across the log segments it is tracking may not be the
last-tiered-offset, because of truncations during unclean leader election.
Remote log metadata alone is not sufficient to identify the
last-tiered-offset or the pending-upload-offset.

Only the leader knows the correct lineage of offsets that is required to
identify the "pending-upload-offset". That is the reason we chose to add a
new API to fetch this information from the leader.
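
As a toy illustration of the point (not code from the KIP; the epoch
lineage and segment metadata below are made up), the raw maximum offset in
the remote metadata can exceed the last-tiered-offset on the current
lineage:

import java.util.List;
import java.util.Map;

public class LastTieredOffsetSketch {
    // endOffset is the last offset contained in the uploaded segment.
    record RemoteSegment(int epoch, long startOffset, long endOffset) {}

    public static void main(String[] args) {
        // Assumed current leader lineage: epoch 3 covers offsets [0, 100), epoch 5 covers [100, ...).
        Map<Integer, Long> epochEndOffsets = Map.of(3, 100L, 5, Long.MAX_VALUE);

        List<RemoteSegment> remoteMetadata = List.of(
                new RemoteSegment(3, 0, 99),     // uploaded on the current lineage
                new RemoteSegment(4, 50, 120));  // uploaded on a lineage later truncated by unclean election

        long maxOffset = remoteMetadata.stream().mapToLong(RemoteSegment::endOffset).max().orElse(-1);
        long lastTiered = remoteMetadata.stream()
                .filter(s -> epochEndOffsets.containsKey(s.epoch()) && s.endOffset() < epochEndOffsets.get(s.epoch()))
                .mapToLong(RemoteSegment::endOffset).max().orElse(-1);

        System.out.println(maxOffset);   // 120: the raw maximum in the metadata, which is misleading
        System.out.println(lastTiered);  // 99: the last tiered offset on the leader's lineage
    }
}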


2.
> > We plan to have a follow-up KIP that will address both the
> deprioritization
> of these brokers from leadership, as well as
> for consumption (when fetching from followers is allowed).
>
> I agree with Jun that we might need to make it clear all possible drawbacks
> that could have. So, could we add the drawbacks that Jun mentioned about
> the performance issue when consumer fetch from follower?
>
>
Updated the KIP to call this out.


> 3. Could we add "Rejected Alternatives" section to the end of the KIP to
> add some of them?
> Like the "ListOffsetRequest" approach VS "Earliest-Pending-Upload-Offset"
> approach, or getting the "Earliest-Pending-Upload-Offset" from remote log
> metadata... etc.
>

Added the section on Rejected Alternatives.


> Thanks.
> Luke
>
>
> On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar 
> wrote:
>
> > Hi Christo,
> >
> > Please find my comments inline.
> >
> > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov 
> > wrote:
> >
> > > Hello Abhijeet and Jun,
> > >
> > > I have been mulling this KIP over a bit more in recent days!
> > >
> > > re: Jun
> > >
> > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps - in
> > > retrospect it should have been fairly obvious. I would need to go and
> > > update KIP-1005 myself then, thank you for giving the useful reference!
> > >
> > > 4. I think Abhijeet wants to rebuild state from latest-tiered-offset
> and
> > > fetch from latest-tiered-offset + 1 only for new replicas (or replicas
> > > which experienced a disk failure) to decrease the time a partition
> spends
> > > in under-replicated state. In other words, a follower which has just
> > fallen
> > > out of ISR, but has local data will continue using today's Tiered
> Storage
> > > replication protocol (i.e. fetching from earliest-local). I further
> > believe
> > > he has taken this approach so that local state of replicas which have
> > just
> > > fallen out of ISR isn't forcefully wiped thus leading to situation 1.
> > > Abhijeet, have I understood (and summarised) what you are proposing
> > > correctly?
> > >
> > > Yes, your understanding is correct. We want to limit the behavior
> changes
> > only to new replicas.
> >
> >
> > > 5. I think in today's Tiered Storage we know the leader's
> > log-start-offset
> > > from the FetchResponse and we can learn its local-log-start-offset from
> > the
> > > ListOffsets by asking for earliest-local timestamp (-4). But granted,
> > this
> > > ought to be added as an additional API call in the KIP.
> > >
> > >
> > Yes, I clarified this in my reply to Jun. I will add this missing detail
> in
> > the KIP.
> >
> >
> > > re: Abhijeet
> > >
> > > 101. I am still a bit confused as to why you want to include a new
> offset
> > > (i.e. pending-upload-offset) when you yourself mention that you could
> use
> > > an already existing offset (i.e. last-tiered-offset + 1). In essence,
> you
> > > end your Motivation with "In this KIP, we will focus only on the
> follower
> > > fetch protocol using the *last-tiered-offset*" and then in the
> following
> > > sections you talk about pending-upload-offset. I understand this might
> be
> > > classified as an implementation detail, but if you introduce a new
> offset
> > > (i.e. pending-upload-offset) you have to make a change to the
> ListOffsets
> > > API (i.e. introduc

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-09 Thread Abhijeet Kumar
a.version).
> >
> > 3. "Instead of fetching Earliest-Pending-Upload-Offset, it could fetch
> the
> > last-tiered-offset from the leader, and make a separate leader call to
> > fetch leader epoch for the following offset."
> > Why do we need to make a separate call for the leader epoch?
> > ListOffsetsResponse includes both the offset and the corresponding epoch.
> >
> > 4. "Check if the follower replica is empty and if the feature to use
> > last-tiered-offset is enabled."
> > Why do we need to check if the follower replica is empty?
> >
> > 5. It can be confirmed by checking if the leader's Log-Start-Offset is
> the
> > same as the Leader's Local-Log-Start-Offset.
> > How does the follower know Local-Log-Start-Offset?
> >
> > Jun
> >
> > On Sat, Mar 30, 2024 at 5:51 AM Abhijeet Kumar <
> abhijeet.cse@gmail.com
> > >
> > wrote:
> >
> > > Hi Christo,
> > >
> > > Thanks for reviewing the KIP.
> > >
> > > The follower needs the earliest-pending-upload-offset (and the
> > > corresponding leader epoch) from the leader.
> > > This is the first offset the follower will have locally.
> > >
> > > Regards,
> > > Abhijeet.
> > >
> > >
> > >
> > > On Fri, Mar 29, 2024 at 1:14 PM Christo Lolov 
> > > wrote:
> > >
> > > > Heya!
> > > >
> > > > First of all, thank you very much for the proposal, you have
> explained
> > > the
> > > > problem you want solved very well - I think a faster bootstrap of an
> > > empty
> > > > replica is definitely an improvement!
> > > >
> > > > For my understanding, which concrete offset do you want the leader to
> > > give
> > > > back to a follower - earliest-pending-upload-offset or the
> > > > latest-tiered-offset? If it is the second, then I believe KIP-1005
> > ought
> > > to
> > > > already be exposing that offset as part of the ListOffsets API, no?
> > > >
> > > > Best,
> > > > Christo
> > > >
> > > > On Wed, 27 Mar 2024 at 18:23, Abhijeet Kumar <
> > abhijeet.cse@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I have created KIP-1023 to introduce follower fetch from tiered
> > offset.
> > > > > This feature will be helpful in significantly reducing Kafka
> > > > > rebalance/rebuild times when the cluster is enabled with tiered
> > > storage.
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset
> > > > >
> > > > > Feedback and suggestions are welcome.
> > > > >
> > > > > Regards,
> > > > > Abhijeet.
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-09 Thread Abhijeet Kumar
Hi Jun,

Thank you for taking the time to review the KIP. Please find my comments
inline.

On Fri, Apr 5, 2024 at 12:09 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the KIP. Left a few comments.
>
> 1. "A drawback of using the last-tiered-offset is that this new follower
> would possess only a limited number of locally stored segments. Should it
> ascend to the role of leader, there is a risk of needing to fetch these
> segments from the remote storage, potentially impacting broker
> performance."
> Since we support consumers fetching from followers, this is a potential
> issue on the follower side too. In theory, it's possible for a segment to
> be tiered immediately after rolling. In that case, there could be very
> little data after last-tiered-offset. It would be useful to think through
> how to address this issue.
>

We plan to have a follow-up KIP that will address the deprioritization of
these brokers both for leadership and for consumption (when fetching from
followers is allowed).


>
> 2. ListOffsetsRequest:
> 2.1 Typically, we need to bump up the version of the request if we add a
> new value for timestamp. See
>
> https://github.com/apache/kafka/pull/10760/files#diff-fac7080d67da905a80126d58fc1745c9a1409de7ef7d093c2ac66a888b134633
> .
>

Yes, let me update the KIP to include this change. We will need a new
timestamp corresponding to Earliest-Pending-Upload-Offset.


> 2.2 Since this changes the inter broker request protocol, it would be
> useful to have a section on upgrade (e.g. new IBP/metadata.version).
>
Makes sense. I will update the KIP to capture this.


> 3. "Instead of fetching Earliest-Pending-Upload-Offset, it could fetch the
> last-tiered-offset from the leader, and make a separate leader call to
> fetch leader epoch for the following offset."
> Why do we need to make a separate call for the leader epoch?
> ListOffsetsResponse includes both the offset and the corresponding epoch.
>
I understand there is some confusion here. Let me try to explain this.

The follower needs to build the local data starting from the offset
Earliest-Pending-Upload-Offset. Hence it needs the offset and the
corresponding leader epoch.
There are two ways to do this:
   1. We add support in ListOffsetsRequest to be able to fetch this offset
   (and leader epoch) from the leader.
   2. Or, fetch the tiered-offset (which is already supported). From this
   offset, we can get the Earliest-Pending-Upload-Offset by just adding 1 to
   the tiered-offset. However, we still need the leader epoch for this
   offset, since there is no guarantee that the leader epoch for the
   Earliest-Pending-Upload-Offset will be the same as the leader epoch for
   the tiered-offset. We may need another API call to the leader for this.

I prefer the first approach. The only problem with the first approach is
that it introduces one more offset. The second approach avoids this problem
but is a little complicated.
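
To make the epoch part concrete, here is a small illustration (not broker
code; the epoch lineage below is made up) of why the epoch for
tiered-offset + 1 has to come from the leader's epoch lineage rather than
being reused from the tiered-offset itself:

import java.util.Map;
import java.util.TreeMap;

public class EpochLookupSketch {
    // Leader epoch lineage modelled as startOffset -> epoch; only the leader knows this authoritatively.
    static int epochForOffset(TreeMap<Long, Integer> epochStartOffsets, long offset) {
        Map.Entry<Long, Integer> e = epochStartOffsets.floorEntry(offset);
        if (e == null) throw new IllegalArgumentException("offset below first known epoch");
        return e.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, Integer> lineage = new TreeMap<>();
        lineage.put(0L, 3);    // epoch 3 starts at offset 0
        lineage.put(100L, 5);  // epoch 5 starts at offset 100

        long lastTieredOffset = 99L;                       // last offset already copied to remote storage
        long earliestPendingUpload = lastTieredOffset + 1; // first offset the follower keeps locally

        // The epoch changes exactly at the boundary, so the epoch returned for the
        // tiered-offset cannot simply be reused for tiered-offset + 1.
        System.out.println(epochForOffset(lineage, lastTieredOffset));      // 3
        System.out.println(epochForOffset(lineage, earliestPendingUpload)); // 5
    }
}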


> 4. "Check if the follower replica is empty and if the feature to use
> last-tiered-offset is enabled."
> Why do we need to check if the follower replica is empty?
>
>
We want to limit this new behavior only to new replicas. Replicas that
become out of ISR are excluded from this behavior change. Those will
continue with the existing behavior.


> 5. It can be confirmed by checking if the leader's Log-Start-Offset is the
> same as the Leader's Local-Log-Start-Offset.
> How does the follower know Local-Log-Start-Offset?
>

Missed this detail. The follower will need to call the leader API to fetch
the EarliestLocal offset for this.


> Jun
>
> On Sat, Mar 30, 2024 at 5:51 AM Abhijeet Kumar  >
> wrote:
>
> > Hi Christo,
> >
> > Thanks for reviewing the KIP.
> >
> > The follower needs the earliest-pending-upload-offset (and the
> > corresponding leader epoch) from the leader.
> > This is the first offset the follower will have locally.
> >
> > Regards,
> > Abhijeet.
> >
> >
> >
> > On Fri, Mar 29, 2024 at 1:14 PM Christo Lolov 
> > wrote:
> >
> > > Heya!
> > >
> > > First of all, thank you very much for the proposal, you have explained
> > the
> > > problem you want solved very well - I think a faster bootstrap of an
> > empty
> > > replica is definitely an improvement!
> > >
> > > For my understanding, which concrete offset do you want the leader to
> > give
> > > back to a follower - earliest-pending-upload-offset or the
> > > latest-tiered-offset? If it is the second, then I believe KIP-1005
> ought
> > to
> > > already be exposing that offset as part of the ListOffsets API, no?
> > >
> > > Best,
> > > Chris

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-03-30 Thread Abhijeet Kumar
Hi Christo,

Thanks for reviewing the KIP.

The follower needs the earliest-pending-upload-offset (and the
corresponding leader epoch) from the leader.
This is the first offset the follower will have locally.

Regards,
Abhijeet.



On Fri, Mar 29, 2024 at 1:14 PM Christo Lolov 
wrote:

> Heya!
>
> First of all, thank you very much for the proposal, you have explained the
> problem you want solved very well - I think a faster bootstrap of an empty
> replica is definitely an improvement!
>
> For my understanding, which concrete offset do you want the leader to give
> back to a follower - earliest-pending-upload-offset or the
> latest-tiered-offset? If it is the second, then I believe KIP-1005 ought to
> already be exposing that offset as part of the ListOffsets API, no?
>
> Best,
> Christo
>
> On Wed, 27 Mar 2024 at 18:23, Abhijeet Kumar 
> wrote:
>
> > Hi All,
> >
> > I have created KIP-1023 to introduce follower fetch from tiered offset.
> > This feature will be helpful in significantly reducing Kafka
> > rebalance/rebuild times when the cluster is enabled with tiered storage.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset
> >
> > Feedback and suggestions are welcome.
> >
> > Regards,
> > Abhijeet.
> >
>


[DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-03-27 Thread Abhijeet Kumar
Hi All,

I have created KIP-1023 to introduce follower fetch from tiered offset.
This feature will be helpful in significantly reducing Kafka
rebalance/rebuild times when the cluster is enabled with tiered storage.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset

Feedback and suggestions are welcome.

Regards,
Abhijeet.


Re: [VOTE] KIP-956: Tiered Storage Quotas

2024-03-19 Thread Abhijeet Kumar
Hi All,

This KIP is accepted with 3 +1 binding votes (Jun, Satish, Luke) and 2 +1
non-binding votes (Kamal, Jorge).

Thank you all for voting.

Regards.
Abhijeet.



On Tue, Mar 19, 2024 at 3:35 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks Abhijeet! Looking forward to this one.
> +1 (non-binding).
>
> On Thu, 14 Mar 2024 at 06:08, Luke Chen  wrote:
>
> > Thanks for the KIP!
> > +1 from me.
> >
> > Luke
> >
> > On Sun, Mar 10, 2024 at 8:44 AM Satish Duggana  >
> > wrote:
> >
> > > Thanks Abhijeet for the KIP, +1 from me.
> > >
> > >
> > > On Sat, 9 Mar 2024 at 1:51 AM, Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > +1 (non-binding), Thanks for the KIP, Abhijeet!
> > > >
> > > > --
> > > > Kamal
> > > >
> > > > On Fri, Mar 8, 2024 at 11:02 PM Jun Rao 
> > > wrote:
> > > >
> > > > > Hi, Abhijeet,
> > > > >
> > > > > Thanks for the KIP. +1
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Mar 8, 2024 at 3:44 AM Abhijeet Kumar <
> > > > abhijeet.cse@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I would like to start the vote for KIP-956 - Tiered Storage
> Quotas
> > > > > >
> > > > > > The KIP is here:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > > > > >
> > > > > > Regards.
> > > > > > Abhijeet.
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-03-16 Thread Abhijeet Kumar
Hi Jorge,

The config names were chosen to keep them consistent with the other existing
quota configs, such as
*replica.alter.log.dirs.io.max.bytes.per.second*, as pointed out by Jun in
the thread.

Also, we can revisit the names of the components during implementation,
since those are not exposed to the user.

Please let me know if you have any further concerns.

Regards,
Abhijeet.



On Mon, Mar 11, 2024 at 6:11 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Abhijeet,
>
> Thanks for the KIP! Looks good to me. I just have a minor comments on
> naming:
>
> Would it work to align the config names to existing quota names?
> e.g. `remote.log.manager.copy.byte.rate.quota` (or similar) instead of
> `remote.log.manager.copy.max.bytes.per.second`?
>
> Same for new components, could we use the same verbs as in the configs:
> - RLMCopyQuotaManager
> - RLMFetchQuotaManager
>
>
> On Fri, 8 Mar 2024 at 13:43, Abhijeet Kumar 
> wrote:
>
> > Thank you all for your comments. As all the comments in the thread are
> > addressed, I am starting a Vote thread for the KIP. Please have a look.
> >
> > Regards.
> >
> > On Thu, Mar 7, 2024 at 12:34 PM Luke Chen  wrote:
> >
> > > Hi Abhijeet,
> > >
> > > Thanks for the update and the explanation.
> > > I had another look, and it LGTM now!
> > >
> > > Thanks.
> > > Luke
> > >
> > > On Tue, Mar 5, 2024 at 2:50 AM Jun Rao 
> wrote:
> > >
> > > > Hi, Abhijeet,
> > > >
> > > > Thanks for the reply. Sounds good to me.
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <
> > > abhijeet.cse@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for pointing it out. It makes sense to me. We can have the
> > > > following
> > > > > metrics instead. What do you think?
> > > > >
> > > > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms
> > > remote
> > > > >fetches/copies was throttled by a broker)
> > > > >- remote-(fetch|copy)-throttle-time-max (The maximum time in ms
> > > > remote
> > > > >fetches/copies was throttled by a broker)
> > > > >
> > > > > These are similar to fetch-throttle-time-avg and
> > > fetch-throttle-time-max
> > > > > metrics we have for Kafka Consumers?
> > > > > The Avg and Max are computed over the (sliding) window as defined
> by
> > > the
> > > > > configuration metrics.sample.window.ms on the server.
> > > > >
> > > > > (Also, I will update the config and metric names to be consistent)
> > > > >
> > > > > Regards.
> > > > >
> > > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao 
> > > > wrote:
> > > > >
> > > > > > Hi, Abhijeet,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > The issue with recording the throttle time as a gauge is that
> it's
> > > > > > transient. If the metric is not read immediately, the recorded
> > value
> > > > > could
> > > > > > be reset to 0. The admin won't realize that throttling has
> > happened.
> > > > > >
> > > > > > For client quotas, the throttle time is tracked as the average
> > > > > > throttle-time per user/client-id. This makes the metric less
> > > transient.
> > > > > >
> > > > > > Also, the configs use read/write whereas the metrics use
> > fetch/copy.
> > > > > Could
> > > > > > we make them consistent?
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> > > > > abhijeet.cse@gmail.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > > > > >
> > > > > > > kafka.log.remote:type=RemoteLogManager,
> > > name=RemoteFetchThrottleTime
> > > >

[VOTE] KIP-956: Tiered Storage Quotas

2024-03-08 Thread Abhijeet Kumar
Hi All,

I would like to start the vote for KIP-956 - Tiered Storage Quotas

The KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

Regards.
Abhijeet.


Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-03-08 Thread Abhijeet Kumar
Thank you all for your comments. As all the comments in the thread are
addressed, I am starting a Vote thread for the KIP. Please have a look.

Regards.

On Thu, Mar 7, 2024 at 12:34 PM Luke Chen  wrote:

> Hi Abhijeet,
>
> Thanks for the update and the explanation.
> I had another look, and it LGTM now!
>
> Thanks.
> Luke
>
> On Tue, Mar 5, 2024 at 2:50 AM Jun Rao  wrote:
>
> > Hi, Abhijeet,
> >
> > Thanks for the reply. Sounds good to me.
> >
> > Jun
> >
> >
> > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <
> abhijeet.cse@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for pointing it out. It makes sense to me. We can have the
> > following
> > > metrics instead. What do you think?
> > >
> > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms
> remote
> > >fetches/copies was throttled by a broker)
> > >- remote-(fetch|copy)-throttle-time-max (The maximum time in ms
> > remote
> > >fetches/copies was throttled by a broker)
> > >
> > > These are similar to fetch-throttle-time-avg and
> fetch-throttle-time-max
> > > metrics we have for Kafka Consumers?
> > > The Avg and Max are computed over the (sliding) window as defined by
> the
> > > configuration metrics.sample.window.ms on the server.
> > >
> > > (Also, I will update the config and metric names to be consistent)
> > >
> > > Regards.
> > >
> > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao 
> > wrote:
> > >
> > > > Hi, Abhijeet,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > The issue with recording the throttle time as a gauge is that it's
> > > > transient. If the metric is not read immediately, the recorded value
> > > could
> > > > be reset to 0. The admin won't realize that throttling has happened.
> > > >
> > > > For client quotas, the throttle time is tracked as the average
> > > > throttle-time per user/client-id. This makes the metric less
> transient.
> > > >
> > > > Also, the configs use read/write whereas the metrics use fetch/copy.
> > > Could
> > > > we make them consistent?
> > > >
> > > > Jun
> > > >
> > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> > > abhijeet.cse@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > > >
> > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteFetchThrottleTime
> > ->
> > > > The
> > > > > duration of time required at a given moment to bring the observed
> > fetch
> > > > > rate within the allowed limit, by preventing further reads.
> > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > ->
> > > > The
> > > > > duration of time required at a given moment to bring the observed
> > > remote
> > > > > copy rate within the allowed limit, by preventing further copies.
> > > > >
> > > > > Regards.
> > > > >
> > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao  >
> > > > wrote:
> > > > >
> > > > > > Hi, Abhijeet,
> > > > > >
> > > > > > Thanks for the explanation. Makes sense to me now.
> > > > > >
> > > > > > Just a minor comment. Could you document the exact meaning of the
> > > > > following
> > > > > > two metrics? For example, is the time accumulated? If so, is it
> > from
> > > > the
> > > > > > start of the broker or over some window?
> > > > > >
> > > > > > kafka.log.remote:type=RemoteLogManager,
> > name=RemoteFetchThrottleTime
> > > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteCopyThrottleTime
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > > > > abhijeet.cse@gmail.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> >

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-03-02 Thread Abhijeet Kumar
Hi Jun,

Thanks for pointing it out. It makes sense to me. We can have the following
metrics instead. What do you think?

   - remote-(fetch|copy)-throttle-time-avg (The average time in ms remote
   fetches/copies was throttled by a broker)
   - remote-(fetch|copy)-throttle-time-max (The maximum time in ms remote
   fetches/copies was throttled by a broker)

These are similar to fetch-throttle-time-avg and fetch-throttle-time-max
metrics we have for Kafka Consumers?
The Avg and Max are computed over the (sliding) window as defined by the
configuration metrics.sample.window.ms on the server.

(Also, I will update the config and metric names to be consistent)
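
For illustration only, a rough sketch (not the final implementation) of how
such avg/max sensors could be registered with Kafka's client metrics
library, with the sample window driven by metrics.sample.window.ms and
metrics.num.samples; the group and description strings below are made up:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class ThrottleTimeMetricsSketch {
    public static void main(String[] args) {
        MetricConfig config = new MetricConfig()
                .timeWindow(30, TimeUnit.SECONDS) // metrics.sample.window.ms
                .samples(2);                      // metrics.num.samples
        Metrics metrics = new Metrics(config);

        Sensor sensor = metrics.sensor("remote-fetch-throttle-time");
        sensor.add(metrics.metricName("remote-fetch-throttle-time-avg", "RemoteLogManager",
                "Average time in ms remote fetches were throttled by the broker"), new Avg());
        sensor.add(metrics.metricName("remote-fetch-throttle-time-max", "RemoteLogManager",
                "Maximum time in ms remote fetches were throttled by the broker"), new Max());

        sensor.record(150.0); // throttle time observed for one remote fetch, in ms
        metrics.close();
    }
}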

Regards.

On Thu, Feb 29, 2024 at 2:51 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the reply.
>
> The issue with recording the throttle time as a gauge is that it's
> transient. If the metric is not read immediately, the recorded value could
> be reset to 0. The admin won't realize that throttling has happened.
>
> For client quotas, the throttle time is tracked as the average
> throttle-time per user/client-id. This makes the metric less transient.
>
> Also, the configs use read/write whereas the metrics use fetch/copy. Could
> we make them consistent?
>
> Jun
>
> On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar  >
> wrote:
>
> > Hi Jun,
> >
> > Clarified the meaning of the two metrics. Also updated the KIP.
> >
> > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime ->
> The
> > duration of time required at a given moment to bring the observed fetch
> > rate within the allowed limit, by preventing further reads.
> > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime ->
> The
> > duration of time required at a given moment to bring the observed remote
> > copy rate within the allowed limit, by preventing further copies.
> >
> > Regards.
> >
> > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao 
> wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Thanks for the explanation. Makes sense to me now.
> > >
> > > Just a minor comment. Could you document the exact meaning of the
> > following
> > > two metrics? For example, is the time accumulated? If so, is it from
> the
> > > start of the broker or over some window?
> > >
> > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime
> > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > >
> > > Jun
> > >
> > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > abhijeet.cse@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > The existing quota system for consumers is designed to throttle the
> > > > consumer if it exceeds the allowed fetch rate.
> > > > The additional quota we want to add works on the broker level. If the
> > > > broker-level remote read quota is being
> > > > exceeded, we prevent additional reads from the remote storage but do
> > not
> > > > prevent local reads for the consumer.
> > > > If the consumer has specified other partitions to read, which can be
> > > served
> > > > from local, it can continue to read those
> > > > partitions. To elaborate more, we make a check for quota exceeded if
> we
> > > > know a segment needs to be read from
> > > > remote. If the quota is exceeded, we simply skip the partition and
> move
> > > to
> > > > other segments in the fetch request.
> > > > This way consumers can continue to read the local data as long as
> they
> > > have
> > > > not exceeded the client-level quota.
> > > >
> > > > Also, when we choose the appropriate consumer-level quota, we would
> > > > typically look at what kind of local fetch
> > > > throughput is supported. If we were to reuse the same consumer quota,
> > we
> > > > should also consider the throughput
> > > > the remote storage supports. The throughput supported by remote may
> be
> > > > less/more than the throughput supported
> > > > by local (when using a cloud provider, it depends on the plan opted
> by
> > > the
> > > > user). The consumer quota has to be carefully
> > > > set considering both local and remote throughput. Instead, if we
> have a
> > > > separate quota, it makes things much simpler
> > > > for the user, since they already know what throughput their remote
> > > storage
> > >

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-02-28 Thread Abhijeet Kumar
Hi Jun,

Clarified the meaning of the two metrics. Also updated the KIP.

kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime -> The
duration of time required at a given moment to bring the observed fetch
rate within the allowed limit, by preventing further reads.
kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime -> The
duration of time required at a given moment to bring the observed remote
copy rate within the allowed limit, by preventing further copies.
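
As a back-of-the-envelope illustration of what these gauges mean (numbers
made up, and not the broker's exact formula): if 600 MB were read from
remote storage over a 10 s window against a 50 MB/s quota, further reads
would need to be held back for roughly 2 s before the observed rate falls
back within the limit:

public class ThrottleTimeExample {
    public static void main(String[] args) {
        double windowSeconds = 10.0;
        double quotaBytesPerSec = 50 * 1024 * 1024;  // 50 MB/s remote read quota
        double observedBytes = 600L * 1024 * 1024;   // bytes read from remote in the window
        // Time the observed traffic "deserves" at the quota rate, minus the window already elapsed.
        double throttleMs = Math.max(0, (observedBytes / quotaBytesPerSec - windowSeconds) * 1000);
        System.out.println(throttleMs); // 2000.0 ms of further reads being held back
    }
}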

Regards.

On Wed, Feb 28, 2024 at 12:28 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the explanation. Makes sense to me now.
>
> Just a minor comment. Could you document the exact meaning of the following
> two metrics? For example, is the time accumulated? If so, is it from the
> start of the broker or over some window?
>
> kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime
> kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
>
> Jun
>
> On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar  >
> wrote:
>
> > Hi Jun,
> >
> > The existing quota system for consumers is designed to throttle the
> > consumer if it exceeds the allowed fetch rate.
> > The additional quota we want to add works on the broker level. If the
> > broker-level remote read quota is being
> > exceeded, we prevent additional reads from the remote storage but do not
> > prevent local reads for the consumer.
> > If the consumer has specified other partitions to read, which can be
> served
> > from local, it can continue to read those
> > partitions. To elaborate more, we make a check for quota exceeded if we
> > know a segment needs to be read from
> > remote. If the quota is exceeded, we simply skip the partition and move
> to
> > other segments in the fetch request.
> > This way consumers can continue to read the local data as long as they
> have
> > not exceeded the client-level quota.
> >
> > Also, when we choose the appropriate consumer-level quota, we would
> > typically look at what kind of local fetch
> > throughput is supported. If we were to reuse the same consumer quota, we
> > should also consider the throughput
> > the remote storage supports. The throughput supported by remote may be
> > less/more than the throughput supported
> > by local (when using a cloud provider, it depends on the plan opted by
> the
> > user). The consumer quota has to be carefully
> > set considering both local and remote throughput. Instead, if we have a
> > separate quota, it makes things much simpler
> > for the user, since they already know what throughput their remote
> storage
> > supports.
> >
> > (Also, thanks for pointing out. I will update the KIP based on the
> > discussion)
> >
> > Regards,
> > Abhijeet.
> >
> > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao 
> wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Sorry for the late reply. It seems that you haven't updated the KIP
> based
> > > on the discussion? One more comment.
> > >
> > > 11. Currently, we already have a quota system for both the producers
> and
> > > consumers. I can understand why we need an additional
> > > remote.log.manager.write.quota.default quota. For example, when tier
> > > storage is enabled for the first time, there could be a lot of segments
> > > that need to be written to the remote storage, even though there is no
> > > increase in the produced data. However, I am not sure about an
> > > additional remote.log.manager.read.quota.default. The KIP says that the
> > > reason is "This may happen when the majority of the consumers start
> > reading
> > > from the earliest offset of their respective Kafka topics.". However,
> > this
> > > can happen with or without tier storage and the current quota system
> for
> > > consumers is designed for solving this exact problem. Could you explain
> > the
> > > usage of this additional quota?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> > > abhijeet.cse@gmail.com>
> > > wrote:
> > >
> > > > Comments inline
> > > >
> > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao 
> > wrote:
> > > >
> > > > > Hi, Abhijeet,
> > > > >
> > > > > Thanks for the KIP. A few comments.
> > > > >
> > > > > 10. remote.log.manager.write.quota.default:

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-02-27 Thread Abhijeet Kumar
Hi Jun,

The existing quota system for consumers is designed to throttle the
consumer if it exceeds the allowed fetch rate.
The additional quota we want to add works on the broker level. If the
broker-level remote read quota is being
exceeded, we prevent additional reads from the remote storage but do not
prevent local reads for the consumer.
If the consumer has specified other partitions to read, which can be served
from local, it can continue to read those
partitions. To elaborate more, we make a check for quota exceeded if we
know a segment needs to be read from
remote. If the quota is exceeded, we simply skip the partition and move to
other segments in the fetch request.
This way consumers can continue to read the local data as long as they have
not exceeded the client-level quota.
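
A minimal sketch of the behaviour described above (not the actual broker
code; the partition names and the quota flag are made up): partitions that
would need a remote read are skipped once the broker-level remote read
quota is exhausted, while partitions served from local log segments are
still returned:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RemoteReadQuotaSketch {
    record PartitionData(String topicPartition, boolean requiresRemoteRead) {}

    static Map<String, String> handleFetch(List<PartitionData> partitions, boolean remoteReadQuotaExceeded) {
        Map<String, String> responses = new LinkedHashMap<>();
        for (PartitionData p : partitions) {
            if (p.requiresRemoteRead() && remoteReadQuotaExceeded) {
                // Skip only this partition; it can be served on a later fetch once the
                // observed remote read rate falls back under the broker-level quota.
                continue;
            }
            responses.put(p.topicPartition(), p.requiresRemoteRead() ? "read from remote" : "read from local");
        }
        return responses;
    }

    public static void main(String[] args) {
        List<PartitionData> request = List.of(
                new PartitionData("orders-0", false),  // data available locally
                new PartitionData("orders-1", true));  // only tiered data left at the fetch offset
        System.out.println(handleFetch(request, true)); // {orders-0=read from local}
    }
}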

Also, when we choose the appropriate consumer-level quota, we would
typically look at what kind of local fetch
throughput is supported. If we were to reuse the same consumer quota, we
should also consider the throughput
the remote storage supports. The throughput supported by remote may be
less/more than the throughput supported
by local (when using a cloud provider, it depends on the plan opted by the
user). The consumer quota has to be carefully
set considering both local and remote throughput. Instead, if we have a
separate quota, it makes things much simpler
for the user, since they already know what throughput their remote storage
supports.

(Also, thanks for pointing out. I will update the KIP based on the
discussion)

Regards,
Abhijeet.

On Tue, Feb 27, 2024 at 2:49 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Sorry for the late reply. It seems that you haven't updated the KIP based
> on the discussion? One more comment.
>
> 11. Currently, we already have a quota system for both the producers and
> consumers. I can understand why we need an additional
> remote.log.manager.write.quota.default quota. For example, when tier
> storage is enabled for the first time, there could be a lot of segments
> that need to be written to the remote storage, even though there is no
> increase in the produced data. However, I am not sure about an
> additional remote.log.manager.read.quota.default. The KIP says that the
> reason is "This may happen when the majority of the consumers start reading
> from the earliest offset of their respective Kafka topics.". However, this
> can happen with or without tier storage and the current quota system for
> consumers is designed for solving this exact problem. Could you explain the
> usage of this additional quota?
>
> Thanks,
>
> Jun
>
> On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> abhijeet.cse@gmail.com>
> wrote:
>
> > Comments inline
> >
> > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao  wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Thanks for the KIP. A few comments.
> > >
> > > 10. remote.log.manager.write.quota.default:
> > > 10.1 For other configs, we
> > > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent,
> > > perhaps this can be sth like
> > remote.log.manager.write.max.bytes.per.second.
> > >
> >
> > This makes sense, we can rename the following configs to be consistent.
> >
> > Remote.log.manager.write.quota.default ->
> > remote.log.manager.write.max.bytes.per.second
> >
> > Remote.log.manager.read.quota.default ->
> > remote.log.manager.read.max.bytes.per.second.
> >
> >
> >
> > > 10.2 Could we list the new metrics associated with the new quota.
> > >
> >
> > We will add the following metrics as mentioned in the other response.
> > *RemoteFetchThrottleTime* - The amount of time needed to bring the observed
> > remote fetch rate within the read quota
> > *RemoteCopyThrottleTime* - The amount of time needed to bring the observed
> > remote copy rate within the copy quota.
> >
> > > 10.3 Is this dynamically configurable? If so, could we document the impact
> > > to tools like kafka-configs.sh and AdminClient?
> > >
> >
> > Yes, the quotas are dynamically configurable. We will add them as Dynamic
> > Broker Configs. Users will be able to change
> > the following configs using either kafka-configs.sh or AdminClient by
> > specifying the config name and new value. For eg.
> >
> > Using kafka-configs.sh
> >
> > bin/kafka-configs.sh --bootstrap-server  --entity-type
> > brokers --entity-default --alter --add-config
> > remote.log.manager.write.max.bytes.per.second=52428800
> >
> > Using AdminClient
> >
> > ConfigEntry configEntry = new
> > ConfigEntry("remote.log.manager.write.max.bytes.per.second", "52428800");

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-02-12 Thread Abhijeet Kumar
Comments inline

On Wed, Dec 6, 2023 at 1:12 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the KIP. A few comments.
>
> 10. remote.log.manager.write.quota.default:
> 10.1 For other configs, we
> use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent,
> perhaps this can be sth like remote.log.manager.write.max.bytes.per.second.
>

This makes sense, we can rename the following configs to be consistent.

remote.log.manager.write.quota.default ->
remote.log.manager.write.max.bytes.per.second

remote.log.manager.read.quota.default ->
remote.log.manager.read.max.bytes.per.second



> 10.2 Could we list the new metrics associated with the new quota.
>

We will add the following metrics as mentioned in the other response.
*RemoteFetchThrottleTime* - The amount of time needed to bring the observed
remote fetch rate within the read quota
*RemoteCopyThrottleTime* - The amount of time needed to bring the observed
remote copy rate within the copy quota.

> 10.3 Is this dynamically configurable? If so, could we document the impact
> to tools like kafka-configs.sh and AdminClient?
>

Yes, the quotas are dynamically configurable. We will add them as Dynamic
Broker Configs. Users will be able to change
the following configs using either kafka-configs.sh or AdminClient by
specifying the config name and new value. For example:

Using kafka-configs.sh

bin/kafka-configs.sh --bootstrap-server <bootstrap-servers> --entity-type
brokers --entity-default --alter --add-config
remote.log.manager.write.max.bytes.per.second=52428800

Using AdminClient

ConfigEntry configEntry = new
ConfigEntry("remote.log.manager.write.max.bytes.per.second", "52428800");
AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry,
AlterConfigOp.OpType.SET);
Collection<AlterConfigOp> alterConfigOps =
Collections.singletonList(alterConfigOp);

ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER,
"");
Map<ConfigResource, Collection<AlterConfigOp>> updateConfig =
ImmutableMap.of(resource, alterConfigOps);
adminClient.incrementalAlterConfigs(updateConfig);


>
> Jun
>
> On Tue, Nov 28, 2023 at 2:19 AM Luke Chen  wrote:
>
> > Hi Abhijeet,
> >
> > Thanks for the KIP!
> > This is an important feature for tiered storage.
> >
> > Some comments:
> > 1. Will we introduce new metrics for this tiered storage quotas?
> > This is important because the admin can know the throttling status by
> > checking the metrics while the remote write/read are slow, like the rate
> of
> > uploading/reading byte rate, the throttled time for upload/read... etc.
> >
> > 2. Could you give some examples for the throttling algorithm in the KIP
> to
> > explain it? That will make it much clearer.
> >
> > 3. To solve this problem, we can break down the RLMTask into two smaller
> > tasks - one for segment upload and the other for handling expired
> segments.
> > How do we handle the situation when a segment is still waiting for
> > offloading while this segment is expired and eligible to be deleted?
> > Maybe it'll be easier to not block the RLMTask when quota exceeded, and
> > just check it each time the RLMTask runs?
> >
> > Thank you.
> > Luke
> >
> > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <
> abhijeet.cse@gmail.com
> > >
> > wrote:
> >
> > > Hi All,
> > >
> > > I have created KIP-956 for defining read and write quota for tiered
> > > storage.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > >
> > > Feedback and suggestions are welcome.
> > >
> > > Regards,
> > > Abhijeet.
> > >
> >
>


-- 
Abhijeet.


Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-02-12 Thread Abhijeet Kumar
when a segment is still waiting for
> offloading while this segment is expired and eligible to be deleted?
> Maybe it'll be easier to not block the RLMTask when quota exceeded, and
> just check it each time the RLMTask runs?
>

The concern here is that this may cause starvation of some topic
partitions. RLMTasks are added to the thread pool executor
to run with a specific schedule (every 30 secs). If we do not block the
RLMTask when the quota is exceeded and instead skip
the run, the RLMTask for certain topic partitions may never run, because every
time it is scheduled to run, the broker-level upload
quota at that instant may have already been exhausted.

Breaking down the RLMTask into smaller tasks is already being proposed in
KIP-950. The two tasks will need some coordination
at a topic-partition level to handle such cases as pointed out. For this
particular case, the upload task could skip uploading the
segment if it is already eligible for deletion.
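
A minimal sketch of the blocking behaviour argued for above, with
quotaExceeded and copySegment standing in for the real RLM quota check and
segment upload (both hypothetical here): the task waits for quota headroom
instead of skipping its scheduled run, so an upload is delayed rather than
starved:

import java.util.function.BooleanSupplier;

public class CopyTaskSketch {
    static void copyWithQuota(BooleanSupplier quotaExceeded, Runnable copySegment)
            throws InterruptedException {
        // Block until there is quota headroom, then upload; the scheduled run is never skipped.
        while (quotaExceeded.getAsBoolean()) {
            Thread.sleep(100);
        }
        copySegment.run();
    }

    public static void main(String[] args) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 300; // pretend the quota frees up after 300 ms
        copyWithQuota(() -> System.currentTimeMillis() < deadline,
                () -> System.out.println("segment copied"));
    }
}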


> Thank you.
> Luke
>
> On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar 
> wrote:
>
> > Hi All,
> >
> > I have created KIP-956 for defining read and write quota for tiered
> > storage.
> >
> >
> >
https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> >
> > Feedback and suggestions are welcome.
> >
> > Regards,
> > Abhijeet.
> >



-- 
Abhijeet.


Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-02-02 Thread Abhijeet Kumar
Yes, it is part of V1. I have added it now.

I will follow up on the KIP comments in the next couple of days and
try to close the review in the next few weeks.

Regards.

On Thu, Feb 1, 2024 at 7:40 PM Francois Visconte
 wrote:
>
> Hi,
>
> I see that the ticket has been left untouched for a while now.
> Should it be included in the tiered storage v1?
> We've observed that lacking a way to throttle uploads to tiered storage has
> a major impact on
> producers and consumers when tiered storage access recovers (starving disk
> IOps/throughput or CPU).
> For this reason, I think this is an important feature and possibly worth
> including in v1?
>
> Regards,
>
>
> On Tue, Dec 5, 2023 at 8:43 PM Jun Rao  wrote:
>
> > Hi, Abhijeet,
> >
> > Thanks for the KIP. A few comments.
> >
> > 10. remote.log.manager.write.quota.default:
> > 10.1 For other configs, we
> > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent,
> > perhaps this can be sth like remote.log.manager.write.max.bytes.per.second.
> > 10.2 Could we list the new metrics associated with the new quota.
> > 10.3 Is this dynamically configurable? If so, could we document the impact
> > to tools like kafka-configs.sh and AdminClient?
> >
> > Jun
> >
> > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen  wrote:
> >
> > > Hi Abhijeet,
> > >
> > > Thanks for the KIP!
> > > This is an important feature for tiered storage.
> > >
> > > Some comments:
> > > 1. Will we introduce new metrics for this tiered storage quotas?
> > > This is important because the admin can know the throttling status by
> > > checking the metrics while the remote write/read are slow, like the rate
> > of
> > > uploading/reading byte rate, the throttled time for upload/read... etc.
> > >
> > > 2. Could you give some examples for the throttling algorithm in the KIP
> > to
> > > explain it? That will make it much clearer.
> > >
> > > 3. To solve this problem, we can break down the RLMTask into two smaller
> > > tasks - one for segment upload and the other for handling expired
> > segments.
> > > How do we handle the situation when a segment is still waiting for
> > > offloading while this segment is expired and eligible to be deleted?
> > > Maybe it'll be easier to not block the RLMTask when quota exceeded, and
> > > just check it each time the RLMTask runs?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <
> > abhijeet.cse@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I have created KIP-956 for defining read and write quota for tiered
> > > > storage.
> > > >
> > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > > >
> > > > Feedback and suggestions are welcome.
> > > >
> > > > Regards,
> > > > Abhijeet.
> > > >
> > >
> >



-- 
Abhijeet.


[DISCUSS] KIP-956: Tiered Storage Quotas

2023-11-22 Thread Abhijeet Kumar
Hi All,

I have created KIP-956 for defining read and write quota for tiered storage.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

Feedback and suggestions are welcome.

Regards,
Abhijeet.


[jira] [Created] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-10-24 Thread Pritam Kumar (Jira)
Pritam Kumar created KAFKA-15680:


 Summary: Partition-Count is not getting updated Correctly in the 
Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing
 Key: KAFKA-15680
 URL: https://issues.apache.org/jira/browse/KAFKA-15680
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.0.1
Reporter: Pritam Kumar
Assignee: Pritam Kumar


* In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker, say 
Worker3, joins, a new global assignment is computed by the leader, say Worker1, 
that results in the revocation of some tasks from each existing worker, i.e. 
Worker1 and Worker2.
 * Once the new member join is completed, the 
*ConsumerCoordinator.onJoinComplete()* method is called, which primarily 
computes all the newly assigned partitions and the revoked partitions 
and updates the subscription object.
 * If it was the case of revocation, which we check via the 
"partitionsRevoked" list, we call the method *invokePartitionsRevoked()*, 
which internally calls *updatePartitionCount()*, which fetches partitions from 
the *assignment* object, which is not yet updated with the new assignment.
 * It is only just before calling the *invokePartitionsAssigned()* method 
that we update the *assignment* by invoking 
*subscriptions.assignFromSubscribed(assignedPartitions);*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15181) Race condition on partition assigned to TopicBasedRemoteLogMetadataManager

2023-09-07 Thread Abhijeet Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijeet Kumar resolved KAFKA-15181.

Resolution: Fixed

> Race condition on partition assigned to TopicBasedRemoteLogMetadataManager 
> ---
>
> Key: KAFKA-15181
> URL: https://issues.apache.org/jira/browse/KAFKA-15181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>    Assignee: Abhijeet Kumar
>Priority: Major
>  Labels: tiered-storage
>
> TopicBasedRemoteLogMetadataManager (TBRLMM) uses a cache to be prepared 
> whenever partitions are assigned.
> When partitions are assigned to the TBRLMM instance, a consumer is started to 
> keep the cache up to date.
> If the cache hasn't finished building, TBRLMM fails to return remote 
> metadata about partitions that are stored on the backing topic. TBRLMM may not 
> recover from this failing state.
> A proposal to fix this issue would be to wait, after a partition is assigned, for 
> the consumer to catch up. A similar logic is used at the moment when TBRLMM 
> writes to the topic, and uses the send callback to wait for the consumer to catch up. 
> This logic can be reused whenever a partition is assigned, so that when TBRLMM is 
> marked as initialized, the cache is ready to serve requests.
> Reference: https://github.com/aiven/kafka/issues/33
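
A minimal sketch of the wait-for-catch-up proposal quoted above, using assumed names 
and a simple polling loop; this is not the actual TopicBasedRemoteLogMetadataManager 
code.

{code:java}
import java.util.concurrent.TimeoutException;

public class CatchUpWait {
    private volatile long lastConsumedOffset = -1L;

    // Called by the internal consumer thread as it applies metadata records to the cache.
    void onRecordConsumed(long offset) {
        lastConsumedOffset = offset;
    }

    // Proposed behaviour: block the partition-assignment path until the cache has
    // replayed the metadata topic up to 'targetEndOffset', or give up after 'timeoutMs'.
    void awaitCatchUp(long targetEndOffset, long timeoutMs) throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (lastConsumedOffset < targetEndOffset) {
            if (System.currentTimeMillis() > deadline) {
                throw new TimeoutException("Cache did not catch up to offset " + targetEndOffset);
            }
            Thread.sleep(100); // simple poll; a real implementation could use wait/notify instead
        }
    }
}
{code}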



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15261) ReplicaFetcher thread should not block if RLMM is not initialized

2023-09-05 Thread Abhijeet Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijeet Kumar resolved KAFKA-15261.

Resolution: Fixed

> ReplicaFetcher thread should not block if RLMM is not initialized
> -
>
> Key: KAFKA-15261
> URL: https://issues.apache.org/jira/browse/KAFKA-15261
> Project: Kafka
>  Issue Type: Sub-task
>    Reporter: Abhijeet Kumar
>    Assignee: Abhijeet Kumar
>Priority: Blocker
> Fix For: 3.6.0
>
>
> While building remote log aux state, the replica fetcher fetches the remote 
> log segment metadata. If the TBRLMM is not initialized yet, the call blocks. 
> Since replica fetchers share a common lock, it prevents other replica 
> fetchers from running as well. Also the same lock is shared in the handle 
> LeaderAndISR request path, hence those calls get blocked as well.
> Instead, replica fetcher should check if RLMM is initialized before 
> attempting to fetch the remote log segment metadata. If RLMM is not 
> initialized, it should throw a retryable error so that it can be retried 
> later, and also does not block other operations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15405) Create a new error code to indicate a resource is not ready yet

2023-08-25 Thread Abhijeet Kumar (Jira)
Abhijeet Kumar created KAFKA-15405:
--

 Summary: Create a new error code to indicate a resource is not 
ready yet
 Key: KAFKA-15405
 URL: https://issues.apache.org/jira/browse/KAFKA-15405
 Project: Kafka
  Issue Type: Task
Reporter: Abhijeet Kumar


We need a new error code to indicate to the client that the resource is not 
ready on the server yet and is still initializing. When the client receives this 
error, it should retry later.
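
A minimal sketch of how a client would treat such an error, backing off and retrying 
instead of failing the request. The exception type here is hypothetical, since the 
actual error code is still to be defined by this ticket.

{code:java}
public class RetryOnNotReady {

    // Hypothetical stand-in for the yet-to-be-defined "resource not ready" error.
    static class ResourceNotReadyException extends RuntimeException { }

    interface Call<T> { T execute() throws ResourceNotReadyException; }

    // Retry the call with a fixed backoff while the server reports the resource
    // as still initializing; give up after maxAttempts.
    static <T> T callWithRetry(Call<T> call, int maxAttempts, long backoffMs) throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.execute();
            } catch (ResourceNotReadyException e) {
                if (attempt >= maxAttempts) throw e;
                Thread.sleep(backoffMs); // resource is still initializing on the server; retry later
            }
        }
    }
}
{code}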



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Requesting permissions to contribute to Apache Kafka

2023-08-22 Thread Animesh Kumar
Hi Team,
Please provide access to contribute to Apache Kafka
JIRA id -- akanimesh7
Wiki Id -- akanimesh7
-- 
Animesh Kumar
8120004556


[jira] [Created] (KAFKA-15343) Fix MirrorConnectIntegrationTests causing ci build failures.

2023-08-14 Thread Prasanth Kumar (Jira)
Prasanth Kumar created KAFKA-15343:
--

 Summary: Fix MirrorConnectIntegrationTests causing ci build 
failures.
 Key: KAFKA-15343
 URL: https://issues.apache.org/jira/browse/KAFKA-15343
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 3.6.0
Reporter: Prasanth Kumar


There are several instances of tests interacting badly with Gradle daemon(s) 
running on ports that the Kafka broker previously used. After going through the 
debug logs, we observed a few retrying Kafka clients trying to connect to a broker 
which had been shut down, while a Gradle worker chose the same port on which the 
broker had been running. Later in the build, the Gradle daemon attempted to connect 
to the worker and could not, triggering a failure.

Ideally Gradle would not exit when connected to by an invalid client; in testing 
with netcat, it would often handle these without dying. However, there appear to be 
some cases where the daemon dies completely. Both the broker code and the Gradle 
workers bind to port 0, resulting in the OS assigning an unused port. This does 
avoid conflicts, but it does not ensure that long-lived clients avoid connecting to 
these ports afterwards. It's possible that closing the client in between may be 
enough to work around this issue. Until then, we will disable the test so that this 
CI blocker does not get in the way of testing code changes.



*MirrorConnectorsIntegrationBaseTest and extending Tests*


{code:java}
[2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
[TestEventLogger] 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest > 
testReplicateSourceDefault() STANDARD_OUT
[2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
[TestEventLogger] [2023-07-04 11:47:46,799]
 INFO primary REST service: http://localhost:43809/connectors 
(org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:224)
[2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
[TestEventLogger] [2023-07-04 11:47:46,799] 
INFO backup REST service: http://localhost:43323/connectors 
(org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:225)
[2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
[TestEventLogger] [2023-07-04 11:47:46,799] 
INFO primary brokers: localhost:37557 
(org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:226)
[2023-07-04T11:59:12.968Z] 2023-07-04T11:59:12.900+ [DEBUG] 
[org.gradle.internal.remote.internal.inet.TcpIncomingConnector] 
Accepted connection from /127.0.0.1:47660 to /127.0.0.1:37557.
[2023-07-04T11:59:13.233Z] 
org.gradle.internal.remote.internal.MessageIOException: Could not read message 
from '/127.0.0.1:47660'.
[2023-07-04T11:59:12.970Z] 2023-07-04T11:59:12.579+ [DEBUG] 
[org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Listening on 
[d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
addresses:[localhost/127.0.0.1]].
[2023-07-04T11:59:46.519Z] 2023-07-04T11:59:13.014+ [ERROR] 
[system.err] org.gradle.internal.remote.internal.ConnectException: Could not 
connect to server [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15293) Update metrics doc to add tiered storage metrics

2023-08-02 Thread Abhijeet Kumar (Jira)
Abhijeet Kumar created KAFKA-15293:
--

 Summary: Update metrics doc to add tiered storage metrics
 Key: KAFKA-15293
 URL: https://issues.apache.org/jira/browse/KAFKA-15293
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhijeet Kumar






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-930: Tiered Storage Metrics

2023-07-31 Thread Abhijeet Kumar
Hi All,

This KIP is accepted with 3 +1 binding votes(Luke, Satish, Divij) and
2 +1 non-binding votes(Jorge, Kamal) .

Thank you all for voting.


On Mon, Jul 31, 2023 at 10:53 AM Satish Duggana 
wrote:

> Thanks for voting on the KIP.
>
> I agree we should raise the KIPs early so that we can have a longer
> duration for other people to take a look.
>
> It seems whoever was involved in the earlier discussions for this
> change commented and voted. It is a minor KIP to rename existing
> metrics, and it has been there for  ~ a week. We can close this with
> vote results to be included in 3.6.0.
>
> ~Satish.
>
> On Tue, 25 Jul 2023 at 23:17, Divij Vaidya 
> wrote:
> >
> > Thank you for the KIP Abhinav. Although we should avoid changing
> > customer-facing interfaces (such as metrics) after a KIP is accepted,
> > in this case, I think that the divergence is minimal and the right
> > thing to do in the longer run. Hence, I would consider this change as
> > a one-off exception and not a precedent for the future changes.
> >
> > +1 (binding) from me.
> >
> > Also, I think we should leave the vote open longer for some duration
> > (at least 2 weeks) to give an opportunity for folks in the community
> > to add any thoughts that they might have. The KIP has been published
> > for only 1 day so far and interested folks may not have had a chance
> > to look into it yet.
> >
> > --
> > Divij Vaidya
> >
> >
> >
> > On Tue, Jul 25, 2023 at 6:45 PM Satish Duggana 
> wrote:
> > >
> > > +1 for the KIP.
> > >
> > > Thanks,
> > > Satish.
> > >
> > > On Tue, 25 Jul 2023 at 18:31, Kamal Chandraprakash
> > >  wrote:
> > > >
> > > > +1 (non-binding)
> > > >
> > > > --
> > > > Kamal
> > > >
> > > > On Tue, Jul 25, 2023 at 11:30 AM Abhijeet Kumar <
> abhijeet.cse@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I would like to start the vote for KIP-930 Tiered Storage Metrics.
> > > > >
> > > > > The KIP is here:
> > > > >
> > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Tiered+Storage+Metrics
> > > > >
> > > > > Regards
> > > > > Abhijeet.
> > > > >
>


-- 
Abhijeet.


[jira] [Created] (KAFKA-15261) ReplicaFetcher thread should not block if RLMM is not initialized

2023-07-27 Thread Abhijeet Kumar (Jira)
Abhijeet Kumar created KAFKA-15261:
--

 Summary: ReplicaFetcher thread should not block if RLMM is not 
initialized
 Key: KAFKA-15261
 URL: https://issues.apache.org/jira/browse/KAFKA-15261
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhijeet Kumar
Assignee: Abhijeet Kumar


While building remote log aux state, the replica fetcher fetches the remote log 
segment metadata. If the TBRLMM is not initialized yet, the call blocks. Since 
replica fetchers share a common lock, it prevents other replica fetchers from 
running as well. Also, the same lock is shared in the LeaderAndISR request 
handling path, hence those calls get blocked as well.

Instead, replica fetcher should check if RLMM is initialized before attempting 
to fetch the remote log segment metadata. If RLMM is not initialized, it should 
throw a retryable error so that it can be retried later, and also does not 
block other operations.
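
A minimal sketch (assumed names; not the actual ReplicaFetcher/RLMM code) of the 
non-blocking pattern described above: check initialization first and surface a 
retryable error instead of blocking while holding the shared lock.

{code:java}
public class RemoteAuxStateBuilder {

    interface RemoteLogMetadataManager {
        boolean isInitialized();
        Object fetchSegmentMetadata(String topicPartition, long offset);
    }

    static class RetriableRemoteStateException extends RuntimeException {
        RetriableRemoteStateException(String msg) { super(msg); }
    }

    Object buildAuxState(RemoteLogMetadataManager rlmm, String tp, long offset) {
        if (!rlmm.isInitialized()) {
            // Do not block while holding the shared fetcher lock; surface a retryable
            // error so the fetcher can try again on a later iteration.
            throw new RetriableRemoteStateException("RLMM not initialized yet for " + tp);
        }
        return rlmm.fetchSegmentMetadata(tp, offset);
    }
}
{code}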



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15260) RLM Task should wait until RLMM is initialized before copying segments to remote

2023-07-27 Thread Abhijeet Kumar (Jira)
Abhijeet Kumar created KAFKA-15260:
--

 Summary: RLM Task should wait until RLMM is initialized before 
copying segments to remote
 Key: KAFKA-15260
 URL: https://issues.apache.org/jira/browse/KAFKA-15260
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhijeet Kumar


The RLM task uploads segments to the remote storage for its leader partitions 
and, after each upload, it sends a 'COPY_SEGMENT_STARTED' message to the topic-based 
RLMM topic and then waits for the TBRLMM to consume the message before 
continuing.

If the RLMM is not initialized, the TBRLMM may not be able to consume the message 
within the stipulated time; the call will time out and the RLM task will retry 
later. It may take a few minutes for the TBRLMM to initialize, during which the 
RLM task will keep timing out.

Instead, the RLM task should wait until the RLMM is initialized before attempting to 
copy segments to remote storage.
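
A minimal sketch (assumed names; not the actual RemoteLogManager code) of the 
proposed behaviour, where the periodic copy task simply skips its run until the 
RLMM reports itself initialized, instead of attempting the upload and timing out 
on the metadata write.

{code:java}
public class RlmCopyTask implements Runnable {

    interface RemoteLogMetadataManager { boolean isInitialized(); }

    private final RemoteLogMetadataManager rlmm;

    RlmCopyTask(RemoteLogMetadataManager rlmm) { this.rlmm = rlmm; }

    @Override
    public void run() {
        if (!rlmm.isInitialized()) {
            // RLMM is still catching up on its metadata topic; try again on the next scheduled run.
            return;
        }
        copyEligibleSegmentsToRemote();
    }

    private void copyEligibleSegmentsToRemote() {
        // upload eligible segments and publish the corresponding metadata events here
    }
}
{code}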

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-930: Tiered Storage Metrics

2023-07-25 Thread Abhijeet Kumar
Please find the updated link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Rename+ambiguous+Tiered+Storage+Metrics

Updated the KIP as per our conversation on the discussion thread.

On Tue, Jul 25, 2023 at 11:29 AM Abhijeet Kumar 
wrote:

> Hi All,
>
> I would like to start the vote for KIP-930 Tiered Storage Metrics.
>
> The KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Tiered+Storage+Metrics
>
> Regards
> Abhijeet.
>
>

-- 
Abhijeet.


Re: [DISCUSS] KIP-930: Tiered Storage Metrics

2023-07-25 Thread Abhijeet Kumar
Hi Kamal,

As we discussed offline, I will rename this KIP so that it only captures
the aspect of renaming the previously added metrics to remove ambiguity.
I will create another KIP for RemoteIndexCache metrics and other relevant
tiered storage metrics.

On Tue, Jul 25, 2023 at 12:03 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi Abhijeet,
>
> Thanks for the KIP!
>
> We are changing the metric names from what was proposed in the KIP-405 and
> adding new metrics for RemoteIndexCache.
> In the KIP, it's not clear whether we are renaming the aggregate broker
> level metrics for remote copy/fetch/failed-copy/failed-fetch.
>
> Are these metrics enough to monitor all the aspects of tiered storage?
>
> (eg)
> 1. Metrics to see the Tier Lag Status by number of pending
> segments/records.
> 2. Similar to log-start-offset and log-end-offset metrics.  Should we
> expose local-log-start-offset and highest-offset-uploaded-to-remote-storage
> as metric?
>
> Thanks,
> Kamal
>
> On Mon, Jul 24, 2023 at 2:08 PM Abhijeet Kumar  >
> wrote:
>
> > Hi All,
> >
> > I created KIP-930 for adding RemoteIndexCache stats and also to rename
> some
> > tiered storage metrics added as part of KIP-405
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-NewMetrics
> > >
> > to remove ambiguity.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Tiered+Storage+Metrics
> >
> > Feedback and suggestions are welcome.
> >
> > Regards,
> > Abhijeet.
> >
>


-- 
Abhijeet.


[VOTE] KIP-930: Tiered Storage Metrics

2023-07-25 Thread Abhijeet Kumar
Hi All,

I would like to start the vote for KIP-930 Tiered Storage Metrics.

The KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Tiered+Storage+Metrics

Regards
Abhijeet.


[jira] [Created] (KAFKA-15245) Improve Tiered Storage Metrics

2023-07-24 Thread Abhijeet Kumar (Jira)
Abhijeet Kumar created KAFKA-15245:
--

 Summary: Improve Tiered Storage Metrics
 Key: KAFKA-15245
 URL: https://issues.apache.org/jira/browse/KAFKA-15245
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhijeet Kumar
Assignee: Abhijeet Kumar


Rename existing tiered storage metrics to remove ambiguity and add metrics for 
the RemoteIndexCache.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-930: Tiered Storage Metrics

2023-07-24 Thread Abhijeet Kumar
Hi All,

I created KIP-930 for adding RemoteIndexCache stats and also to rename some
tiered storage metrics added as part of KIP-405

to remove ambiguity.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Tiered+Storage+Metrics

Feedback and suggestions are welcome.

Regards,
Abhijeet.


[DISCUSS] KIP-930: Tiered Storage Metrics

2023-07-23 Thread Abhijeet Kumar
Hi All,

I created KIP-930 for adding RemoteIndexCache stats and also to rename some
tiered storage metrics added as part of KIP-405

to
remove ambiguity.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-930%3A+Tiered+Storage+Metrics

Feedback and suggestions are welcome.

Regards,
Abhijeet.


[jira] [Created] (KAFKA-15236) Rename Remote Storage metrics to remove ambiguity

2023-07-22 Thread Abhijeet Kumar (Jira)
Abhijeet Kumar created KAFKA-15236:
--

 Summary: Rename Remote Storage metrics to remove ambiguity
 Key: KAFKA-15236
 URL: https://issues.apache.org/jira/browse/KAFKA-15236
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhijeet Kumar
Assignee: Abhijeet Kumar


As per the Tiered Storage feature introduced in 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage],
 we added several metrics related to reads(from) and writes(to) for remote 
storage. The naming convention that was followed is confusing to the users.

For eg. in regular Kafka, BytesIn means bytes *_written_* to the log, and 
BytesOut means bytes *_read_* from the log. But with tiered storage, the 
concepts are reversed.
 * RemoteBytesIn means "Number of bytes *_read_* from remote storage per second"
 * RemoteBytesOut means "Number of bytes _*written*_ to remote storage per 
second"

We should rename the tiered storage related metrics to remove any ambiguity.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14946) Kafka Node Shutting Down Automatically

2023-04-27 Thread Akshay Kumar (Jira)
Akshay Kumar created KAFKA-14946:


 Summary: Kafka Node Shutting Down Automatically
 Key: KAFKA-14946
 URL: https://issues.apache.org/jira/browse/KAFKA-14946
 Project: Kafka
  Issue Type: Bug
  Components: controller, kraft
Affects Versions: 3.3.1
Reporter: Akshay Kumar


* We are using the zookeeper less Kafka (kafka Kraft).
 * The cluster is having 3 nodes.
 * One of the nodes gets automatically shut down randomly.
 * Checked the logs but didn't get the exact reason.
 * Kafka version - 3.3.1
 * Attaching the log files. 
 * Time - 2023-04-21 16:28:23

*state-change.log -*
[https://drive.google.com/file/d/1eS-ShKlhGPsIJoybHndlhahJnucU8RWA/view?usp=share_link]
 
*server.log -*
[https://drive.google.com/file/d/1Ov5wrQIqx2AS4J7ppFeHJaDySsfsK588/view?usp=share_link]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


RE: Supported Kafka/Zookeeper Version with ELK 8.4.3

2022-10-28 Thread Kumar, Sudip
Hi Team,

We are still waiting for a reply. Please update us; we must know which version of 
Kafka is compatible with ELK version 8.4.

Still, I can see no one replied on user and Dev community portal



Thanks
Sudip


From: Kumar, Sudip
Sent: Monday, October 17, 2022 5:23 PM
To: us...@kafka.apache.org; dev@kafka.apache.org
Cc: Rajendra Bangal, Nikhil ; Verma, 
Harshit ; Verma, Deepak Kumar 
; Arkal, Dinesh Balaji 
; Saurabh, Shobhit 

Subject: Supported Kafka/Zookeeper Version with ELK 8.4.3
Importance: High

Hi Kafka Team,

Currently we are planning to upgrade ELK 7.16 to 8.4.3 version. In our 
ecosystem we are using Kafka as middleware which is ingesting data coming from 
different sources where publisher (Logstash shipper) publishing data in 
different Kafka Topics and subscriber (Logstash indexer) consuming the data.

We have an integration of ELK 7.16 with Kafka V2.5.1 and zookeeper 3.5.8. 
Please suggest, if we upgrade to ELK version 8.4.3, which Kafka and Zookeeper 
versions will be supported? Please also provide us with any relevant documents.

Let me know if you have any further questions.

Thanks
Sudip Kumar
Capgemini-India


This message contains information that may be privileged or confidential and is 
the property of the Capgemini Group. It is intended only for the person to whom 
it is addressed. If you are not the intended recipient, you are not authorized 
to read, print, retain, copy, disseminate, distribute, or use this message or 
any part thereof. If you receive this message in error, please notify the 
sender immediately and delete all copies of this message.


Supported Kafka/Zookeeper Version with ELK 8.4.3

2022-10-17 Thread Kumar, Sudip
Hi Kafka Team,

Currently we are planning to upgrade ELK 7.16 to 8.4.3 version. In our 
ecosystem we are using Kafka as middleware which is ingesting data coming from 
different sources where publisher (Logstash shipper) publishing data in 
different Kafka Topics and subscriber (Logstash indexer) consuming the data.

We have an integration of ELK 7.16 with Kafka V2.5.1 and zookeeper 3.5.8. 
Please suggest, if we upgrade to ELK version 8.4.3, which Kafka and Zookeeper 
versions will be supported? Please also provide us with any relevant documents.

Let me know if you have any further questions.

Thanks
Sudip Kumar
Capgemini-India


This message contains information that may be privileged or confidential and is 
the property of the Capgemini Group. It is intended only for the person to whom 
it is addressed. If you are not the intended recipient, you are not authorized 
to read, print, retain, copy, disseminate, distribute, or use this message or 
any part thereof. If you receive this message in error, please notify the 
sender immediately and delete all copies of this message.


[jira] [Resolved] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar resolved KAFKA-14220.
--
  Reviewer:   (was: Chris Egerton)
Resolution: Abandoned

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>
> Issue:
> In case of the revocation of partitions, the "partition count" metric is
> updated before the new set of assignments is applied. The
> "invokePartitionsRevoked" method of the "onJoinComplete" function of the
> "ConsumerCoordinator" class is called before
> "subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As
> a result, the old assigned partition count keeps being reported again and
> again, even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> subscriptions.assignFromSubscribed(assignedPartitions);
>  
> But before this, only "updatePartitionCount()" is getting called via 
> "invokePartitionsRevoked":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));
>  
> Due to this, when it is going to call for the "assignedPartitions" of the 
> "consumer" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> public Set<TopicPartition> assignment() {
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
> As part of the bug fix to KAFKA-12622, introduce code changes to 
> update the partition count metric after the newly assigned partitions are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-12 Thread Pritam Kumar (Jira)
Pritam Kumar created KAFKA-14220:


 Summary: "partition-count" not getting updated after revocation of 
partitions in case of Incremental Co-operative rebalancing.
 Key: KAFKA-14220
 URL: https://issues.apache.org/jira/browse/KAFKA-14220
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.0.1
Reporter: Pritam Kumar


In case of the revocation of partitions, the "partition count" metric is updated 
before the new set of assignments is applied. The *invokePartitionsRevoked* method 
of the *onJoinComplete* function of the *ConsumerCoordinator* class is called before 
*subscriptions.assignFromSubscribed(assignedPartitions)* of the same class. As a 
result, the *old assigned partition count* keeps being reported again and again, 
even after future rebalances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Committer: Chris Egerton

2022-07-25 Thread Kumud Kumar Srivatsava Tirupati
Congratulations Chris!

On Tue, 26 Jul, 2022, 7:11 AM deng ziming,  wrote:

> Congratulations Chris !
>
> --
> Ziming
>
> > On Jul 26, 2022, at 5:01 AM, Matthias J. Sax  wrote:
> >
> > Congrats! Well deserved!
> >
> > -Matthias
> >
> > On 7/25/22 1:08 PM, Bill Bejeck wrote:
> >> Congrats Chris!
> >> -Bill
> >> On Mon, Jul 25, 2022 at 3:58 PM Jorge Esteban Quilcate Otoya <
> >> quilcate.jo...@gmail.com> wrote:
> >>> Congratulations Chris!
> >>>
> >>> On Mon, 25 Jul 2022 at 20:27, Robin Moffatt  >
> >>> wrote:
> >>>
>  Congrats Chris!
> 
> 
>  --
> 
>  Robin Moffatt | Principal Developer Advocate | ro...@confluent.io |
> >>> @rmoff
> 
> 
>  On Mon, 25 Jul 2022 at 17:26, Mickael Maison 
> >>> wrote:
> 
> > Hi all,
> >
> > The PMC for Apache Kafka has invited Chris Egerton as a committer,
> and
> > we are excited to announce that he accepted!
> >
> > Chris has been contributing to Kafka since 2017. He has made over 80
> > commits mostly around Kafka Connect. His most notable contributions
> > include KIP-507: Securing Internal Connect REST Endpoints and
> KIP-618:
> > Exactly-Once Support for Source Connectors.
> >
> > He has been an active participant in discussions and reviews on the
> > mailing lists and on Github.
> >
> > Thanks for all of your contributions Chris. Congratulations!
> >
> > -- Mickael, on behalf of the Apache Kafka PMC
> >
> 
> >>>
>
>


[jira] [Resolved] (KAFKA-13926) Proposal to have "HasField" predicate for kafka connect

2022-06-03 Thread Kumud Kumar Srivatsava Tirupati (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kumud Kumar Srivatsava Tirupati resolved KAFKA-13926.
-
Resolution: Won't Fix

Dropping in favor of improving the existing SMTs as per the discussion.

https://lists.apache.org/thread/odbj7793plyz7xxyy6d71c3xn7zng49f

> Proposal to have "HasField" predicate for kafka connect
> ---
>
> Key: KAFKA-13926
> URL: https://issues.apache.org/jira/browse/KAFKA-13926
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Kumud Kumar Srivatsava Tirupati
>Assignee: Kumud Kumar Srivatsava Tirupati
>Priority: Major
>
> Hello,
> Today's connect predicates enables checks on the record metadata. However, 
> this can be limiting considering {*}many inbuilt and custom transformations 
> that we (community) use are more key/value centric{*}.
> Some use-cases this can solve:
>  * Data type conversions of certain pre-identified fields for records coming 
> across datasets only if those fields exist. [Ex: TimestampConverter can be 
> run only if the specified date field exists irrespective of the record 
> metadata]
>  * Skip running certain transform if a given field does/does not exist. A lot 
> of inbuilt transforms raise exceptions (Ex: InsertField transform if the 
> field already exists) thereby breaking the task. Giving this control enable 
> users to consciously configure for such cases.
>  * Even though some inbuilt transforms explicitly handle these cases, it 
> would still be an unnecessary pass-through loop.
>  * Considering each connector usually deals with multiple datasets (Even 100s 
> for a database CDC connector), metadata-centric predicate checking will be 
> somewhat limiting when we talk about such pre-identified custom metadata 
> fields in the records.
> I know some of these cases can be handled within the transforms itself but 
> that defeats the purpose of having predicates.
> We have built this predicate for us and it is found to be extremely helpful. 
> Please let me know your thoughts on the same so that I can raise a PR.
>  
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-845%3A+%27HasField%27+predicate+for+kafka+connect



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] KIP-845: HasField predicate for kafka connect

2022-06-03 Thread Kumud Kumar Srivatsava Tirupati
Hi Chris,
Thanks for your comment. I might have misunderstood the filter SMT. Makes
sense. Dropping this KIP for now. I will look at improving the
existing SMTs separately.

*---*
*Thanks and Regards,*
*Kumud Kumar Srivatsava Tirupati*


On Sat, 4 Jun 2022 at 03:57, Chris Egerton  wrote:

> Hi Kumud,
>
> I believe using the Filter SMT with a predicate would also cause the
> record to be dropped from the pipeline. AFAIK there is no way to skip the
> entire transformation chain besides applying a predicate to every SMT in
> the chain. If there's a reasonable use case for wanting to do that based on
> the presence of a field in a record that wouldn't be addressed by updating
> the SMTs that come with Connect to be friendlier when the field they're
> acting on does/does not exist, then we definitely could continue to pursue
> the HasField predicate.
>
> Cheers,
>
> Chris
>
> On Fri, Jun 3, 2022 at 10:52 AM Kumud Kumar Srivatsava Tirupati <
> kumudkumartirup...@gmail.com> wrote:
>
>> Hi Chris,
>> Thanks for the explanation. This clears my thoughts.
>> I can now agree that the concerns are totally different for SMTs and
>> predicates.
>> I also do agree with you that this might encourage SMTs to be poorly
>> designed.
>>
>> Do you see this worth considering just for the filter use case?
>> ['errors.tolerance' would make it drop from the pipeline instead of
>> skipping from the transformation chain]. Maybe by further limiting its
>> functionality to not support nested fields. I can now discard the other
>> use-cases I mentioned in the KIP in favor of "Improving the corresponding
>> SMTs".
>>
>> *---*
>> *Thanks and Regards,*
>> *Kumud Kumar Srivatsava Tirupati*
>> *Ph : +91-8686073938*
>>
>> On Fri, 3 Jun 2022 at 18:04, Chris Egerton 
>> wrote:
>>
>>> Hi Kumud,
>>>
>>> Responses inline.
>>>
>>> > But, I still believe this logic of predicate checking shouldn't be
>>> made a
>>> part of each individual SMT. After all, that is what the predicates are
>>> for
>>> right?
>>>
>>> I don't quite agree. I think the benefit of predicates is that they can
>>> allow you to selectively apply a transformation based on conditions that
>>> don't directly relate to the behavior of the transformation. For example,
>>> "mask field 'ID' if the 'SSN' header is present". The use cases identified
>>> in the KIP don't quite align with this; instead, they're more about working
>>> around undesirable behavior in transformations that makes them difficult to
>>> use in some pipelines. If an SMT is difficult to use, it's better to just
>>> improve the SMT directly instead of adding a layer of indirection on top of
>>> it.
>>>
>>> > `HasField` predicate is something very similar but is more powerful
>>> because
>>> it allows users to skip transformations at SMT level or drop them from
>>> the
>>> transformation chain altogether using the current `
>>> org.apache.kafka.connect.transforms.Filter`. So, the use cases are not
>>> just
>>> limited to skipping some SMTs.
>>>
>>> This use case should be included in the KIP if it's a significant
>>> motivating factor for adding this predicate. But it's also possible to drop
>>> entire records from a pipeline today by setting the 'errors.tolerance'
>>> property to 'all' and allowing all records that cause exceptions to be
>>> thrown in the transformation chain to be skipped/written to a DLQ/logged.
>>>
>>> Ultimately, I'm worried that we're encouraging bad habits with SMT
>>> developers by establishing a precedent where core logic that most users
>>> would expect to be present is left out in favor of a predicate, which makes
>>> things harder to configure and can confound new users by adding a layer of
>>> complexity in the form of another pluggable interface.
>>>
>>> Cheers,
>>>
>>> Chris
>>>
>>> On Thu, Jun 2, 2022 at 4:24 AM Kumud Kumar Srivatsava Tirupati <
>>> kumudkumartirup...@gmail.com> wrote:
>>>
>>>> Hi Chris,
>>>> Thanks for your comments.
>>>>
>>>> I am totally aligned with your comment on nested field names which
>>>> include
>>>> dots. I will incorporate the same based on how the KIP-821 discussion
>>>> goes
>>>> (maybe this parser could be a utility that can be reused in other areas
>>>> as
>>>

Re: [DISCUSS] KIP-845: HasField predicate for kafka connect

2022-06-03 Thread Kumud Kumar Srivatsava Tirupati
Hi Chris,
Thanks for the explanation. This clears my thoughts.
I can now agree that the concerns are totally different for SMTs and
predicates.
I also do agree with you that this might encourage SMTs to be poorly
designed.

Do you see this worth considering just for the filter use case?
['errors.tolerance' would make it drop from the pipeline instead of
skipping from the transformation chain]. Maybe by further limiting its
functionality to not support nested fields. I can now discard the other
use-cases I mentioned in the KIP in favor of "Improving the corresponding
SMTs".

*---*
*Thanks and Regards,*
*Kumud Kumar Srivatsava Tirupati*
*Ph : +91-8686073938*

On Fri, 3 Jun 2022 at 18:04, Chris Egerton  wrote:

> Hi Kumud,
>
> Responses inline.
>
> > But, I still believe this logic of predicate checking shouldn't be made a
> part of each individual SMT. After all, that is what the predicates are for
> right?
>
> I don't quite agree. I think the benefit of predicates is that they can
> allow you to selectively apply a transformation based on conditions that
> don't directly relate to the behavior of the transformation. For example,
> "mask field 'ID' if the 'SSN' header is present". The use cases identified
> in the KIP don't quite align with this; instead, they're more about working
> around undesirable behavior in transformations that makes them difficult to
> use in some pipelines. If an SMT is difficult to use, it's better to just
> improve the SMT directly instead of adding a layer of indirection on top of
> it.
>
> > `HasField` predicate is something very similar but is more powerful
> because
> it allows users to skip transformations at SMT level or drop them from the
> transformation chain altogether using the current `
> org.apache.kafka.connect.transforms.Filter`. So, the use cases are not just
> limited to skipping some SMTs.
>
> This use case should be included in the KIP if it's a significant
> motivating factor for adding this predicate. But it's also possible to drop
> entire records from a pipeline today by setting the 'errors.tolerance'
> property to 'all' and allowing all records that cause exceptions to be
> thrown in the transformation chain to be skipped/written to a DLQ/logged.
>
> Ultimately, I'm worried that we're encouraging bad habits with SMT
> developers by establishing a precedent where core logic that most users
> would expect to be present is left out in favor of a predicate, which makes
> things harder to configure and can confound new users by adding a layer of
> complexity in the form of another pluggable interface.
>
> Cheers,
>
> Chris
>
> On Thu, Jun 2, 2022 at 4:24 AM Kumud Kumar Srivatsava Tirupati <
> kumudkumartirup...@gmail.com> wrote:
>
>> Hi Chris,
>> Thanks for your comments.
>>
>> I am totally aligned with your comment on nested field names which include
>> dots. I will incorporate the same based on how the KIP-821 discussion goes
>> (maybe this parser could be a utility that can be reused in other areas as
>> well).
>>
>> But, I still believe this logic of predicate checking shouldn't be made a
>> part of each individual SMT. After all, that is what the predicates are
>> for
>> right?
>> If you see, the current predicates that we have are:
>> org.apache.kafka.connect.transforms.predicates.TopicNameMatches
>> org.apache.kafka.connect.transforms.predicates.HasHeaderKey
>> org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
>> They only allow you filter/transform based on the metadata of the record.
>>
>> `HasField` predicate is something very similar but is more powerful
>> because
>> it allows users to skip transformations at SMT level or drop them from the
>> transformation chain altogether using the current `
>> org.apache.kafka.connect.transforms.Filter`. So, the use cases are not
>> just
>> limited to skipping some SMTs.
>> If this makes sense, I should probably add this and give an example in the
>> KIP as well.
>>
>> *---*
>> *Thanks and Regards,*
>> *Kumud Kumar Srivatsava Tirupati*
>>
>>
>> On Wed, 1 Jun 2022 at 07:09, Chris Egerton 
>> wrote:
>>
>> > Hi Kumud,
>> >
>> > Thanks for the KIP. I'm a little bit skeptical about the necessity for
>> this
>> > predicate but I think we may be able to satisfy your requirements with a
>> > slightly different approach. The motivation section deals largely with
>> > skipping the invocation of SMTs that expect a certain field to be
>> present
>> > in a record, and will fail if it is not present. This seems like a
>> > reasonable use case; even when 

Re: [DISCUSS] KIP-845: HasField predicate for kafka connect

2022-06-02 Thread Kumud Kumar Srivatsava Tirupati
Hi Chris,
Thanks for your comments.

I am totally aligned with your comment on nested field names which include
dots. I will incorporate the same based on how the KIP-821 discussion goes
(maybe this parser could be a utility that can be reused in other areas as
well).

But, I still believe this logic of predicate checking shouldn't be made a
part of each individual SMT. After all, that is what the predicates are for
right?
If you see, the current predicates that we have are:
org.apache.kafka.connect.transforms.predicates.TopicNameMatches
org.apache.kafka.connect.transforms.predicates.HasHeaderKey
org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
They only allow you filter/transform based on the metadata of the record.

`HasField` predicate is something very similar but is more powerful because
it allows users to skip transformations at SMT level or drop them from the
transformation chain altogether using the current `
org.apache.kafka.connect.transforms.Filter`. So, the use cases are not just
limited to skipping some SMTs.
If this makes sense, I should probably add this and give an example in the
KIP as well.

*---*
*Thanks and Regards,*
*Kumud Kumar Srivatsava Tirupati*


On Wed, 1 Jun 2022 at 07:09, Chris Egerton  wrote:

> Hi Kumud,
>
> Thanks for the KIP. I'm a little bit skeptical about the necessity for this
> predicate but I think we may be able to satisfy your requirements with a
> slightly different approach. The motivation section deals largely with
> skipping the invocation of SMTs that expect a certain field to be present
> in a record, and will fail if it is not present. This seems like a
> reasonable use case; even when your data has a fixed schema, optional
> fields are still possible, and preventing SMTs from being used on optional
> fields seems unnecessarily restrictive. In fact, it seems like such a
> reasonable use case that I wonder if it'd be worth investing in new
> SMT-level properties to handle this case, instead of requiring users to
> configure a predicate separately alongside them? Something like an
> "on.field.missing" property with options of "skip", "fail", and/or "warn"
> could do the trick.
>
> It's also worth noting that the proposed syntax for the "field.path"
> property in the HasField predicate would make it impossible to refer to
> field names that have dots in them. It hasn't been finalized yet but if we
> do end up going this route, we should probably use the same syntax that's
> agreed upon for KIP-821 [1].
>
> [1] -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures
>
> Cheers,
>
> Chris
>
> On Thu, May 26, 2022 at 5:28 AM Sagar  wrote:
>
> > Hi Kumud,
> >
> > Thanks for that. I don't have any other comments at this point on the
> KIP.
> > LGTM overall.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, May 25, 2022 at 5:14 PM Sagar  wrote:
> >
> > > Thanks for the KIP Kumud.
> > >
> > > Can you please add a couple of examples on how this would behave with
> > > different combinations. I think that way it would be easier to
> > understand.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Wed, May 25, 2022 at 4:59 PM Kumud Kumar Srivatsava Tirupati <
> > > kumudkumartirup...@gmail.com> wrote:
> > >
> > >> Hi all,
> > >> I have written a KIP for having a new HasField predicate for kafka
> > connect
> > >> transforms and would like to start a discussion. Please share your
> > >> thoughts
> > >> on the same.
> > >>
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-845%3A+%27HasField%27+predicate+for+kafka+connect
> > >>
> > >> *---*
> > >> *Thanks and Regards,*
> > >> *Kumud Kumar Srivatsava Tirupati*
> > >>
> > >
> >
>


Re: [DISCUSS] KIP-845: HasField predicate for kafka connect

2022-05-25 Thread Kumud Kumar Srivatsava Tirupati
Hi Sagar,
Added the examples to the KIP wiki as suggested.

*---*
*Thanks and Regards,*
*Kumud Kumar Srivatsava Tirupati*


On Wed, 25 May 2022 at 17:14, Sagar  wrote:

> Thanks for the KIP Kumud.
>
> Can you please add a couple of examples on how this would behave with
> different combinations. I think that way it would be easier to understand.
>
> Thanks!
> Sagar.
>
> On Wed, May 25, 2022 at 4:59 PM Kumud Kumar Srivatsava Tirupati <
> kumudkumartirup...@gmail.com> wrote:
>
>> Hi all,
>> I have written a KIP for having a new HasField predicate for kafka connect
>> transforms and would like to start a discussion. Please share your
>> thoughts
>> on the same.
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-845%3A+%27HasField%27+predicate+for+kafka+connect
>>
>> *---*
>> *Thanks and Regards,*
>> *Kumud Kumar Srivatsava Tirupati*
>>
>


[DISCUSS] KIP-845: HasField predicate for kafka connect

2022-05-25 Thread Kumud Kumar Srivatsava Tirupati
Hi all,
I have written a KIP for having a new HasField predicate for kafka connect
transforms and would like to start a discussion. Please share your thoughts
on the same.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-845%3A+%27HasField%27+predicate+for+kafka+connect

*---*
*Thanks and Regards,*
*Kumud Kumar Srivatsava Tirupati*


Requesting permissions to contribute to Kafka

2022-05-22 Thread Kumud Kumar Srivatsava Tirupati
Hi team,
I am planning to work on https://issues.apache.org/jira/browse/KAFKA-13926
and wanted to create a KIP for the same. Requesting permissions on the
below wiki and jira id for contributing.

Jira ID: kumudkumartirupati
Wiki ID: kumudkumartirup...@gmail.com

*---*
*Thanks and Regards,*
*Kumud Kumar Srivatsava Tirupati*


[jira] [Created] (KAFKA-13926) Proposal to have "HasField" predicate for kafka connect

2022-05-22 Thread Kumud Kumar Srivatsava Tirupati (Jira)
Kumud Kumar Srivatsava Tirupati created KAFKA-13926:
---

 Summary: Proposal to have "HasField" predicate for kafka connect
 Key: KAFKA-13926
 URL: https://issues.apache.org/jira/browse/KAFKA-13926
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Kumud Kumar Srivatsava Tirupati


Hello,

Today's connect predicates enables checks on the record metadata. However, this 
can be limiting considering {*}many inbuilt and custom transformations that we 
(community) use are more key/value centric{*}.

Some use-cases this can solve:
 * Data type conversions of certain pre-identified fields for records coming 
across datasets only if those fields exist. [Ex: TimestampConverter can be run 
only if the specified date field exists irrespective of the record metadata]
 * Skip running certain transform if a given field does/does not exist. A lot 
of inbuilt transforms raise exceptions (Ex: InsertField transform if the field 
already exists) thereby breaking the task. Giving this control enable users to 
consciously configure for such cases.
 * Even though some inbuilt transforms explicitly handle these cases, it would 
still be an unnecessary pass-through loop.
 * Considering each connector usually deals with multiple datasets (Even 100s 
for a database CDC connector), metadata-centric predicate checking will be 
somewhat limiting when we talk about such pre-identified custom metadata fields 
in the records.

I know some of these cases can be handled within the transforms itself but that 
defeats the purpose of having predicates.

We have built this predicate for us and it is found to be extremely helpful. 
Please let me know your thoughts on the same so that I can raise a PR.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13575) Unwanted cleanup since log.retention.hours=-1 does not work

2022-01-02 Thread Alok Kumar Singh (Jira)
Alok Kumar Singh created KAFKA-13575:


 Summary: Unwanted cleanup since log.retention.hours=-1 does not 
work
 Key: KAFKA-13575
 URL: https://issues.apache.org/jira/browse/KAFKA-13575
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.6.0
 Environment: AWS, Kubernetes cluster, Kafka run as strimzi pods.
Reporter: Alok Kumar Singh
 Attachments: Screenshot 2022-01-02 at 4.22.50 PM.png, Screenshot 
2022-01-02 at 4.30.38 PM.png

Use case: We have a use case in which we never want topics data to get deleted. 
We used `log.retention.hours=-1` to keep the topic's retention as infinite.

But still, Kafka cleanup got triggered and it cleaned our topic's data, moving 
our oldest offset from 0 to a higher value. Below is the log from Kafka using the 
default retention of 7days=604080ms and moving the offsets due to deleted 
segments. The logs are filled with messages "due to retention time 60480ms 
breach".

*Logs* of the Kafka Pod that has the cleanup traces:

 
{code:java}
2021-12-21 15:28:20,891 INFO [Log partition=ts.praccomm.patientsms-0, 
dir=/var/lib/kafka/data/kafka-log2] Found deletable segments with base offsets 
[0,2214146,4890534,7645992,10394781,13149705,15798698,18084408,20638290,23202704,25760900,28207227,30796134,33437632,35763142,38247977,40861013,43473617,46086956,48469054,50776433,53050704,55299660,57529943,59743683,61984715,64224317,66458910,68670877,70886848,73080554,74872031,76656553,78573653,80412933,82457759,84854900,89950485,94263626,98668311,103762764,107797683,111809208,113716544,115406491,117052453,119304884,121765245,123536361,125364537,127191439,129028759,130902041,132719344,134787234,139027470,143932296,147888638,150492879,152410799,154338596,156416190,160672948,165054157,169273069,171824978,173847615,175602822,177765267,179701522,181622192,183511328,185382862,187200966,188998652,193646877,198500489,203354616,208211378,210992397,212806790,214599592,216456221,218231836,220014080,221812337,223574882,225341294,227151134,228894227,231000663,23280,234554788,236304441,238012477,239523793,241164599,242938200,244945345,246534386,248345745,249972929,251761034,253511669,255180052,256960087,258715796,260382401,262123766,263900146,265585895,267349067,269106101,270936370,272601164,274416555,276154378,277854987,279656300,281487544,283272481,285126550,286886989,288671958,290389171,292132610,293756088,295527031,297230008,298992929,300641108,302415673,304138766,305900851,307542789,309368805,311075952,312729042,314516564,316294429,318006961,319760277,321426916,323177673,324845934,326606764,328385019,330117277,331930761,333729477,335525038,337253962,339046341,340756170,342402991,344019861,345789681,347575987,349276417,350912980,352675272,35307,356157225,357960082,359616551,361418905,363132123,364932894,366638881,368368450]
 due to retention time 60480ms breach (kafka.log.Log) [kafka-scheduler-2]
2021-12-23 02:28:18,124 INFO [Log partition=ts.praccomm.patientsms-0, 
dir=/var/lib/kafka/data/kafka-log2] Found deletable segments with base offsets 
[370099688] due to retention time 60480ms breach (kafka.log.Log) 
[kafka-scheduler-4]
2021-12-24 15:03:18,125 INFO [Log partition=ts.praccomm.patientsms-0, 
dir=/var/lib/kafka/data/kafka-log2] Found deletable segments with base offsets 
[371913024] due to retention time 60480ms breach (kafka.log.Log) 
[kafka-scheduler-1]
2021-12-26 13:33:18,124 INFO [Log partition=ts.praccomm.patientsms-0, 
dir=/var/lib/kafka/data/kafka-log2] Found deletable segments with base offsets 
[373561655] due to retention time 60480ms breach (kafka.log.Log) 
[kafka-scheduler-2]
2021-12-28 06:18:18,124 INFO [Log partition=ts.praccomm.patientsms-0, 
dir=/var/lib/kafka/data/kafka-log2] Found deletable segments with base offsets 
[375346988] due to retention time 60480ms breach (kafka.log.Log) 
[kafka-scheduler-9]
2021-12-29 20:18:18,125 INFO [Log partition=ts.praccomm.patientsms-0, 
dir=/var/lib/kafka/data/kafka-log2] Found deletable segments with base offsets 
[377066152] due to retention time 60480ms breach (kafka.log.Log) 
[kafka-scheduler-2]
2021-12-31 12:18:18,124 INFO [Log partition=ts.praccomm.patientsms-0, 
dir=/var/lib/kafka/data/kafka-log2] Found deletable segments with base offsets 
[378821599] due to retention time 60480ms breach (kafka.log.Log) 
[kafka-scheduler-2] {code}
 

*Listing out Configurations to prove we are setting the right values*

*Broker Configurations:*
{code:java}
$ ./bin/kafka-configs.sh --command-config ./bin/client-ssl-auth.properties 
--bootstrap-server XXX:9094 --entity-type brokers --entity-name 2 --describe 
--all | grep retention
(org.apache.kafka.clients.admin.AdminClientConfig)
  log.cleaner.delete.retention.ms=8640 sensitive=false 
synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=8640}
  log.retention.hours=-1

Re: [ANNOUNCE] New Kafka PMC member: David Jacot

2021-12-19 Thread Rankesh Kumar
Congratulations, David!

Best regards,
Rankesh Kumar
Partner Solutions Engineer
+91 (701)913-0147
Follow us:  Blog • Slack • Twitter • YouTube

> On 18-Dec-2021, at 4:38 AM, Gwen Shapira  wrote:
> 
> Hi everyone,
> 
> David Jacot has been an Apache Kafka committer since Oct 2020 and has been 
> contributing to the community consistently this entire time - especially 
> notable the fact that he reviewed around 150 PRs in the last year. It is my 
> pleasure to announce that David agreed to join the Kafka PMC.
> 
> Congratulations, David!
> 
> Gwen Shapira, on behalf of Apache Kafka PMC



Kafka SSL CA Change Issue

2021-10-08 Thread Naresh R Kumar
Hi Team,

We need some help regarding a CA (certificate authority) change in Kafka. 
Currently we are connecting to Kafka using an SSL implementation.

kafka version used is 1.1.1

below is server.properties

listeners=INT://$PVT_HOST_NAME:9094,EXT://$PVT_HOST_NAME:9092
advertised.listeners=INT://$PVT_HOST_NAME:9094,EXT://$PUB_HOST_NAME:9092
ssl.keystore.location=$SSL_DIR/broker.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=$SSL_DIR/broker.truststore.jks
SUPER_USERS_CONFIG=$SUPER_USERS_CONFIG"User:CN=br$c.broker.kafka-$CLUSTER_NAME-$ENV,OU=broker,O=server
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,EXT:SSL,INT:PLAINTEXT
inter.broker.listener.name=INT

options tried :-
1. Generating new certificates and updating them into the existing keystore and 
truststore (we observe that the client is able to connect using only one CA; either 
the old CA or the new CA gets accepted, not both).
2. Only private keys in the keystore and root certs in the truststore (we observe 
that the client is able to connect using only one CA; either the old CA or the new 
CA gets accepted).
3. Multiple keystore and truststore files using a comma-separated format (client 
connections do not work at all).
eg ..
  
ssl.keystore.location=$SSL_DIR/broker-oldca.keystore.jks,$SSL_DIR/broker-newca.keystore.jks
  
ssl.truststore.location=$SSL_DIR/broker-oldca.truststore.jks,broker-newca.truststore.jks

can anyone please help us on this, as this change in authority will cause 
outage and connection issues with existing clients.

Current Result : only one certificate is working, either the old one or the new 
one.
Expected Result : both certificates (keystore & truststore) should work, the 
old one and the new one.

Validation process : After updating the broker certificates we are trying to 
connect to broker ( from kafka tool) by using consumer certificates.
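
For reference, one common approach for a CA rotation is to build a single truststore 
that contains both the old and the new root certificates (as far as we can tell, 
ssl.truststore.location accepts a single path, not a comma-separated list). A minimal 
sketch with assumed file names and passwords:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.util.Enumeration;

public class MergeTruststores {
    public static void main(String[] args) throws Exception {
        char[] password = "changeit".toCharArray();

        KeyStore merged = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("broker-oldca.truststore.jks")) {
            merged.load(in, password); // start from the existing (old CA) truststore
        }

        KeyStore newCa = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("broker-newca.truststore.jks")) {
            newCa.load(in, password);
        }

        // Copy every trusted certificate entry from the new-CA truststore into the merged one.
        for (Enumeration<String> aliases = newCa.aliases(); aliases.hasMoreElements(); ) {
            String alias = aliases.nextElement();
            if (newCa.isCertificateEntry(alias)) {
                Certificate cert = newCa.getCertificate(alias);
                merged.setCertificateEntry("newca-" + alias, cert);
            }
        }

        try (FileOutputStream out = new FileOutputStream("broker-merged.truststore.jks")) {
            merged.store(out, password);
        }
    }
}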



Thanks & Regards
R . Naresh Kumar

LCG-DF DevOps Engineer​




[jira] [Created] (KAFKA-13334) ERROR Failed to clean up log for __consumer_offsets

2021-09-29 Thread Udaya Kumar (Jira)
Udaya Kumar created KAFKA-13334:
---

 Summary: ERROR Failed to clean up log for __consumer_offsets
 Key: KAFKA-13334
 URL: https://issues.apache.org/jira/browse/KAFKA-13334
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.8.1
 Environment: Windows Service application and Windows 10 operating 
system
Reporter: Udaya Kumar
 Attachments: server.log.2021-09-23-17

[2021-09-23 17:10:29,126] ERROR Failed to clean up log for __consumer_offsets-38 
in dir C:\Kafka\server_logData due to IOException (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
C:\Kafka\server_logData\__consumer_offsets-38\.timeindex.cleaned -> 
C:\Kafka\server_logData\__consumer_offsets-38\.timeindex.swap: The 
process cannot access the file because it is being used by another process
 at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
 at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
 at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:401)
 at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


FW: Able to access Kafka Services inside but unable ... -

2021-09-14 Thread Sathish Kumar Natarajan, Integra-PDY, IN

Hi Team,

Kafka Zookeeper & Kafka Server services are configured on a Linux box. Inside 
the Linux box we are able to access both servers. When we try to access them 
from our local machine, the connection fails.

We tried the following document to configure Kafka with the SASL_PLAINTEXT protocol. 
We are facing a problem starting the Kafka service.

https://docs.vmware.com/en/VMware-Smart-Assurance/10.1.0/deployment-scenarios/GUID-3E473EC3-732A-4963-81BD-13BCCD3AC700.html

We are stuck at the third step in the above document link.

Please find the below error.

Please assist to close the issue ASAP.


Thanks & Regards,
Ukendiran
Senior Developer, Products
Integra Software Services

P : +91-413-4212124 Ext: 369  |  M : +91 9600830086
A : 100 Feet Road (ECR), Pakkamudiyanpet, Pondicherry 605 008, India
W : www.integra.co.in  |  E : ukendiran.dhanaseka...@integra.co.in
Powering Content Transformation
India | USA | UK | Japan | Canada | Germany





Re: [ANNOUNCE] New Kafka PMC Member: Konstantine Karantasis

2021-06-22 Thread Rankesh Kumar
Yay! Congratulations, KK!

Best regards,
Rankesh Kumar
Partner Solutions Engineer
+91 (701)913-0147
Follow us:  Blog • Slack • Twitter • YouTube

> On 21-Jun-2021, at 8:58 PM, Mickael Maison  wrote:
> 
> Hi,
> 
> It's my pleasure to announce that Konstantine Karantasis is now a
> member of the Kafka PMC.
> 
> Konstantine has been a Kafka committer since Feb 2020. He has remained
> active in the community since becoming a committer.
> 
> Congratulations Konstantine!
> 
> Mickael, on behalf of the Apache Kafka PMC



Re: [ANNOUNCE] New Kafka PMC Member: Randall Hauch

2021-04-17 Thread Rankesh Kumar
Congratulations, Randall!
Best regards,
Rankesh Kumar
Partner Solutions Engineer
+91 (701)913-0147
Follow us:  Blog • Slack • Twitter • YouTube

> On 17-Apr-2021, at 1:41 PM, Tom Bentley  wrote:
> 
> Congratulations Randall!
> 
> 
> 
> On Sat, Apr 17, 2021 at 7:36 AM feyman2009 
> wrote:
> 
>> Congratulations Randall!
>> 
>> Haoran
>> --
>> 发件人:Luke Chen 
>> 发送时间:2021年4月17日(星期六) 12:05
>> 收件人:Kafka Users 
>> 抄 送:dev 
>> 主 题:Re: [ANNOUNCE] New Kafka PMC Member: Randall Hauch
>> 
>> Congratulations Randall!
>> 
>> Luke
>> 
>> Bill Bejeck  於 2021年4月17日 週六 上午11:33 寫道:
>> 
>>> Congratulations Randall!
>>> 
>>> -Bill
>>> 
>>> On Fri, Apr 16, 2021 at 11:10 PM lobo xu  wrote:
>>> 
>>>> Congrats Randall
>>>> 
>>> 
>> 
>> 



[jira] [Created] (KAFKA-12259) Consolidated Status endpoint returns 500 when config provider can't find a config

2021-01-29 Thread Magesh kumar Nandakumar (Jira)
Magesh kumar Nandakumar created KAFKA-12259:
---

 Summary: Consolidated Status endpoint returns 500 when config 
provider can't find a config
 Key: KAFKA-12259
 URL: https://issues.apache.org/jira/browse/KAFKA-12259
 Project: Kafka
  Issue Type: Bug
Reporter: Magesh kumar Nandakumar


The consolidated connectors endpoint connectors?expand=status returns a `500` 
error when any of the connectors gets an exception from its config provider.
https://github.com/apache/kafka/blob/e9edf104866822d9e6c3b637ffbf338767b5bf27/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L287

The status endpoint doesn't need the complete list of configs; it only requires 
the connector class to infer whether it's a `Source` or a `Sink`. A failed 
connector's status should be returned as `Failed` instead of the endpoint 
returning a 500.
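
For illustration only, a rough, self-contained sketch of the proposed behaviour (this is not
the actual Connect runtime code; the types and helper shapes here are hypothetical): resolve
the connector config defensively and fall back to a FAILED state instead of letting the
config-provider exception bubble up as a 500.

import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

// Hypothetical, simplified sketch -- not the real AbstractHerder implementation.
public class StatusFallbackSketch {

    // Stand-in for the status representation returned by the REST layer.
    record ConnectorStatus(String name, String state, String detail) {}

    interface ConfigResolver {
        Map<String, String> resolve(String connectorName); // may throw ConfigException from a config provider
    }

    public static ConnectorStatus statusOf(String connectorName, ConfigResolver resolver) {
        try {
            Map<String, String> config = resolver.resolve(connectorName);
            String connectorClass = config.get("connector.class"); // enough to infer Source vs Sink
            return new ConnectorStatus(connectorName, "RUNNING", connectorClass);
        } catch (ConfigException e) {
            // Surface the problem as a FAILED connector instead of a 500 from the endpoint.
            return new ConnectorStatus(connectorName, "FAILED", e.getMessage());
        }
    }
}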



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Will the cache get invalidated if I read lots of messages using random offsets and partitions

2020-11-06 Thread Rupesh Kumar
Hi Team,

I understand that Kafka is meant for sequential access, but I have a use case of 
accessing messages from Kafka based on random offsets and partitions.

For Example

There is a topic called “topic-A” and some consumers are listening to this topic. 
Processing of these messages may fail inside a consumer, in which case we will 
have to reprocess those messages.
We store the offset and partition details for the failed messages.

Another consumer will reprocess these failed messages and put them back on the 
topic; to reprocess them, we will be fetching those messages from the respective 
topics based on the stored offsets and partitions. So in this case we will have 
lots of random offsets to access across different topics (our application should 
support 1000 messages/second including all topics).


So my doubt is this: will accessing messages using random offsets and partitions 
invalidate the cache for the topic that was being used for sequential access 
(while other consumers were reading sequentially)?

I want to understand the impact of this random offset/partition access on Kafka:


  1.  Will it slow down Kafka?
  2.  Will it read messages from the page cache or from disk?
  3.  If the messages are not in the cache, will it load them from disk and store 
them in the cache? In that case, what will happen to the existing data that was 
in the cache and was being used for sequential access?

I went through the Kafka documentation but couldn't answer all my questions.
Need your help here.
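
For what it's worth, here is a minimal sketch (plain Java consumer; the class and method names
are just for illustration) of the random-access pattern described above: manually assigning the
partition and seeking to the stored offset of a failed message.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RandomOffsetReprocessor {
    // Reads a single record back from a stored (topic, partition, offset) triple.
    public static ConsumerRecord<String, String> fetchAt(Properties props, String topic,
                                                         int partition, long offset) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(Collections.singletonList(tp));   // no group subscription needed
            consumer.seek(tp, offset);                        // jump straight to the stored offset
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                if (record.offset() == offset) {
                    return record;                            // the failed message to reprocess
                }
            }
            return null;
        }
    }
}

Broadly speaking, such lookups do not invalidate anything for the sequential consumers; whether
each lookup is served from the OS page cache or from disk depends on whether the requested
segment is still resident, and cold reads of old segments can evict other cached pages.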

Please let me know if my question is not clear or you need more info.

Regards
Rupesh



Re: [ANNOUNCE] New committer: Chia-Ping Tsai

2020-10-19 Thread Rankesh Kumar
Many congratulations, Chia-Ping!

Best regards,
Rankesh

> On 20-Oct-2020, at 6:45 AM, Luke Chen  wrote:
> 
> Congratulations! Chia-Ping大大!
> Well deserved!
> 
> Luke
> 
> On Tue, Oct 20, 2020 at 2:30 AM Mickael Maison 
> wrote:
> 
>> Congrats Chia-Ping!
>> 
>> On Mon, Oct 19, 2020 at 8:29 PM Ismael Juma  wrote:
>>> 
>>> Congratulations Chia-Ping!
>>> 
>>> Ismael
>>> 
>>> On Mon, Oct 19, 2020 at 10:25 AM Guozhang Wang 
>> wrote:
>>> 
 Hello all,
 
 I'm happy to announce that Chia-Ping Tsai has accepted his invitation
>> to
 become an Apache Kafka committer.
 
 Chia-Ping has been contributing to Kafka since March 2018 and has made
>> 74
 commits:
 
 https://github.com/apache/kafka/commits?author=chia7712
 
 He's also authored several major improvements, participated in the KIP
 discussion and PR reviews as well. His major feature development
>> includes:
 
 * KAFKA-9654: Epoch based ReplicaAlterLogDirsThread creation.
 * KAFKA-8334: Spiky offsetCommit latency due to lock contention.
 * KIP-331: Add default implementation to close() and configure() for
>> serde
 * KIP-367: Introduce close(Duration) to Producer and AdminClients
 * KIP-338: Support to exclude the internal topics in kafka-topics.sh
 command
 
 In addition, Chia-Ping has demonstrated his great diligence fixing test
 failures, his impressive engineering attitude and taste in fixing
>> tricky
 bugs while keeping simple designs.
 
 Please join me to congratulate Chia-Ping for all the contributions!
 
 
 -- Guozhang
 
>> 



Re: [ANNOUNCE] New committer: A. Sophie Blee-Goldman

2020-10-19 Thread Rankesh Kumar
Many congratulations, Sophie.

Best regards,
Rankesh


> On 20-Oct-2020, at 12:34 AM, Gwen Shapira  wrote:
> 
> Congratulations, Sophie!
> 
> On Mon, Oct 19, 2020 at 9:41 AM Matthias J. Sax  wrote:
>> 
>> Hi all,
>> 
>> I am excited to announce that A. Sophie Blee-Goldman has accepted her
>> invitation to become an Apache Kafka committer.
>> 
>> Sophie is actively contributing to Kafka since Feb 2019 and has
>> accumulated 140 commits. She authored 4 KIPs in the lead
>> 
>> - KIP-453: Add close() method to RocksDBConfigSetter
>> - KIP-445: In-memory Session Store
>> - KIP-428: Add in-memory window store
>> - KIP-613: Add end-to-end latency metrics to Streams
>> 
>> and helped to implement two critical KIPs, 429 (incremental rebalancing)
>> and 441 (smooth auto-scaling; not just implementation but also design).
>> 
>> In addition, she participates in basically every Kafka Streams related
>> KIP discussion, reviewed 142 PRs, and is active on the user mailing list.
>> 
>> Thanks for all the contributions, Sophie!
>> 
>> 
>> Please join me to congratulate her!
>> -Matthias
>> 
> 
> 
> -- 
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog



Re: [ANNOUNCE] New committer: David Jacot

2020-10-16 Thread Rankesh Kumar
Many congratulations,  David. It is awesome.

Best regards,
Rankesh

> On 16-Oct-2020, at 9:51 PM, Mickael Maison  wrote:
> 
> Congratulations David!
> 
> On Fri, Oct 16, 2020 at 6:05 PM Bill Bejeck  wrote:
>> 
>> Congrats David! Well deserved.
>> 
>> -Bill
>> 
>> On Fri, Oct 16, 2020 at 12:01 PM Gwen Shapira  wrote:
>> 
>>> The PMC for Apache Kafka has invited David Jacot as a committer, and
>>> we are excited to say that he accepted!
>>> 
>>> David Jacot has been contributing to Apache Kafka since July 2015 (!)
>>> and has been very active since August 2019. He contributed several
>>> notable KIPs:
>>> 
>>> KIP-511: Collect and Expose Client Name and Version in Brokers
>>> KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies:
>>> KIP-570: Add leader epoch in StopReplicaReques
>>> KIP-599: Throttle Create Topic, Create Partition and Delete Topic
>>> Operations
>>> KIP-496 Added an API for the deletion of consumer offsets
>>> 
>>> In addition, David Jacot reviewed many community contributions and
>>> showed great technical and architectural taste. Great reviews are hard
>>> and often thankless work - but this is what makes Kafka a great
>>> product and helps us grow our community.
>>> 
>>> Thanks for all the contributions, David! Looking forward to more
>>> collaboration in the Apache Kafka community.
>>> 
>>> --
>>> Gwen Shapira
>>> 



Re: Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #70

2020-09-18 Thread niranjan kumar
unsubscribe

On Sat, Sep 19, 2020 at 1:07 AM Apache Jenkins Server <
jenk...@builds.apache.org> wrote:

> See <
> https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk8/70/display/redirect?page=changes
> >
>
> Changes:
>
> [github] MINOR: Remove unused variable (#9303)
>
>
> --
> [...truncated 6.53 MB...]
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos
> enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos
> enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos
> enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos
> enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowForUnknownTopic[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowForUnknownTopic[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled =
> false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled =
> false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false]
> STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldPopulateGlobalStore[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldPopulateGlobalStore[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled
> = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled
> = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false]
> STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false]
> PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldRespectTaskIdling[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldRespectTaskIdling[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldReturnAllStores[Eos enabled = false] STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldReturnAllStores[Eos enabled = false] PASSED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false]
> STARTED
>
> org.apache.kafka.streams.TopologyTestDriverTest >
> 

Discussion on KIP-670 : Add ConsumerGroupCommand to delete static members

2020-09-03 Thread Sandeep Kumar
Hi All,

I am new to the Kafka contribution community. I have picked up a jira
ticket https://issues.apache.org/jira/browse/KAFKA-9440 which requires KIP.

I have submitted KIP for it
https://cwiki.apache.org/confluence/display/KAFKA/KIP-670%3A+Add+ConsumerGroupCommand+to+delete+static+members

I am proposing that we add a new option (--remove-members) to remove static
members from a consumer group via the CLI.

I'd really appreciate your feedback on the proposal.

Thanks and Regards,
Sandeep


Re: Need contributor access to Kafka Improvement Proposals

2020-09-03 Thread Sandeep Kumar
Hi Matthias,

Can you please grant me contributor access to create KIP ?

 UserId : sndp2693
 EmailId : sndp2...@gmail.com


Regards,
Sandeep


On Thu, Sep 3, 2020 at 1:26 PM Sandeep Kumar  wrote:

> HI,
>
> Can you please grant me access to create KIP?
>
> Thanks,
> Sandeep
>


Need contributor access to Kafka Improvement Proposals

2020-09-03 Thread Sandeep Kumar
HI,

Can you please grant me access to create KIP?

Thanks,
Sandeep


Need contributor access to Jira

2020-09-01 Thread Sandeep Kumar
Hi Team ,

Please guide me how I can request for the contributor access for jira so
that I can assign some jira tickets to myself and contribute to the kafka
community.

Username: sndp2693
Email: sndp2...@gmail.com
Full Name: Sandeep Kumar

Thanks and Regards
Sandeep


Requesting permission to create KIP

2020-07-15 Thread Nikhil kumar
Hi kafka-devs,
I am planning to add support for sources which can't give out changed
records in kafka connect.
Please, provide me permission to create KIP.

Wiki id - nikhil578
Email id - nikhilkumar...@gmail.com

Thanks,
Nikhil


NODE JS INTERACTION WITH KAFKA

2020-07-01 Thread Ankit Kumar
Hi,
I have implemented Kafka with Node.js using the node module node-kafka.
node version : 10.17
node-kafka : 5.0.0

I am using consumer groups and default producers.
The issue I am facing is that the consumers intermittently fail to consume
messages.
Any help would be appreciated.

Thank you,
-- 
Ankit Kumar
NodeJs Developer
Appinventiv


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
If that's the case, I think the framework should not commit if there are any
outstanding records in the reporter. That would prevent the scenario where
we could potentially lose records from being sent either to the sink or the
reporter. WDYT about the KIP including that as part of the design?

On Sun, May 17, 2020 at 11:13 AM Randall Hauch  wrote:

> On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > Randall
> >
> > Thanks a lot for your thoughts. I was wondering if we would ever have to
> > make the API asynchronous, we could expose it as a new method right? If
> > that's a possibility would it be better if the API explicitly has
> semantics
> > of a synchronous API if the implementation is indeed going to be
> > synchronous.
> >
>
> Thanks, Magesh.
>
> I think it's likely that the implementation may need to be synchronous to
> some degree. For example, just to keep the implementation simple we might
> block the WorkerSinkTask after `put(Collection)` returns; that is, we
> might latch until the reporter has received all acks, especially if it
> simplifies the offset management and commit logic.
>
> Even if that's the case, having each `report(...)` call be asynchronous
> means that the sink task doesn't *have* to wait until each failed record
> has been recorded to continue sending valid records to the external system.
> Consider an example with 1000 records in a batch, where only the first
> record has an error. If `record(...)` were synchronous, the `put(...)`
> method would block reporting the first record and would then only send the
> 999 after that's happened. With an asynchronous `record(...)` method, the
> `put(...)` method could report the first record, send the 999 records, and
> then wait for the futures returned by the report method.
>
>
> >
> > On Sun, May 17, 2020, 9:27 AM Randall Hauch  wrote:
> >
> > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > mage...@confluent.io> wrote:
> > >
> > > > Thanks Randall. The suggestion i made also has a problem when
> reporter
> > > > isn't enabled where it could potentially write records after error
> > > records
> > > > to sink before failing.
> > > >
> > > > The other concern i had with reporter being asynchronous. For some
> > reason
> > > > if the reporter is taking longer because of say a specific broker
> > issue,
> > > > the connector might still move forward and commit if it's not waiting
> > for
> > > > the reporter.  During  this if the worker crashes we will now lose
> the
> > > bad
> > > > record
> > > >  I don't think this is desirable behavior. I think the synchronous
> > > reporter
> > > > provides better guarantees for all connectors.
> > > >
> > > >
> > > Thanks, Magesh.
> > >
> > > That's a valid concern, and maybe that will affect how the feature is
> > > actually implemented. I expect it to be a bit tricky to ensure that
> > errant
> > > records are fully written to Kafka before the offsets are committed, so
> > it
> > > might be simplest to start out with a synchronous implementation. But
> the
> > > API can still be an asynchronous design whether or not the
> implementation
> > > is synchronous. That gives us the ability in the future to change the
> > > implementation if we determine a way to handle all concerns. For
> example,
> > > the WorkerSinkTask may need to backoff if waiting to commit due to too
> > many
> > > incomplete/unacknowledged reporter requests. OTOH, if we make the
> > `report`
> > > method(s) synchronous from the beginning, it will be very challenging
> to
> > > change them in the future to be asynchronous.
> > >
> > > I guess it boils down to this question: do we know today that we will
> > > *never* want the reporter to write asynchronously?
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> >
>


-- 
Thanks
Magesh

*Magesh Nandakumar*
Software Engineer
mage...@confluent.io
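
To make the trade-off above concrete, here is a minimal, hypothetical sketch of how a sink task
might use the BiFunction-style reporter being discussed in this thread (this reflects the shape
under discussion, not necessarily the final KIP-610 API, and sendToExternalSystem() is a
placeholder for the connector's own delivery logic): report bad records asynchronously while
continuing with the rest of the batch, then wait on the returned futures before allowing a commit.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

public class ReportingSketch {

    private final BiFunction<SinkRecord, Throwable, Future<Void>> reporter;
    private final List<Future<Void>> pendingReports = new ArrayList<>();

    public ReportingSketch(BiFunction<SinkRecord, Throwable, Future<Void>> reporter) {
        this.reporter = reporter;   // may be null if no DLQ/error reporter is configured
    }

    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                sendToExternalSystem(record);
            } catch (Exception e) {
                if (reporter != null) {
                    // Asynchronous: keep processing the rest of the batch and remember the future.
                    pendingReports.add(reporter.apply(record, e));
                } else {
                    throw new ConnectException("Failed record and no error reporter configured", e);
                }
            }
        }
    }

    public void flush() throws Exception {
        // Block before committing offsets so no errant record is lost if the worker crashes afterwards.
        for (Future<Void> pending : pendingReports) {
            pending.get();
        }
        pendingReports.clear();
    }

    private void sendToExternalSystem(SinkRecord record) {
        // placeholder for the sink connector's delivery logic
    }
}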


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
Randall

Thanks a lot for your thoughts. I was wondering: if we ever have to make the API
asynchronous, we could expose it as a new method, right? If that's a possibility,
would it be better for the API to explicitly have the semantics of a synchronous
API, given that the implementation is indeed going to be synchronous?

On Sun, May 17, 2020, 9:27 AM Randall Hauch  wrote:

> On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > Thanks Randall. The suggestion i made also has a problem when reporter
> > isn't enabled where it could potentially write records after error
> records
> > to sink before failing.
> >
> > The other concern i had with reporter being asynchronous. For some reason
> > if the reporter is taking longer because of say a specific broker issue,
> > the connector might still move forward and commit if it's not waiting for
> > the reporter.  During  this if the worker crashes we will now lose the
> bad
> > record
> >  I don't think this is desirable behavior. I think the synchronous
> reporter
> > provides better guarantees for all connectors.
> >
> >
> Thanks, Magesh.
>
> That's a valid concern, and maybe that will affect how the feature is
> actually implemented. I expect it to be a bit tricky to ensure that errant
> records are fully written to Kafka before the offsets are committed, so it
> might be simplest to start out with a synchronous implementation. But the
> API can still be an asynchronous design whether or not the implementation
> is synchronous. That gives us the ability in the future to change the
> implementation if we determine a way to handle all concerns. For example,
> the WorkerSinkTask may need to backoff if waiting to commit due to too many
> incomplete/unacknowledged reporter requests. OTOH, if we make the `report`
> method(s) synchronous from the beginning, it will be very challenging to
> change them in the future to be asynchronous.
>
> I guess it boils down to this question: do we know today that we will
> *never* want the reporter to write asynchronously?
>
> Best regards,
>
> Randall
>


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
Thanks Randall. The suggestion I made also has a problem when the reporter
isn't enabled: it could potentially write records that follow the error records
to the sink before failing.

The other concern I had is with the reporter being asynchronous. If the reporter
is taking longer for some reason, because of, say, a specific broker issue,
the connector might still move forward and commit if it's not waiting for
the reporter. If the worker crashes during this window, we will lose the bad
record. I don't think this is desirable behavior. I think the synchronous reporter
provides better guarantees for all connectors.

On Sun, May 17, 2020, 8:02 AM Randall Hauch  wrote:

> Magesh, we have talked above overloading various existing SinkTask methods,
> and we concluded that this style of evolution complicates migration,
> whereas providing the reporter via the context follows existing patterns in
> the API and simplifies backward compatibility concerns. Arjun's research
> shows that we can even introduce a new interface type and sink connector
> developers can very easily accommodate their sink connector implementations
> running in newer and older versions of the Connect runtime.
>
> Best regards,
>
> Randall
>
> On Sun, May 17, 2020 at 3:26 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > Have we considered returning error records by overriding flush/precommit?
> > If we think aesthetics is important this on my opinion is one possible
> > abstractions that could be cleaner. This would also mean that connector
> > developers wouldn't have to worry about a new reporter or think if its
> > synchronous or not synchronous. If error records are available and thedlq
> > isn't enabled framework can possibly fail the task. Alternatively, we
> could
> > also have an overloaded put return error records or even introduce a new
> > errorRecords that gets invoked after put.
> >
> > On Sat, May 16, 2020, 2:37 PM Aakash Shah  wrote:
> >
> > > Hi Randall,
> > >
> > > Thanks for the suggestion. I've updated the KIP with the agreed upon
> > > changes as well as the new suggestions Randall mentioned:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > >
> > >
> > > Please let me know what you think.
> > >
> > > Thanks,
> > > Aakash
> > >
> > > On Sat, May 16, 2020 at 12:34 PM Randall Hauch 
> wrote:
> > >
> > > > Thanks again for the active discussion!
> > > >
> > > > Regarding the future-vs-callback discussion: I did like where Chris
> was
> > > > going with the Callback, but he raises good point that it's unclear
> > what
> > > to
> > > > use for the reporter type, since we'd need three parameters.
> > Introducing
> > > a
> > > > new interface makes it much harder for a sink task to be backward
> > > > compatible, so sticking with BiFunction is a good compromise. Plus,
> > > another
> > > > significant disadvantage of a callback approach is that a sink task's
> > > > callback is called from the producer thread, and this risks a
> > > > poorly written sink task callback killing the reporter's producer
> > without
> > > > necessarily failing the task. Using a future avoids this risk
> > altogether,
> > > > still provides the sink task with the ability to do synchronous
> > reporting
> > > > using Future, which is a standard and conventional design pattern. So
> > we
> > > do
> > > > seem to have converged on using `BiFunction<SinkRecord, Throwable, Future<Void>>` for the reporter type.
> > > >
> > > > Now, we still seem to not have converted upon how to pass the
> reporter
> > to
> > > > the sink task. I agree with Konstantine that the deprecation affects
> > only
> > > > newer versions of Connect, and that a sink task should deal with both
> > put
> > > > methods only when it wants to support older runtimes. I also think
> that
> > > > this is a viable approach, but I do concede that this evolution of
> the
> > > sink
> > > > task API is more complicated than it should be.
> > > >
> > > > In the interest of quickly coming to consensus on how we pass the
> > > reporter
> > > > to the sink task, I'd like to go back to Andrew's original
> suggestion,
> > > > which I think we disregarded too quickly: add a getter on the
> > > > SinkTaskContext interface. We already have precedent for adding
&

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Magesh kumar Nandakumar
Have we considered returning error records by overriding flush/precommit?
If we think aesthetics are important, this in my opinion is one possible
abstraction that could be cleaner. It would also mean that connector
developers wouldn't have to worry about a new reporter or think about whether it
is synchronous or not. If error records are available and the DLQ
isn't enabled, the framework can possibly fail the task. Alternatively, we could
also have an overloaded put return error records, or even introduce a new
errorRecords method that gets invoked after put.

On Sat, May 16, 2020, 2:37 PM Aakash Shah  wrote:

> Hi Randall,
>
> Thanks for the suggestion. I've updated the KIP with the agreed upon
> changes as well as the new suggestions Randall mentioned:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
>
>
> Please let me know what you think.
>
> Thanks,
> Aakash
>
> On Sat, May 16, 2020 at 12:34 PM Randall Hauch  wrote:
>
> > Thanks again for the active discussion!
> >
> > Regarding the future-vs-callback discussion: I did like where Chris was
> > going with the Callback, but he raises good point that it's unclear what
> to
> > use for the reporter type, since we'd need three parameters. Introducing
> a
> > new interface makes it much harder for a sink task to be backward
> > compatible, so sticking with BiFunction is a good compromise. Plus,
> another
> > significant disadvantage of a callback approach is that a sink task's
> > callback is called from the producer thread, and this risks a
> > poorly written sink task callback killing the reporter's producer without
> > necessarily failing the task. Using a future avoids this risk altogether,
> > still provides the sink task with the ability to do synchronous reporting
> > using Future, which is a standard and conventional design pattern. So we
> do
> > seem to have converged on using `BiFunction<SinkRecord, Throwable, Future<Void>>` for the reporter type.
> >
> > Now, we still seem to not have converged upon how to pass the reporter to
> > the sink task. I agree with Konstantine that the deprecation affects only
> > newer versions of Connect, and that a sink task should deal with both put
> > methods only when it wants to support older runtimes. I also think that
> > this is a viable approach, but I do concede that this evolution of the
> sink
> > task API is more complicated than it should be.
> >
> > In the interest of quickly coming to consensus on how we pass the
> reporter
> > to the sink task, I'd like to go back to Andrew's original suggestion,
> > which I think we disregarded too quickly: add a getter on the
> > SinkTaskContext interface. We already have precedent for adding methods
> to
> > one of the context classes with the newly-adopted KIP-131, which adds a
> > getter for the OffsetStorageReader on the (new) SourceConnectorContext.
> > That KIP accepts the fact that a source connector wanting to use this
> > feature while also keeping the ability to be installed into older Connect
> > runtimes must guard its use of the context's getter method.
> >
> > I think we can use the same pattern for this KIP, and add a getter to the
> > existing SinkTaskContext that is defined something like:
> >
> > public interface SinkTaskContext {
> > ...
> > /**
> >  * Get the reporter to which the sink task can report problematic or
> > failed {@link SinkRecord}
> >  * passed to the {@link SinkTask#put(Collection)} method. When
> > reporting a failed record,
> >  * the sink task will receive a {@link Future} that the task can
> > optionally use to wait until
> >  * the failed record and exception have been written to Kafka via
> > Connect's DLQ. Note that
> >  * the result of this method may be null if this connector has not
> been
> > configured with a DLQ.
> >  *
> >  * This method was added in Apache Kafka 2.9. Sink tasks that use
> > this method but want to
> >  * maintain backward compatibility so they can also be deployed to
> > older Connect runtimes
> >  * should guard the call to this method with a try-catch block, since
> > calling this method will result in a
> >  * {@link NoSuchMethodException} when the sink connector is deployed
> to
> > Connect runtimes
> >  * older than Kafka 2.9. For example:
> >  * 
> >  * BiFunction<SinkTask, Throwable, Future<Void>> reporter;
> >  * try {
> >  * reporter = context.failedRecordReporter();
> >  * } catch (NoSuchMethodException e) {
> >  * reporter = null;
> >  * }
> >  * 
> >  *
> >  * @return the reporter function; null if no error reporter has been
> > configured for the connector
> >  * @since 2.9
> >  */
> > BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
> > }
> >
> > The main advantage is that the KIP no longer has to make *any other*
> > changes to the Sink Connector or Task API. The above is really the only
> > change, and it's merely an addition to the API. No 

[jira] [Created] (KAFKA-9884) Unable to override some client properties in Mirror maker 2.0

2020-04-17 Thread Mithun Kumar (Jira)
Mithun Kumar created KAFKA-9884:
---

 Summary: Unable to override some client properties in Mirror maker 
2.0
 Key: KAFKA-9884
 URL: https://issues.apache.org/jira/browse/KAFKA-9884
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.4.1, 2.5.0, 2.4.0
Reporter: Mithun Kumar
 Attachments: mm2.log

I have a two 3 node kafka clusters. MirrorMaker 2.0 is being run as a cluster 
with bin/connect-mirror-maker.sh mm2.properties

I am trying to disable message duplication on replication by enabling 
idempotence. I understand that EOS is marked as a future work in 
[KIP-382|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
 however it should be possible by setting enable.idempotence = true and retries 
> 0.

The <cluster>.enable.idempotence = true setting takes effect; however, overriding 
the retries fails. I tried all 3 versions that provide MM2: 2.4.0, 2.4.1 and 
2.5.0.

My mm2.properties config :
{noformat}
name = pri_to_bkp
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
topics = test-mm-topic-3
groups = .*
clusters = pri, bkp
source.cluster.alias = pri
target.cluster.alias = bkp

sasl.mechanism = GSSAPI
sasl.kerberos.service.name = kafka
security.protocol = SASL_PLAINTEXT
sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
keyTab="/etc/security/keytabs/user.keytab" \
principal="u...@xx.xx.com";

pri.enable.idempotence = true
bkp.enable.idempotence = true
pri.retries = 2147483647
bkp.retries = 2147483647

pri.bootstrap.servers = SASL_PLAINTEXT://kafka1:9092, 
SASL_PLAINTEXT://kafka2:9092, SASL_PLAINTEXT://kafka3:9092
bkp.bootstrap.servers = SASL_PLAINTEXT://bkp-kafka1:9092, 
SASL_PLAINTEXT://bkp-kafka2:9092, SASL_PLAINTEXT://bkp-kafka3:9092
pri->bkp.enabled = true
pri->bkp.topics = "test-mm-topic-3"
{noformat}
 

The error leading to failure is:
{noformat}
[2020-04-17 15:46:26,525] ERROR [Worker clientId=connect-1, groupId=pri-mm2] 
Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
org.apache.kafka.common.config.ConfigException: Must set retries to non-zero 
when using the idempotent producer.
at 
org.apache.kafka.clients.producer.ProducerConfig.maybeOverrideAcksAndRetries(ProducerConfig.java:432)
at 
org.apache.kafka.clients.producer.ProducerConfig.postProcessParsedConfig(ProducerConfig.java:400)
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:110)
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at 
org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:270)
at 
org.apache.kafka.connect.util.KafkaBasedLog.createProducer(KafkaBasedLog.java:248)
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:129)
at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:199)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:124)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:284)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-04-17 15:46:29,089] INFO [Worker clientId=connect-1, groupId=pri-mm2] 
Herder stopped 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:636)
[2020-04-17 15:46:29,089] INFO [Worker clientId=connect-2, groupId=bkp-mm2] 
Herder stopping 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:616)

[2020-04-17 15:46:34,090] INFO [Worker clientId=connect-2, groupId=bkp-mm2] 
Herder stopped 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:636)
[2020-04-17 15:46:34,090] INFO Kafka MirrorMaker stopped. 
(org.apache.kafka.connect.mirror.MirrorMaker:191)
{noformat}
 The complete log file is attached.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9375) Add thread names to kafka connect threads

2020-01-07 Thread karan kumar (Jira)
karan kumar created KAFKA-9375:
--

 Summary: Add thread names to kafka connect threads
 Key: KAFKA-9375
 URL: https://issues.apache.org/jira/browse/KAFKA-9375
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.3.1, 2.4.0
Reporter: karan kumar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-553: Enable TLSv1.3 by default and disable all protocols except [TLSV1.2, TLSV1.3]

2020-01-06 Thread Brajesh Kumar
Hello Rajini,

What is the plan to run system tests using JDK 11? Is someone working on
this?

On Mon, Jan 6, 2020 at 3:00 PM Rajini Sivaram 
wrote:

> Hi Nikolay,
>
> We can leave the KIP open and restart the discussion once system tests are
> running.
>
> Thanks,
>
> Rajini
>
> On Mon, Jan 6, 2020 at 2:46 PM Николай Ижиков  wrote:
>
> > Hello, Rajini.
> >
> > Thanks, for the feedback.
> >
> > Should I mark this KIP as declined?
> > Or just wait for the system tests results?
> >
> > > On 6 Jan 2020, at 17:26, Rajini Sivaram wrote:
> > >
> > > Hi Nikolay,
> > >
> > > Thanks for the KIP. We currently run system tests using JDK 8 and hence
> > we
> > > don't yet have full system test results with TLS 1.3 which requires JDK
> > 11.
> > > We should wait until that is done before enabling TLS1.3 by default.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Mon, Dec 30, 2019 at 5:36 AM Николай Ижиков 
> > wrote:
> > >
> > >> Hello, Team.
> > >>
> > >> Any feedback on this KIP?
> > >> Do we need this in Kafka?
> > >>
> > >>> On 24 Dec 2019, at 18:28, Nikolay Izhikov wrote:
> > >>>
> > >>> Hello,
> > >>>
> > >>> I'd like to start a discussion of KIP.
> > >>> Its goal is to enable TLSv1.3 and disable obsolete versions by
> default.
> > >>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=142641956
> > >>>
> > >>> Your comments and suggestions are welcome.
> > >>>
> > >>
> > >>
> >
> >
>


-- 
Regards,
Brajesh Kumar


[jira] [Created] (KAFKA-9337) Simplifying standalone mm2-connect config

2019-12-27 Thread karan kumar (Jira)
karan kumar created KAFKA-9337:
--

 Summary: Simplifying standalone mm2-connect config
 Key: KAFKA-9337
 URL: https://issues.apache.org/jira/browse/KAFKA-9337
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, mirrormaker
Affects Versions: 2.4.0
Reporter: karan kumar


One of the nice things about Kafka is that setting it up in a local environment is 
really simple. I was giving the latest feature, i.e. MM2, a try and found it 
took me some time to get a minimal setup running. 
The default config provided assumes that there will already be 3 brokers running, 
due to the default replication factor of the admin topics the MM2 connector 
creates. 

This got me thinking that most of the people would follow the same approach I 
followed. 
1. Start a single broker cluster on 9092 
2. Start another single cluster broker on, let's say, 10002 
3. Start mm2 by"./bin/connect-mirror-maker.sh 
./config/connect-mirror-maker.properties" 

What happened was I had to supply a lot more configs 

This jira is created post discussion on the mailing list:
https://lists.apache.org/thread.html/%3ccajxudh13kw3nam3ho69wrozsyovwue1nxf9hkcbawc9r-3d...@mail.gmail.com%3E

cc [~ryannedolan]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] Apache Kafka 2.2.2

2019-12-02 Thread Vinay Kumar
Hi,
May I know how this version would be different from the later versions
2.3.0 and 2.3.1?
I'm looking to upgrade my current Kafka 2.1.0, so can you please let me
know what difference it would make to upgrade to 2.2.2 versus 2.3.1?

Thanks,
Vinay

On Monday, December 2, 2019, Vahid Hashemian 
wrote:

> Awesome. Thanks for managing this release Randall!
>
> Regards,
> --Vahid
>
> On Sun, Dec 1, 2019 at 5:45 PM Randall Hauch  wrote:
>
> > The Apache Kafka community is pleased to announce the release for Apache
> > Kafka 2.2.2
> >
> > This is a bugfix release for Apache Kafka 2.2.
> > All of the changes in this release can be found in the release notes:
> > https://www.apache.org/dist/kafka/2.2.2/RELEASE_NOTES.html
> >
> > You can download the source and binary release from:
> > https://kafka.apache.org/downloads#2.2.2
> >
> >
> > 
> ---
> >
> >
> > Apache Kafka is a distributed streaming platform with four core APIs:
> >
> >
> > ** The Producer API allows an application to publish a stream records to
> > one or more Kafka topics.
> >
> > ** The Consumer API allows an application to subscribe to one or more
> > topics and process the stream of records produced to them.
> >
> > ** The Streams API allows an application to act as a stream processor,
> > consuming an input stream from one or more topics and producing an
> > output stream to one or more output topics, effectively transforming the
> > input streams to output streams.
> >
> > ** The Connector API allows building and running reusable producers or
> > consumers that connect Kafka topics to existing applications or data
> > systems. For example, a connector to a relational database might
> > capture every change to a table.
> >
> >
> > With these APIs, Kafka can be used for two broad classes of application:
> >
> > ** Building real-time streaming data pipelines that reliably get data
> > between systems or applications.
> >
> > ** Building real-time streaming applications that transform or react
> > to the streams of data.
> >
> >
> > Apache Kafka is in use at large and small companies worldwide, including
> > Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> > Target, The New York Times, Uber, Yelp, and Zalando, among others.
> >
> > A big thank you for the following 41 contributors to this release!
> >
> > A. Sophie Blee-Goldman, Matthias J. Sax, Bill Bejeck, Jason Gustafson,
> > Chris Egerton, Boyang Chen, Alex Diachenko, cpettitt-confluent, Magesh
> > Nandakumar, Randall Hauch, Ismael Juma, John Roesler, Konstantine
> > Karantasis, Mickael Maison, Nacho Muñoz Gómez, Nigel Liang, Paul, Rajini
> > Sivaram, Robert Yokota, Stanislav Kozlovski, Vahid Hashemian, Victoria
> > Bialas, cadonna, cwildman, mjarvie, sdreynolds, slim, vinoth chandar,
> > wenhoujx, Arjun Satish, Chia-Ping Tsai, Colin P. Mccabe, David Arthur,
> > Dhruvil Shah, Greg Harris, Gunnar Morling, Hai-Dang Dam, Lifei Chen,
> Lucas
> > Bradstreet, Manikumar Reddy, Michał Borowiecki
> >
> > We welcome your help and feedback. For more information on how to
> > report problems, and to get involved, visit the project website at
> > https://kafka.apache.org/
> >
> > Thank you!
> >
> >
> > Regards,
> > Randall Hauch
> >
> > --
> > You received this message because you are subscribed to the Google Groups
> > "kafka-clients" group.
> > To unsubscribe from this group and stop receiving emails from it, send an
> > email to kafka-clients+unsubscr...@googlegroups.com.
> > To view this discussion on the web visit
> > https://groups.google.com/d/msgid/kafka-clients/
> CALYgK0EsNFakX7F0FDkXvMNmUe8g8w-GNRM7EJjD9CJLK7sn0A%40mail.gmail.com
> >  CALYgK0EsNFakX7F0FDkXvMNmUe8g8w-GNRM7EJjD9CJLK7sn0A%40mail.
> gmail.com?utm_medium=email_source=footer>
> > .
> >
>
>
> --
>
> Thanks!
> --Vahid
>


Re: [ANNOUNCE] New committer: Mickael Maison

2019-11-08 Thread Ankit Kumar
Congratulations Mickael!!

*Best regards,*
*Ankit Kumar.*


On Fri, Nov 8, 2019 at 9:08 PM Viktor Somogyi-Vass 
wrote:

> Congrats Mickael!! :)
>
> On Fri, Nov 8, 2019 at 1:24 PM Satish Duggana 
> wrote:
>
> > Congratulations Mickael!!
> >
> > On Fri, Nov 8, 2019 at 2:50 PM Rajini Sivaram 
> > wrote:
> > >
> > > Congratulations, Mickael, well deserved!!
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > > On Fri, Nov 8, 2019 at 9:08 AM David Jacot 
> wrote:
> > >
> > > > Congrats Mickeal, well deserved!
> > > >
> > > > On Fri, Nov 8, 2019 at 8:56 AM Tom Bentley 
> > wrote:
> > > >
> > > > > Congratulations Mickael!
> > > > >
> > > > > On Fri, Nov 8, 2019 at 6:41 AM Vahid Hashemian <
> > > > vahid.hashem...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Congrats Mickael,
> > > > > >
> > > > > > Well deserved!
> > > > > >
> > > > > > --Vahid
> > > > > >
> > > > > > On Thu, Nov 7, 2019 at 9:10 PM Maulin Vasavada <
> > > > > maulin.vasav...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Congratulations Mickael!
> > > > > > >
> > > > > > > On Thu, Nov 7, 2019 at 8:27 PM Manikumar <
> > manikumar.re...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Congrats Mickeal!
> > > > > > > >
> > > > > > > > On Fri, Nov 8, 2019 at 9:05 AM Dong Lin  >
> > > > wrote:
> > > > > > > >
> > > > > > > > > Congratulations Mickael!
> > > > > > > > >
> > > > > > > > > On Thu, Nov 7, 2019 at 1:38 PM Jun Rao 
> > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Everyone,
> > > > > > > > > >
> > > > > > > > > > The PMC of Apache Kafka is pleased to announce a new
> Kafka
> > > > > > committer
> > > > > > > > > > Mickael
> > > > > > > > > > Maison.
> > > > > > > > > >
> > > > > > > > > > Mickael has been contributing to Kafka since 2016. He
> > proposed
> > > > > and
> > > > > > > > > > implemented multiple KIPs. He has also been promoting
> > Kafka
> > > > > > through
> > > > > > > > > blogs
> > > > > > > > > > and public talks.
> > > > > > > > > >
> > > > > > > > > > Congratulations, Mickael!
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun (on behalf of the Apache Kafka PMC)
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Thanks!
> > > > > > --Vahid
> > > > > >
> > > > >
> > > >
> >
>


[jira] [Created] (KAFKA-9163) Re-assignment of Partition gets infinitely stuck "Still in progress" state

2019-11-07 Thread vikash kumar (Jira)
vikash kumar created KAFKA-9163:
---

 Summary: Re-assignment of Partition gets infinitely stuck "Still 
in progress" state
 Key: KAFKA-9163
 URL: https://issues.apache.org/jira/browse/KAFKA-9163
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.1
 Environment: Amazon Linux 1 x86_64 GNU/Linux
Reporter: vikash kumar


We have a 6-node Kafka cluster. Out of the 6 machines, 3 of them have both Kafka 
+ ZooKeeper, and the remaining 3 have just Kafka.
Recently, we added one more kafka node. While re-assigning the partition to all 
the nodes (including the newer one), we executed the below command:

/opt/kafka/bin/kafka-reassign-partitions.sh --reassignment-json-file 
new_assignment_details.json --execute --zookeeper localhost:2181

However, when we verified the status by using the below command,

/opt/kafka/bin/kafka-reassign-partitions.sh --reassignment-json-file 
new_assignment_details.json --verify --zookeeper localhost:2181

We get the below output. For some of the partitions, re-assignment is still in 
progress.

/opt/kafka/bin/kafka-reassign-partitions.sh --reassignment-json-file 
new_assignment_details.json --verify --zookeeper localhost:2181 | grep 
'progress'
Reassignment of partition [topic-name,854] is still in progress
Reassignment of partition [topic-name,674] is still in progress
Reassignment of partition [topic-name,944] is still in progress
Reassignment of partition [topic-name,404] is still in progress
Reassignment of partition [topic-name,314] is still in progress
Reassignment of partition [topic-name,853] is still in progress
Reassignment of partition [prom-metrics,403] is still in progress
Reassignment of partition [prom-metrics,134] is still in progress


There is no way to either:

1. Cancel the on-going partition re-assignment, or
2. Roll back the re-assignment. (When we try doing that, it says "There 
is an existing assignment running.")



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9162) Re-assignment of Partition gets infinitely stuck "Still in progress" state

2019-11-07 Thread vikash kumar (Jira)
vikash kumar created KAFKA-9162:
---

 Summary: Re-assignment of Partition gets infinitely stuck "Still 
in progress" state
 Key: KAFKA-9162
 URL: https://issues.apache.org/jira/browse/KAFKA-9162
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.1
 Environment: Amazon Linux 1 x86_64 GNU/Linux
Reporter: vikash kumar


We have a 6-node Kafka cluster. Out of the 6 machines, 3 of them have both Kafka 
+ ZooKeeper, and the remaining 3 have just Kafka.
Recently, we added one more kafka node. While re-assigning the partition to all 
the nodes (including the newer one), we executed the below command:

/opt/kafka/bin/kafka-reassign-partitions.sh --reassignment-json-file 
new_assignment_details.json --execute --zookeeper localhost:2181

However, when we verified the status by using the below command,

/opt/kafka/bin/kafka-reassign-partitions.sh --reassignment-json-file 
new_assignment_details.json --verify --zookeeper localhost:2181

We get the below output. For some of the partitions, re-assignment is still in 
progress.

/opt/kafka/bin/kafka-reassign-partitions.sh --reassignment-json-file 
new_assignment_details.json --verify --zookeeper localhost:2181 | grep 
'progress'
Reassignment of partition [topic-name,854] is still in progress
Reassignment of partition [topic-name,674] is still in progress
Reassignment of partition [topic-name,944] is still in progress
Reassignment of partition [topic-name,404] is still in progress
Reassignment of partition [topic-name,314] is still in progress
Reassignment of partition [topic-name,853] is still in progress
Reassignment of partition [prom-metrics,403] is still in progress
Reassignment of partition [prom-metrics,134] is still in progress


There is no way to either:

1. Cancel the on-going partition re-assignment, or
2. Roll back the re-assignment. (When we try doing that, it says "There 
is an existing assignment running.")



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-11-05 Thread aishwarya kumar
Thank you so much for the votes.

I will consider that the KIP is accepted, with 3 binding votes from
Matthias, Bill and Guozhang.

And 1 non-binding vote from John.


Best,
Aishwarya


On Mon, Nov 4, 2019, 12:18 PM Guozhang Wang  wrote:

> +1 (binding), thanks Aishwarya!
>
> On Sun, Nov 3, 2019 at 11:46 AM aishwarya kumar 
> wrote:
>
> > This thread has been open for more than 72 hours. So far there are 2
> > binding and 1 non-binding votes, looking to conclude this quickly!!
> >
> > Best,
> > Aishwarya
> >
> > On Mon, Oct 28, 2019 at 5:00 PM John Roesler  wrote:
> >
> > > Thanks, Aishwarya!
> > >
> > > I'm +1 (non-binding)
> > >
> > > -John
> > >
> > > On Mon, Oct 28, 2019 at 11:58 AM aishwarya kumar 
> > > wrote:
> > > >
> > > > Thank you,
> > > >
> > > > Two binding votes so far.
> > > >
> > > > I'll keep this thread open for a couple of days.
> > > >
> > > > Best,
> > > > Aishwarya
> > > >
> > > > On Thu, Oct 24, 2019, 3:05 PM Bill Bejeck  wrote:
> > > >
> > > > > Thanks for the KIP, this is something that will be appreciated by
> the
> > > > > community.
> > > > >
> > > > > +1(binding)
> > > > >
> > > > > -Bill
> > > > >
> > > > > On Thu, Oct 24, 2019 at 12:54 PM Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 10/24/19 6:19 AM, aishwarya kumar wrote:
> > > > > > > Hello All,
> > > > > > >
> > > > > > > After concluding discussions for this KIP, I would like to go
> > > forward
> > > > > > with
> > > > > > > the voting process.
> > > > > > >
> > > > > > > Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
> > > > > > > KIP :
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > > > > > >
> > > > > > > Thank you,
> > > > > > > Aishwarya
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-11-03 Thread aishwarya kumar
This thread has been open for more than 72 hours. So far there are 2
binding and 1 non-binding votes, looking to conclude this quickly!!

Best,
Aishwarya

On Mon, Oct 28, 2019 at 5:00 PM John Roesler  wrote:

> Thanks, Aishwarya!
>
> I'm +1 (non-binding)
>
> -John
>
> On Mon, Oct 28, 2019 at 11:58 AM aishwarya kumar 
> wrote:
> >
> > Thank you,
> >
> > Two binding votes so far.
> >
> > I'll keep this thread open for a couple of days.
> >
> > Best,
> > Aishwarya
> >
> > On Thu, Oct 24, 2019, 3:05 PM Bill Bejeck  wrote:
> >
> > > Thanks for the KIP, this is something that will be appreciated by the
> > > community.
> > >
> > > +1(binding)
> > >
> > > -Bill
> > >
> > > On Thu, Oct 24, 2019 at 12:54 PM Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Thanks for the KIP!
> > > >
> > > > +1 (binding)
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 10/24/19 6:19 AM, aishwarya kumar wrote:
> > > > > Hello All,
> > > > >
> > > > > After concluding discussions for this KIP, I would like to go
> forward
> > > > with
> > > > > the voting process.
> > > > >
> > > > > Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
> > > > > KIP :
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > > > >
> > > > > Thank you,
> > > > > Aishwarya
> > > > >
> > > >
> > > >
> > >
>


Deferred processing

2019-10-31 Thread Naveen kumar mandavilli
Hi

We have a use case where we have to defer processing of messages by one
hour from the time they were published. Please suggest an elegant
solution to handle this.
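
One common approach, sketched below under assumptions (plain Java consumer, simplified
bookkeeping, and process() as a placeholder for the business logic), is to pause a partition and
rewind to the first record that is still too young, resuming it only once the record is old enough:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DelayedConsumerSketch {
    private static final long DELAY_MS = Duration.ofHours(1).toMillis();

    public static void run(Properties props, String topic) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (TopicPartition tp : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(tp)) {
                        if (System.currentTimeMillis() - record.timestamp() < DELAY_MS) {
                            // Not old enough yet: rewind to this record and pause the partition.
                            consumer.seek(tp, record.offset());
                            consumer.pause(Collections.singletonList(tp));
                            break;   // skip the rest of this partition's batch; it is re-fetched after resume
                        }
                        process(record);
                    }
                }
                // A real implementation would track the paused partitions (and the timestamp of their
                // head record) and call consumer.resume(...) once the one-hour delay has elapsed.
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder: business logic for the delayed message
    }
}

The pause/resume route keeps poll() being called, so the consumer stays in its group instead of
exceeding max.poll.interval.ms, which a plain Thread.sleep() for an hour would violate.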

Thanks
Naveen kumar M.
-- 
Naveen kumar M.


Re: [VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-10-28 Thread aishwarya kumar
Thank you,

Two binding votes so far.

I'll keep this thread open for a couple of days.

Best,
Aishwarya

On Thu, Oct 24, 2019, 3:05 PM Bill Bejeck  wrote:

> Thanks for the KIP, this is something that will be appreciated by the
> community.
>
> +1(binding)
>
> -Bill
>
> On Thu, Oct 24, 2019 at 12:54 PM Matthias J. Sax 
> wrote:
>
> > Thanks for the KIP!
> >
> > +1 (binding)
> >
> >
> > -Matthias
> >
> > On 10/24/19 6:19 AM, aishwarya kumar wrote:
> > > Hello All,
> > >
> > > After concluding discussions for this KIP, I would like to go forward
> > with
> > > the voting process.
> > >
> > > Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
> > > KIP :
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > >
> > > Thank you,
> > > Aishwarya
> > >
> >
> >
>


[VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-10-24 Thread aishwarya kumar
Hello All,

After concluding discussions for this KIP, I would like to go forward with
the voting process.

Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
KIP :
https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL

Thank you,
Aishwarya
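
For illustration, a minimal sketch of what the proposed operator looks like in use (API names as
proposed in the KIP; the topic and store names are placeholders):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ToTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Re-interpret the event stream as a changelog and (optionally) materialize a queryable store.
        KTable<String, String> table = builder.<String, String>stream("input-topic")
                .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("input-table-store"));
        // The resulting KTable can then be joined, aggregated, or queried like any other table.
    }
}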


Re: [Discuss] - KIP-532 - Add KStream#toTable to the Streams DSL

2019-10-23 Thread aishwarya kumar
Apologies for the delay.

I have made the changes in the KIP, i'll be starting the voting process
shortly.

Best regards,
Aishwarya Kumar

On Mon, Oct 7, 2019 at 6:06 PM Matthias J. Sax 
wrote:

> Aishwarya,
>
> Why is bullet point (2) formatted as "strike through"? If you intend to
> replace it with bullet point (3), just remove it completely. The KIP
> should reflect the actual proposal. Maybe move it to "Rejected
> Alternative" section?
>
> For (3c), it should also say, if `Materialize` specifies a queryable
> store name. If there is no store name provided, either (a) or (b) applies.
>
>
>
> Overall LGTM. Feel free to start a vote.
>
>
> -Matthias
>
>
>
> On 10/1/19 7:48 AM, aishwarya kumar wrote:
> > Thank you all for the feedback, I will keep this thread open for
> discussion
> > for a couple of more days and then start with the voting process.
> >
> > Best regards,
> > Aishwarya
> >
> > On Fri, Sep 27, 2019, 12:37 PM John Roesler  wrote:
> >
> >> Looks good to me! I have no further comments.
> >>
> >> Thanks again for the KIP, Aishwarya!
> >> -John
> >>
> >> On Fri, Sep 27, 2019 at 10:11 AM aishwarya kumar 
> >> wrote:
> >>>
> >>> Hello John,
> >>>
> >>> Thank you for pointing this out to me, to maintain consistency across
> >> API's
> >>> it does make sense to allow users to define custom names for
> >>> their processors.
> >>>
> >>> I've made the change in the KIP:
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> >>>
> >>> Best,
> >>> Aishwarya
> >>>
> >>> On Tue, Sep 24, 2019 at 11:54 AM John Roesler 
> wrote:
> >>>
> >>>> Hey Aishwarya,
> >>>>
> >>>> Thanks for the KIP! It looks good to me, although in a post-KIP-307
> >>>> world, we also need a "Named" parameter (to give the processor node a
> >>>> name, as opposed to the store itself).
> >>>>
> >>>> This would result in a total of four overloads:
> >>>> 1. no args
> >>>> 2. Named
> >>>> 3. Materialized
> >>>> 4. Materialized, Named
> >>>>
> >>>> I'd like to propose a re-design of the DSL in the future to clean this
> >>>> up, but for now, this is the pattern we have to follow.
> >>>>
> >>>> Thoughts?
> >>>>
> >>>> Thanks,
> >>>> -John
> >>>>
> >>>> On Tue, Sep 24, 2019 at 9:54 AM aishwarya kumar 
> >>>> wrote:
> >>>>>
> >>>>> Thank you for the suggestion Matthais, i've made the necessary
> >> changes in
> >>>>> the KIP.
> >>>>>
> >>>>> Keeping this thread open for further input.
> >>>>> KIP link:
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> >>>>>
> >>>>> Best,
> >>>>> Aishwarya
> >>>>>
> >>>>> On Thu, Sep 19, 2019 at 10:50 AM aishwarya kumar  >>>
> >>>> wrote:
> >>>>>
> >>>>>> Thanks Matthias,
> >>>>>>
> >>>>>> That does make sense, let me update the KIP to reflect the
> >>>> Materialization
> >>>>>> scenario.
> >>>>>>
> >>>>>> Best,
> >>>>>> Aishwarya
> >>>>>>
> >>>>>> On Tue, Sep 17, 2019, 2:49 PM Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Aishwarya,
> >>>>>>>
> >>>>>>> thanks for the KIP. Overall, I think it makes sense to allow
> >>>> converting
> >>>>>>> a KStream into a KTable.
> >>>>>>>
> >>>>>>> From the KIP:
> >>>>>>>
> >>>>>>>> materializing these KTables should only be allowed if the
> >> overloaded
> >>>>>>> function with Materialized is used (and if optimization is turned
> >> on
> >>>> it may
> >>>&g

RE: [DISCUSS] KIP-530: Consider renaming 'UsePreviousTimeOnInvalidTimeStamp' class to 'UsePartitionTimeOnInvalidTimeStamp'

2019-10-07 Thread Rabi Kumar K C
Hi Bruno, 

Ah, I see what you were talking about regarding the extract method in 
UsePreviousTimeOnInvalidTimestamp. Please ignore my last mail; I will update 
the KIP accordingly.

With Best Regards,
Rabi Kumar K C

Sent from Mail for Windows 10

From: Rabi Kumar K C
Sent: Monday, October 7, 2019 4:50 PM
To: dev@kafka.apache.org
Subject: RE: [DISCUSS] KIP-530: Consider renaming 'UsePreviousTimeOnInvalidTimeStamp' 
class to 'UsePartitionTimeOnInvalidTimeStamp'

Hi Bruno,

Thank you for your suggestions. I have made the necessary changes in the KIP 
and hopefully it’s fine now; if not, please do let me know.

To answer your question 4):
right now in trunk we can see that the extract() method is not present in 
UsePreviousTimeOnInvalidTimestamp; instead, it implements onInvalidTimestamp(), 
which is an abstract method of the super class ExtractRecordMetadataTimestamp. 
I have only seen the extract() method in ExtractRecordMetadataTimestamp. Please 
do correct me if I am wrong. 

And yes, I do agree with you on 5), the deprecation part of the compatibility, 
deprecation, and migration plan. 


With Best Regards,
Rabi Kumar K C
Sent from Mail for Windows 10

From: Bruno Cadonna
Sent: Monday, October 7, 2019 3:47 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-530: Consider renaming 'UsePreviousTimeOnInvalidTimeStamp' 
class to 'UsePartitionTimeOnInvalidTimeStamp'

Hi Rabi,

Thank you for the KIP!

1.) Could you please improve the formatting of the KIP? For instance,
use appropriate formatting for code to differentiate it from the text.
Also, we usually do not use italics to write KIPs. Look at other KIPs
to get an idea of the formatting.

2.) "Public Interfaces" does not directly refer to interfaces in Java.
It rather refers to the APIs that are visible from the outside. Thus,
you should specify the class `UsePartitionTimeOnInvalidTimeStamp` with its
method signatures but without implementation.

3.) Under "Public Interfaces", you should also mention whether `
UsePreviousTimeOnInvalidTimestamp` should be deprecated or not.

4.) What do you mean with "now extract has been removed from
'UsePreviousTimeOnInvalidTimestamp'"? Without `extract()`,
`UsePreviousTimeOnInvalidTimestamp` would not implement the
`TimestampExtractor` interface anymore.

5.) Regarding "Compatibility, Deprecation, and Migration Plan", I do
not think that we can simply remove
`UsePreviousTimeOnInvalidTimestamp` in the next minor release. It
needs to be deprecated beforehand.

Best,
Bruno

On Wed, Oct 2, 2019 at 4:49 PM RABI K.C.  wrote:
>
> Hello All,
>
> This is KIP for the change of Class name from
> UsePreviousTimeOnInvalidTimeStamp to UsePartitionTimeOnInvalidTimeStamp.
> Link and Jira ticket is mentioned below:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=130028807
> https://issues.apache.org/jira/browse/KAFKA-8953
>
> Would be pleased to get your feedback on this.
>
> With Best Regards,
> Rabi Kumar K C
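
For reference, a rough sketch of the class shape under discussion follows. It is an
illustration based on the KIP, not the actual Kafka source: the concrete extractor only
supplies onInvalidTimestamp(), while extract() is implemented by the abstract super class
ExtractRecordMetadataTimestamp. Returning the partition time (rather than the previously
observed record timestamp) is what the proposed rename is meant to convey.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp;

// Sketch of the proposed UsePartitionTimeOnInvalidTimestamp extractor (illustrative only).
public class UsePartitionTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {

    @Override
    public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                   final long recordTimestamp,
                                   final long partitionTime) {
        // No partition time is known yet, so the invalid timestamp cannot be repaired.
        if (partitionTime < 0) {
            throw new StreamsException("Could not infer a timestamp for record " + record
                + " because no partition time is available yet.");
        }
        // Fall back to the current partition time for records with invalid (negative) timestamps.
        return partitionTime;
    }
}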




RE: [DISCUSS] KIP-530: Consider renaming 'UsePreviousTimeOnInvalidTimeStamp' class to 'UsePartitionTimeOnInvalidTimeStamp'

2019-10-07 Thread Rabi Kumar K C
Hi Bruno,

Thank you for your suggestions. I have made the necessary changes in the KIP 
and hopefully it’s fine now; if not, please do let me know.

To answer your question 4):
right now in trunk we can see that the extract() method is not present in 
UsePreviousTimeOnInvalidTimestamp; instead, it implements onInvalidTimestamp(), 
which is an abstract method of the super class ExtractRecordMetadataTimestamp. 
I have only seen the extract() method in ExtractRecordMetadataTimestamp. Please 
do correct me if I am wrong. 

And yes, I do agree with you on 5), the deprecation part of the compatibility, 
deprecation, and migration plan. 


With Best Regards,
Rabi Kumar K C
Sent from Mail for Windows 10

From: Bruno Cadonna
Sent: Monday, October 7, 2019 3:47 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-530: Consider renaming 'UsePreviousTimeOnInvalidTimeStamp' 
class to 'UsePartitionTimeOnInvalidTimeStamp'

Hi Rabi,

Thank you for the KIP!

1.) Could you please improve the formatting of the KIP? For instance,
use appropriate formatting for code to differentiate it from the text.
Also, we usually do not use italics to write KIPs. Look at other KIPs
to get an idea of the formatting.

2.) "Public Interfaces" does not directly refer to interfaces in Java.
It rather refers to the APIs that are visible from the outside. Thus,
you should specify the class `UsePartitionTimeOnInvalidTimeStamp` with its
method signatures but without implementation.

3.) Under "Public Interfaces", you should also mention whether `
UsePreviousTimeOnInvalidTimestamp` should be deprecated or not.

4.) What do you mean with "now extract has been removed from
'UsePreviousTimeOnInvalidTimestamp'"? Without `extract()`,
`UsePreviousTimeOnInvalidTimestamp` would not implement the
`TimestampExtractor` interface anymore.

5.) Regarding "Compatibility, Deprecation, and Migration Plan", I do
not think that we can simply remove
`UsePreviousTimeOnInvalidTimestamp` in the next minor release. It
needs to be deprecated beforehand.

Best,
Bruno

On Wed, Oct 2, 2019 at 4:49 PM RABI K.C.  wrote:
>
> Hello All,
>
> This is KIP for the change of Class name from
> UsePreviousTimeOnInvalidTimeStamp to UsePartitionTimeOnInvalidTimeStamp.
> Link and Jira ticket is mentioned below:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=130028807
> https://issues.apache.org/jira/browse/KAFKA-8953
>
> Would be pleased to get your feedback on this.
>
> With Best Regards,
> Rabi Kumar K C
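
As a usage-level illustration of the proposal: applications would opt into the renamed
extractor through the existing default.timestamp.extractor config. The sketch below assumes
the class lands under the proposed name; the config keys themselves already exist.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TimestampExtractorConfigExample {

    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timestamp-extractor-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Fall back to partition time when a record carries an invalid timestamp.
        // The class name follows the KIP proposal; adjust it if the final name differs.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                "org.apache.kafka.streams.processor.UsePartitionTimeOnInvalidTimestamp");
        return props;
    }
}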



[jira] [Created] (KAFKA-8974) Sink Connectors can't handle topic list with whitespaces

2019-10-03 Thread Magesh kumar Nandakumar (Jira)
Magesh kumar Nandakumar created KAFKA-8974:
--

 Summary: Sink Connectors can't handle topic list with whitespaces
 Key: KAFKA-8974
 URL: https://issues.apache.org/jira/browse/KAFKA-8974
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar


Sink connectors allow the user to configure the `topics` config, which is a 
comma-separated list of topics. When the user inadvertently includes whitespace 
in the comma-separated list, like `topic1, topic2`, the connector doesn't 
process `topic2` correctly. The fix is to trim the individual items before 
creating the consumer subscription.
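
A minimal sketch of the kind of trimming the fix describes (illustrative helper only, not
the actual Connect patch; the class and method names are made up):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TopicsConfigParser {

    // Turns "topic1, topic2" into ["topic1", "topic2"] instead of ["topic1", " topic2"].
    public static List<String> parseTopics(final String topicsConfig) {
        return Arrays.stream(topicsConfig.split(","))
                .map(String::trim)                  // drop surrounding whitespace
                .filter(topic -> !topic.isEmpty())  // ignore stray empty entries
                .collect(Collectors.toList());
    }
}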



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Discuss] - KIP-532 - Add KStream#toTable to the Streams DSL

2019-10-01 Thread aishwarya kumar
Thank you all for the feedback. I will keep this thread open for discussion
for a couple more days and then start the voting process.

Best regards,
Aishwarya

On Fri, Sep 27, 2019, 12:37 PM John Roesler  wrote:

> Looks good to me! I have no further comments.
>
> Thanks again for the KIP, Aishwarya!
> -John
>
> On Fri, Sep 27, 2019 at 10:11 AM aishwarya kumar 
> wrote:
> >
> > Hello John,
> >
> > Thank you for pointing this out to me, to maintain consistency across
> API's
> > it does make sense to allow users to define custom names for
> > their processors.
> >
> > I've made the change in the KIP:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> >
> > Best,
> > Aishwarya
> >
> > On Tue, Sep 24, 2019 at 11:54 AM John Roesler  wrote:
> >
> > > Hey Aishwarya,
> > >
> > > Thanks for the KIP! It looks good to me, although in a post-KIP-307
> > > world, we also need a "Named" parameter (to give the processor node a
> > > name, as opposed to the store itself).
> > >
> > > This would result in a total of four overloads:
> > > 1. no args
> > > 2. Named
> > > 3. Materialized
> > > 4. Materialized, Named
> > >
> > > I'd like to propose a re-design of the DSL in the future to clean this
> > > up, but for now, this is the pattern we have to follow.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Tue, Sep 24, 2019 at 9:54 AM aishwarya kumar 
> > > wrote:
> > > >
> > > > Thank you for the suggestion Matthais, i've made the necessary
> changes in
> > > > the KIP.
> > > >
> > > > Keeping this thread open for further input.
> > > > KIP link:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > > >
> > > > Best,
> > > > Aishwarya
> > > >
> > > > On Thu, Sep 19, 2019 at 10:50 AM aishwarya kumar  >
> > > wrote:
> > > >
> > > > > Thanks Matthias,
> > > > >
> > > > > That does make sense, let me update the KIP to reflect the
> > > Materialization
> > > > > scenario.
> > > > >
> > > > > Best,
> > > > > Aishwarya
> > > > >
> > > > > On Tue, Sep 17, 2019, 2:49 PM Matthias J. Sax <
> matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Aishwarya,
> > > > >>
> > > > >> thanks for the KIP. Overall, I think it makes sense to allow
> > > converting
> > > > >> a KStream into a KTable.
> > > > >>
> > > > >> From the KIP:
> > > > >>
> > > > >> > materializing these KTables should only be allowed if the
> overloaded
> > > > >> function with Materialized is used (and if optimization is turned
> on
> > > it may
> > > > >> still be only logically materialized if the queryable name is not
> > > set).
> > > > >>
> > > > >> Can you elaborate? I think the behavior we want should align with
> the
> > > > >> behavior of `StreamsBuilder#table()`.
> > > > >>
> > > > >> From my understanding (correct me if I am wrong) it should be:
> > > > >>
> > > > >> (1) If optimization is turned off, the KTable will always be
> > > > >> materialized, independent which method is used. The KTable will
> not be
> > > > >> queryable though.
> > > > >>
> > > > >> (2) If optimization is turned on and if `toTable()` is used, the
> > > KTable
> > > > >> may or may not be materialized. For this case, even if the KTable
> is
> > > > >> materialized, the store would not be queryable.
> > > > >>
> > > > >> (3) If `toTable(Materialized)` is use and a `storeName` or
> > > > >> `StoreSupplier` is specified, the store will always be
> materialized
> > > and
> > > > >> also be queryable. Otherwise, case (1) or (2) applies.
> > > > >>
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 9/17/19 6:42 AM, aishwarya kumar wrote:
> > > > >> > Hi All,
> > > > >> >
> > > > >> > Keeping this thread alive!!
> > > > >> >
> > > > >> > The aim is to add two methods Kstream.toTable() &
> > > > >> > Kstream.toTable(Materialized), so users can choose to
> convert
> > > their
> > > > >> > event stream into a changelog stream at any stage.
> > > > >> > wiki link :
> > > > >> >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
> > > > >> > jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
> > > > >> >
> > > > >> > Best,
> > > > >> > Aishwarya
> > > > >> >
> > > > >> > On Fri, Sep 13, 2019 at 10:49 AM aishwarya kumar <
> > > ash26...@gmail.com>
> > > > >> wrote:
> > > > >> >
> > > > >> >> Hello,
> > > > >> >>
> > > > >> >> Starting this thread to discuss KIP-532:
> > > > >> >> wiki link :
> > > > >> >>
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
> > > > >> >> jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
> > > > >> >>
> > > > >> >> There has been some discussion around the use-case of this KIP
> in
> > > the
> > > > >> Jira
> > > > >> >> ticket.
> > > > >> >>
> > > > >> >> Regards,
> > > > >> >> Aishwarya
> > > > >> >>
> > > > >> >
> > > > >>
> > > > >>
> > >
>


Re: [Discuss] - KIP-532 - Add KStream#toTable to the Streams DSL

2019-09-27 Thread aishwarya kumar
Hello John,

Thank you for pointing this out to me; to maintain consistency across APIs,
it does make sense to allow users to define custom names for
their processors.

I've made the change in the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL

Best,
Aishwarya

On Tue, Sep 24, 2019 at 11:54 AM John Roesler  wrote:

> Hey Aishwarya,
>
> Thanks for the KIP! It looks good to me, although in a post-KIP-307
> world, we also need a "Named" parameter (to give the processor node a
> name, as opposed to the store itself).
>
> This would result in a total of four overloads:
> 1. no args
> 2. Named
> 3. Materialized
> 4. Materialized, Named
>
> I'd like to propose a re-design of the DSL in the future to clean this
> up, but for now, this is the pattern we have to follow.
>
> Thoughts?
>
> Thanks,
> -John
>
> On Tue, Sep 24, 2019 at 9:54 AM aishwarya kumar 
> wrote:
> >
> > Thank you for the suggestion Matthais, i've made the necessary changes in
> > the KIP.
> >
> > Keeping this thread open for further input.
> > KIP link:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> >
> > Best,
> > Aishwarya
> >
> > On Thu, Sep 19, 2019 at 10:50 AM aishwarya kumar 
> wrote:
> >
> > > Thanks Matthias,
> > >
> > > That does make sense, let me update the KIP to reflect the
> Materialization
> > > scenario.
> > >
> > > Best,
> > > Aishwarya
> > >
> > > On Tue, Sep 17, 2019, 2:49 PM Matthias J. Sax 
> > > wrote:
> > >
> > >> Aishwarya,
> > >>
> > >> thanks for the KIP. Overall, I think it makes sense to allow
> converting
> > >> a KStream into a KTable.
> > >>
> > >> From the KIP:
> > >>
> > >> > materializing these KTables should only be allowed if the overloaded
> > >> function with Materialized is used (and if optimization is turned on
> it may
> > >> still be only logically materialized if the queryable name is not
> set).
> > >>
> > >> Can you elaborate? I think the behavior we want should align with the
> > >> behavior of `StreamsBuilder#table()`.
> > >>
> > >> From my understanding (correct me if I am wrong) it should be:
> > >>
> > >> (1) If optimization is turned off, the KTable will always be
> > >> materialized, independent which method is used. The KTable will not be
> > >> queryable though.
> > >>
> > >> (2) If optimization is turned on and if `toTable()` is used, the
> KTable
> > >> may or may not be materialized. For this case, even if the KTable is
> > >> materialized, the store would not be queryable.
> > >>
> > >> (3) If `toTable(Materialized)` is use and a `storeName` or
> > >> `StoreSupplier` is specified, the store will always be materialized
> and
> > >> also be queryable. Otherwise, case (1) or (2) applies.
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 9/17/19 6:42 AM, aishwarya kumar wrote:
> > >> > Hi All,
> > >> >
> > >> > Keeping this thread alive!!
> > >> >
> > >> > The aim is to add two methods Kstream.toTable() &
> > >> > Kstream.toTable(Materialized), so users can choose to convert
> their
> > >> > event stream into a changelog stream at any stage.
> > >> > wiki link :
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
> > >> > jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
> > >> >
> > >> > Best,
> > >> > Aishwarya
> > >> >
> > >> > On Fri, Sep 13, 2019 at 10:49 AM aishwarya kumar <
> ash26...@gmail.com>
> > >> wrote:
> > >> >
> > >> >> Hello,
> > >> >>
> > >> >> Starting this thread to discuss KIP-532:
> > >> >> wiki link :
> > >> >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
> > >> >> jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
> > >> >>
> > >> >> There has been some discussion around the use-case of this KIP in
> the
> > >> Jira
> > >> >> ticket.
> > >> >>
> > >> >> Regards,
> > >> >> Aishwarya
> > >> >>
> > >> >
> > >>
> > >>
>
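
For readers of the archive, a sketch of how the four overloads discussed above might look
at the call site, assuming the API lands as proposed in the KIP (topic, processor, and
store names are made up):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;

public class ToTableOverloadsExample {

    public static void buildTopology(final StreamsBuilder builder) {
        final KStream<String, String> stream = builder.stream("input-topic");

        // 1. no args
        final KTable<String, String> t1 = stream.toTable();
        // 2. Named: names the processor node only
        final KTable<String, String> t2 = stream.toTable(Named.as("stream-to-table"));
        // 3. Materialized: names the state store, making it queryable
        final KTable<String, String> t3 = stream.toTable(Materialized.as("to-table-store"));
        // 4. Named + Materialized
        final KTable<String, String> t4 = stream.toTable(
                Named.as("stream-to-table-named"),
                Materialized.as("to-table-store-named"));
    }
}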


Re: [Discuss] - KIP-532 - Add KStream#toTable to the Streams DSL

2019-09-24 Thread aishwarya kumar
Thank you for the suggestion Matthias, I've made the necessary changes in
the KIP.

Keeping this thread open for further input.
KIP link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL

Best,
Aishwarya

On Thu, Sep 19, 2019 at 10:50 AM aishwarya kumar  wrote:

> Thanks Matthias,
>
> That does make sense, let me update the KIP to reflect the Materialization
> scenario.
>
> Best,
> Aishwarya
>
> On Tue, Sep 17, 2019, 2:49 PM Matthias J. Sax 
> wrote:
>
>> Aishwarya,
>>
>> thanks for the KIP. Overall, I think it makes sense to allow converting
>> a KStream into a KTable.
>>
>> From the KIP:
>>
>> > materializing these KTables should only be allowed if the overloaded
>> function with Materialized is used (and if optimization is turned on it may
>> still be only logically materialized if the queryable name is not set).
>>
>> Can you elaborate? I think the behavior we want should align with the
>> behavior of `StreamsBuilder#table()`.
>>
>> From my understanding (correct me if I am wrong) it should be:
>>
>> (1) If optimization is turned off, the KTable will always be
>> materialized, independent which method is used. The KTable will not be
>> queryable though.
>>
>> (2) If optimization is turned on and if `toTable()` is used, the KTable
>> may or may not be materialized. For this case, even if the KTable is
>> materialized, the store would not be queryable.
>>
>> (3) If `toTable(Materialized)` is use and a `storeName` or
>> `StoreSupplier` is specified, the store will always be materialized and
>> also be queryable. Otherwise, case (1) or (2) applies.
>>
>>
>>
>> -Matthias
>>
>>
>> On 9/17/19 6:42 AM, aishwarya kumar wrote:
>> > Hi All,
>> >
>> > Keeping this thread alive!!
>> >
>> > The aim is to add two methods Kstream.toTable() &
>> > Kstream.toTable(Materialized), so users can choose to convert their
>> > event stream into a changelog stream at any stage.
>> > wiki link :
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
>> > jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
>> >
>> > Best,
>> > Aishwarya
>> >
>> > On Fri, Sep 13, 2019 at 10:49 AM aishwarya kumar 
>> wrote:
>> >
>> >> Hello,
>> >>
>> >> Starting this thread to discuss KIP-532:
>> >> wiki link :
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
>> >> jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
>> >>
>> >> There has been some discussion around the use-case of this KIP in the
>> Jira
>> >> ticket.
>> >>
>> >> Regards,
>> >> Aishwarya
>> >>
>> >
>>
>>


Re: [Discuss] - KIP-532 - Add KStream#toTable to the Streams DSL

2019-09-19 Thread aishwarya kumar
Thanks Matthias,

That does make sense; let me update the KIP to reflect the materialization
scenario.

Best,
Aishwarya

On Tue, Sep 17, 2019, 2:49 PM Matthias J. Sax  wrote:

> Aishwarya,
>
> thanks for the KIP. Overall, I think it makes sense to allow converting
> a KStream into a KTable.
>
> From the KIP:
>
> > materializing these KTables should only be allowed if the overloaded
> function with Materialized is used (and if optimization is turned on it may
> still be only logically materialized if the queryable name is not set).
>
> Can you elaborate? I think the behavior we want should align with the
> behavior of `StreamsBuilder#table()`.
>
> From my understanding (correct me if I am wrong) it should be:
>
> (1) If optimization is turned off, the KTable will always be
> materialized, independent which method is used. The KTable will not be
> queryable though.
>
> (2) If optimization is turned on and if `toTable()` is used, the KTable
> may or may not be materialized. For this case, even if the KTable is
> materialized, the store would not be queryable.
>
> (3) If `toTable(Materialized)` is use and a `storeName` or
> `StoreSupplier` is specified, the store will always be materialized and
> also be queryable. Otherwise, case (1) or (2) applies.
>
>
>
> -Matthias
>
>
> On 9/17/19 6:42 AM, aishwarya kumar wrote:
> > Hi All,
> >
> > Keeping this thread alive!!
> >
> > The aim is to add two methods Kstream.toTable() &
> > Kstream.toTable(Materialized), so users can choose to convert their
> > event stream into a changelog stream at any stage.
> > wiki link :
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
> > jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
> >
> > Best,
> > Aishwarya
> >
> > On Fri, Sep 13, 2019 at 10:49 AM aishwarya kumar 
> wrote:
> >
> >> Hello,
> >>
> >> Starting this thread to discuss KIP-532:
> >> wiki link :
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
> >> jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
> >>
> >> There has been some discussion around the use-case of this KIP in the
> Jira
> >> ticket.
> >>
> >> Regards,
> >> Aishwarya
> >>
> >
>
>


Re: [DISCUSS] KIP-479: Add Materialized to Join

2019-09-17 Thread aishwarya kumar
Hi Bill,

Thanks for clarifying, and the KIP looks great!!

Best regards,
Aishwarya

On Tue, Sep 17, 2019, 6:16 PM Bill Bejeck  wrote:

> Hi Aishwarya,
>
> On Tue, Sep 17, 2019 at 1:41 PM aishwarya kumar 
> wrote:
>
> > Will this be applicable to Kstream-Ktable joins as well? Or do users
> always
> > materialize these joins explicitly?
> >
>
> No, this change applies to KStream-KStream joins only.  With KStream-KTable
> joins KafkaStreams has materialized the KTable already, and we don't need
> to do anything with the KStream instance in this case.
>
>
> > I'm not sure if its even necessary (or makes sense), just trying to
> > understand why the change is applicable to Kstream joins only?
> >
>
> The full details are in the KIP, but in a nutshell, we needed to break the
> naming of the StateStore from `Joined.withName` and provide users a way to
> name the StateStore explicitly.  While making the changes, we realized it
> would be beneficial to give users the ability to use different WindowStore
> implementations.  The most likely reason to use different WindowStores
> would be to enable in-memory joins.
>
>
> > Best,
> > Aishwarya
> >
>
> Regards,
> Bill
>
>
> >
> > On Tue, Sep 17, 2019 at 4:05 PM Bill Bejeck  wrote:
> >
> > > Guozhang,
> > >
> > > Thanks for the comments.
> > >
> > > Yes, you are correct in your assessment regarding names, *if* the users
> > > provide their own StoreSuppliers  When constructing as StoreSupplier,
> the
> > > name can't be null, and additionally, there is no way to update the
> name.
> > > Further downstream, the underlying StateStore instances use the
> provided
> > > name for registration so they must be unique.  If users don't provide
> > > distinct names for the StoreSuppliers, KafkaStreams will thow a
> > > StreamsException when building the topology.
> > >
> > > Thanks,
> > > Bill
> > >
> > >
> > >
> > > On Tue, Sep 17, 2019 at 9:39 AM Guozhang Wang 
> > wrote:
> > >
> > > > Hello Bill,
> > > >
> > > > Thanks for the updated KIP. I made a pass on the StreamJoined
> section.
> > > Just
> > > > a quick question from user's perspective: if a user wants to provide
> a
> > > > customized store-supplier, she is forced to also provide a name via
> the
> > > > store-supplier. So there's no way to say "I want to provide my own
> > store
> > > > engine but let the library decide its name", is that right?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck 
> wrote:
> > > >
> > > > > Bumping this discussion as we need to re-vote before the KIP
> > deadline.
> > > > >
> > > > > On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck 
> > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > While working on the implementation of KIP-479, some issues came
> to
> > > > light
> > > > > > that the KIP as written won't work.  I have updated the KIP with
> a
> > > > > solution
> > > > > > I believe will solve the original problem as well as address the
> > > > > > impediment to the initial approach.
> > > > > >
> > > > > > This update is a significant change, so please review the updated
> > KIP
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> > > > > and
> > > > > > provide feedback.  After we conclude the discussion, there will
> be
> > a
> > > > > > re-vote.
> > > > > >
> > > > > > Thanks!
> > > > > > Bill
> > > > > >
> > > > > > On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Hi Bill, thanks for your explanations. I'm on board with your
> > > decision
> > > > > >> too.
> > > > > >>
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >> On Wed, Jul 17, 2019 
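
To make the KStream-KStream case concrete, here is a sketch of a join call site using the
StreamJoined config object discussed above. It assumes the method names proposed in the
KIP; topic names and serdes are made up. Custom WindowBytesStoreSuppliers (for example,
in-memory stores from Stores.inMemoryWindowStore) could likewise be plugged in via
withThisStoreSupplier()/withOtherStoreSupplier().

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public class StreamJoinedExample {

    public static void buildTopology(final StreamsBuilder builder) {
        final KStream<String, String> clicks = builder.stream("clicks");
        final KStream<String, String> impressions = builder.stream("impressions");

        // Name the join's state stores explicitly and pass serdes for the join buffers.
        final StreamJoined<String, String, String> streamJoined =
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                        .withStoreName("clicks-impressions-join");

        final KStream<String, String> joined = clicks.join(
                impressions,
                (click, impression) -> click + "/" + impression,
                JoinWindows.of(Duration.ofMinutes(5)),
                streamJoined);

        joined.to("click-impression-joins");
    }
}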

Re: [DISCUSS] KIP-479: Add Materialized to Join

2019-09-17 Thread aishwarya kumar
Will this be applicable to KStream-KTable joins as well? Or do users always
materialize these joins explicitly?
I'm not sure if it's even necessary (or makes sense); I'm just trying to
understand why the change is applicable to KStream joins only.

Best,
Aishwarya

On Tue, Sep 17, 2019 at 4:05 PM Bill Bejeck  wrote:

> Guozhang,
>
> Thanks for the comments.
>
> Yes, you are correct in your assessment regarding names, *if* the users
> provide their own StoreSuppliers  When constructing as StoreSupplier, the
> name can't be null, and additionally, there is no way to update the name.
> Further downstream, the underlying StateStore instances use the provided
> name for registration so they must be unique.  If users don't provide
> distinct names for the StoreSuppliers, KafkaStreams will thow a
> StreamsException when building the topology.
>
> Thanks,
> Bill
>
>
>
> On Tue, Sep 17, 2019 at 9:39 AM Guozhang Wang  wrote:
>
> > Hello Bill,
> >
> > Thanks for the updated KIP. I made a pass on the StreamJoined section.
> Just
> > a quick question from user's perspective: if a user wants to provide a
> > customized store-supplier, she is forced to also provide a name via the
> > store-supplier. So there's no way to say "I want to provide my own store
> > engine but let the library decide its name", is that right?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck  wrote:
> >
> > > Bumping this discussion as we need to re-vote before the KIP deadline.
> > >
> > > On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck 
> wrote:
> > >
> > > > Hi All,
> > > >
> > > > While working on the implementation of KIP-479, some issues came to
> > light
> > > > that the KIP as written won't work.  I have updated the KIP with a
> > > solution
> > > > I believe will solve the original problem as well as address the
> > > > impediment to the initial approach.
> > > >
> > > > This update is a significant change, so please review the updated KIP
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> > > and
> > > > provide feedback.  After we conclude the discussion, there will be a
> > > > re-vote.
> > > >
> > > > Thanks!
> > > > Bill
> > > >
> > > > On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang 
> > > wrote:
> > > >
> > > >> Hi Bill, thanks for your explanations. I'm on board with your
> decision
> > > >> too.
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck 
> > wrote:
> > > >>
> > > >> > Thanks for the response, John.
> > > >> >
> > > >> > > If I can offer my thoughts, it seems better to just document on
> > the
> > > >> > > Stream join javadoc for the Materialized parameter that it will
> > not
> > > >> > > make the join result queriable. I'm not opposed to the queriable
> > > flag
> > > >> > > in general, but introducing it is a much larger consideration
> that
> > > has
> > > >> > > previously derailed this KIP discussion. In the interest of just
> > > >> > > closing the gap and keeping the API change small, it seems
> better
> > to
> > > >> > > just go with documentation for now.
> > > >> >
> > > >> > I agree with your statement here.  IMHO the most important goal of
> > > this
> > > >> KIP
> > > >> > is to not breaking existing users and gain some consistency of the
> > > API.
> > > >> >
> > > >> > I'll update the KIP accordingly.
> > > >> >
> > > >> > -Bill
> > > >> >
> > > >> > On Tue, Jul 16, 2019 at 11:55 AM John Roesler 
> > > >> wrote:
> > > >> >
> > > >> > > Hi Bill,
> > > >> > >
> > > >> > > Thanks for driving this KIP toward a conclusion. I'm on board
> with
> > > >> > > your decision.
> > > >> > >
> > > >> > > You didn't mention whether you're still proposing to add the
> > > >> > > "queriable" flag to the Materialized config object, or just
> > document
> > > >> > > that a Stream join is never queriable. Both options have come up
> > > >> > > earlier in the discussion, so it would be good to pin this down.
> > > >> > >
> > > >> > > If I can offer my thoughts, it seems better to just document on
> > the
> > > >> > > Stream join javadoc for the Materialized parameter that it will
> > not
> > > >> > > make the join result queriable. I'm not opposed to the queriable
> > > flag
> > > >> > > in general, but introducing it is a much larger consideration
> that
> > > has
> > > >> > > previously derailed this KIP discussion. In the interest of just
> > > >> > > closing the gap and keeping the API change small, it seems
> better
> > to
> > > >> > > just go with documentation for now.
> > > >> > >
> > > >> > > Thanks again,
> > > >> > > -John
> > > >> > >
> > > >> > > On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck 
> > > >> wrote:
> > > >> > > >
> > > >> > > > Thanks all for the great discussion so far.
> > > >> > > >
> > > >> > > > Everyone has made excellent points, and I appreciate the
> detail
> > > >> > everyone
> > > >> > > > has put into their arguments.
> > > >> > > >
> > > >> > > > However, 
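
For contrast with the KStream-KStream case that KIP-479 targets: a KStream-KTable join
needs no extra store configuration object, because the KTable side is already materialized
by Kafka Streams; serde hints can still be passed via Joined. A rough sketch with made-up
topic names:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinExample {

    public static void buildTopology(final StreamsBuilder builder) {
        final KStream<String, String> clicks = builder.stream("clicks");
        final KTable<String, String> userProfiles = builder.table("user-profiles");

        // The KTable is backed by its own state store, so only serdes need to be supplied.
        final KStream<String, String> enriched = clicks.join(
                userProfiles,
                (click, profile) -> click + " by " + profile,
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        enriched.to("enriched-clicks");
    }
}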

Re: [Discuss] - KIP-532 - Add KStream#toTable to the Streams DSL

2019-09-17 Thread aishwarya kumar
Hi All,

Keeping this thread alive!!

The aim is to add two methods, KStream.toTable() &
KStream.toTable(Materialized), so users can choose to convert their
event stream into a changelog stream at any stage.
wiki link :
https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658

Best,
Aishwarya

On Fri, Sep 13, 2019 at 10:49 AM aishwarya kumar  wrote:

> Hello,
>
> Starting this thread to discuss KIP-532:
> wiki link :
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
> jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658
>
> There has been some discussion around the use-case of this KIP in the Jira
> ticket.
>
> Regards,
> Aishwarya
>
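
A sketch of how the two proposed methods might be used, assuming the API lands as described
in the KIP (topic and store names are made up). With the plain overload the conversion may
be only logically materialized, while passing Materialized with a store name materializes
the resulting KTable and makes it queryable:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class ToTableExample {

    public static void buildTopology(final StreamsBuilder builder) {
        final KStream<String, String> events = builder.stream("events");

        // Plain conversion: interpret the event stream as a changelog, keeping the
        // latest value per key. Physical materialization depends on optimizations.
        final KTable<String, String> latestByKey = events.toTable();

        // Conversion with an explicit, named store: always materialized and queryable
        // under the given store name.
        final KTable<String, String> queryableLatestByKey =
                events.toTable(Materialized.as("latest-by-key-store"));
    }
}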


[Discuss] - KIP-532 - Add KStream#toTable to the Streams DSL

2019-09-13 Thread aishwarya kumar
Hello,

Starting this thread to discuss KIP-532:
wiki link :
https://cwiki.apache.org/confluence/display/KAFKA/KIP-523:+Add+KStream%23toTable+to+the+Streams+DSL
jira ticket : https://issues.apache.org/jira/browse/KAFKA-7658

There has been some discussion around the use-case of this KIP in the Jira
ticket.

Regards,
Aishwarya

