Question Kafka Reassign partitions tool

2017-12-22 Thread Sagar
Hi,

Had a question on Kafka reassign partitions tool.

We have a 3 node cluster but our replication factor is set to 1. So we have
been looking to increase it to 3 for HA.

I tried the tool on a couple of topics and it increases the replication
factor alright. Also it doesn't change the leader as the leader still is in
the RAR.

This is how I run it:

Json which is used:

{"version":1,"partitions":[

{"topic":"cric-engine.engine.eng_fow","partition":3,"replicas":[92,51,101]}]}

Earlier config for the topic

kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper
10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181
Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3
Configs:
Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas:
92,51,101 Isr: 101,51,92
Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas:
92,51,101 Isr: 51,101,92
Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 Isr:
92
Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas: 101 Isr:
101
Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 Isr:
51

After running:

 kafka-reassign-partitions --reassignment-json-file
increase-replication-factor.json --execute --zookeeper 10.0.4.165:2181,
10.0.5.139:2181,10.0.6.106:2181

Partition 3's replicas increase:

kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper
10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181
Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3
Configs:
Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas:
92,51,101 Isr: 101,51,92
Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas:
92,51,101 Isr: 51,101,92
Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 Isr:
92
Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas:
92,51,101 Isr: 101,51,92
Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 Isr:
51

What I wanted to know is whether this affects the preferred replica. If you
look at the Replicas column, the reassigned partitions now all list
92,51,101, even though the leaders have remained the same as before. So, if
any of the brokers goes down or we run kafka-preferred-replica-election.sh,
wouldn't it move all the leaders to broker 92? Is my assessment correct?
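
To make the concern concrete, here is a minimal Python sketch. It assumes that the preferred replica of a partition is simply the first broker in its replica list; the function name preferred_leaders is made up for illustration, and the data is copied from the describe output above.

```python
def preferred_leaders(assignment):
    """Map each partition to its preferred leader, assumed to be the first replica."""
    return {partition: replicas[0] for partition, replicas in assignment.items()}


# Replica lists and current leaders for the partitions that already have 3 replicas.
replicas = {0: [92, 51, 101], 1: [92, 51, 101], 3: [92, 51, 101]}
current_leaders = {0: 101, 1: 51, 3: 101}

preferred = preferred_leaders(replicas)
# Partitions whose leader would change if the preferred replica took over.
would_move = sorted(p for p in replicas if preferred[p] != current_leaders[p])

print(preferred)
print(would_move)
```

Under that assumption, every partition with the list 92,51,101 has broker 92 as its preferred leader, regardless of who leads it today.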

If yes, then is there a way I can still do this operation by getting leader
for a partition first, then adding it to the replica list and then building
the json dynamically?

Thanks!
Sagar.


Re: Question Kafka Reassign partitions tool

2017-12-22 Thread Sagar
Hi Todd,

Thanks for the reply. The problem is that I have about 160 topics (5
partitions each) for which I need to increase the replication factor. So, I
would have to find the current leader for each of the partitions and
hand-code the JSON, which would become tedious.

Is the partition leader info stored in ZooKeeper? If yes, then can I
use that to get the current state and then build the JSON which can be
fed to the tool? I tried to search using the zk shell but couldn't find it.

Sagar.

On Fri, Dec 22, 2017 at 7:45 PM, Todd Palino  wrote:

> Preferred replica election is naive. It will always follow the order of the
> replicas as they are set. So if you want to set the default leader, just
> make it the first replica in the list for the partition. We build the JSON
> this way all the time.
>
> -Todd


Kafka Consumer manual partition assignment

2017-12-26 Thread Sagar
We have a use case wherein we want to assign partitions manually for a set
of topics to allow fine-grained control of the records we are fetching.

Basically, what we are trying to achieve is that a group of messages which
logically belong to a particular entity should be sent to the same
partition. So, Topic A and Topic B both have a certain field which is
unique and common (say some id = 101), so we want to send both of those
records to partition 1 (101 % 10 is our simple partition assignment
strategy, written in both the producer and the Kafka Connect interceptor).
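
A minimal Python sketch of that strategy, for illustration only. The names partition_for and NUM_PARTITIONS are made up, and it assumes both topics have the same number of partitions (10 here), since otherwise the same id would not map to the same partition number.

```python
NUM_PARTITIONS = 10  # assumption: Topic A and Topic B both have 10 partitions


def partition_for(entity_id, num_partitions=NUM_PARTITIONS):
    """Pick the partition for a record from its entity id (id % partition count)."""
    return entity_id % num_partitions


# Records from two topics that share the same entity id end up in the same
# partition number, so one consumer assigned to that partition on both topics
# sees all of them.
records = [("topic-a", 101), ("topic-b", 101), ("topic-a", 205)]
routed = [(topic, entity_id, partition_for(entity_id)) for topic, entity_id in records]

for topic, entity_id, partition in routed:
    print(topic, entity_id, partition)
```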

From what I understood, if I want to have a consumer which listens to
partition 1 for both Topic A and Topic B, then we need to use the assign
method.

I have been reading up a bit, and what has been mentioned everywhere is that
we won't have any rebalancing triggered. Also, I tried a simple use case
wherein I didn't poll for longer than the value configured in
group.max.session.timeout.ms, but the consumer didn't die. Is it because
it's not part of a consumer group per se?

So I just wanted to understand at what point we should declare a consumer
dead so that we can spin up a new consumer for the same topic partition. We
are using AWS ECS for running our consumers, so the target group would spin
up a new consumer based upon health checks.
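
One possible approach, sketched in Python under stated assumptions: since a manually assigned consumer does not take part in group rebalancing, the application can track its own liveness and expose it to the health check. The class name PollLiveness and the 30 second threshold are hypothetical, not something Kafka or ECS provides.

```python
import time


class PollLiveness:
    """Tracks how long ago the poll loop last completed an iteration."""

    def __init__(self, max_silence_seconds, clock=time.monotonic):
        self._max_silence = max_silence_seconds
        self._clock = clock
        self._last_poll = clock()

    def record_poll(self):
        """Call this after every completed poll-and-process iteration."""
        self._last_poll = self._clock()

    def is_healthy(self):
        """True while the loop has polled within the allowed silence window."""
        return (self._clock() - self._last_poll) <= self._max_silence


# Usage: the poll loop calls record_poll(); the health endpoint returns
# is_healthy(), and the orchestrator replaces the task when it turns False.
liveness = PollLiveness(max_silence_seconds=30)
liveness.record_poll()
print(liveness.is_healthy())
```

The threshold would need to be larger than the longest expected processing time for one batch, otherwise a slow batch looks like a dead consumer.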

Any examples + guidelines around this would be highly appreciated.

Thanks!
Sagar.


Re: Question Kafka Reassign partitions tool

2017-12-26 Thread Sagar
Hi Todd,

That worked. Basically, I used a Python library to read the leader of each
topic/partition from the following path in ZooKeeper:
/brokers/topics/[topic]/partitions/[partitionId]/state.

Then I was able to get the correct leader and assign the replica set so
that the current leader remains the first entry in that list, basically to
keep the preferred replica logic intact.
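
A minimal sketch of that approach in Python. It leaves out the ZooKeeper read itself and only shows the parsing and JSON building; the function names are made up, and the sample state string is an illustrative value in the shape of the state znode content (a JSON document with a "leader" field), not data from our cluster.

```python
import json


def leader_from_state(state_json):
    """Extract the leader broker id from the content of a partition state znode."""
    return json.loads(state_json)["leader"]


def build_reassignment(leaders, brokers):
    """Build the reassignment JSON with the current leader first in each replica list.

    leaders: {(topic, partition): leader_broker_id}
    brokers: all broker ids that should hold a replica
    """
    partitions = []
    for (topic, partition), leader in sorted(leaders.items()):
        replicas = [leader] + [b for b in brokers if b != leader]
        partitions.append({"topic": topic, "partition": partition, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


# Illustrative state content for partition 3 (sample value, not real output).
sample_state = '{"controller_epoch":1,"leader":101,"version":1,"leader_epoch":0,"isr":[101]}'
leaders = {
    ("cric-engine.engine.eng_fow", 3): leader_from_state(sample_state),
    ("cric-engine.engine.eng_fow", 4): 51,
}
plan = build_reassignment(leaders, brokers=[92, 51, 101])
print(json.dumps(plan))
```

The printed document can be saved to a file and passed to kafka-reassign-partitions via --reassignment-json-file, as in the first mail of this thread.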

Thanks for the help.

Thanks!
Sagar.

On Fri, Dec 22, 2017 at 10:08 PM, Todd Palino  wrote:

> Yes, the replicas are stored in Zookeeper, so you can iterate over the
> information there to build a view of the cluster that you can use. If you
> want an example for this, take a look at the code for kafka-assigner in
> https://github.com/linkedin/kafka-tools. Or you can just use that tool to
> adjust replication factors and balance partitions.
>
> -Todd

Re: Kafka Consumer manual partition assignment

2018-01-02 Thread Sagar
Hi,

Any help here would be highly appreciated :)

Sagar.



Question Regarding seek() in KafkaConsumer

2018-01-03 Thread Sagar
Hi,

I have a use case wherein I need exactly-once processing for data across
various topics. This means that, in case of errors, the next time the poll()
method runs, it should start off from the exact same record.

The approach I have used is,

1) Call poll() and get a list of records.

2) For each record:

2.1) Get the processing status of a record.

2.2) In case of success, add it to a list of records to be committed.

2.3) In case of failure:

a) Commit whatever records were collected till now.
b) For each topic partition, get the last committed offset using the
committed() method in KafkaConsumer.
c) Seek the topic partition to the offset obtained above.
d) Break the loop.
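
The steps above can be sketched in Python without a real consumer. This is only an illustration of the bookkeeping, not a diagnosis of the data loss: the function name plan_commit_and_seek and the tuple layout are made up, and it relies on the convention that a committed offset is the offset of the next record to read (last processed offset + 1). Instead of calling committed(), it remembers the first unprocessed offset of every partition in the batch.

```python
def plan_commit_and_seek(batch, process):
    """Work out what to commit and where to seek after a partial failure.

    batch: list of (topic, partition, offset) in the order poll() returned them
    process: callable returning True on success, False on failure
    Returns (commits, seeks), both keyed by (topic, partition).
    """
    commits = {}  # next offset to read for partitions with processed records
    seeks = {}    # first unprocessed offset for partitions with leftover records
    failed = False
    for topic, partition, offset in batch:
        tp = (topic, partition)
        if not failed and process(topic, partition, offset):
            commits[tp] = offset + 1
        else:
            # From the first failure on, nothing else is processed; every
            # partition is rewound to its first unprocessed record.
            failed = True
            seeks.setdefault(tp, offset)
    return commits, seeks


batch = [("A", 0, 5), ("A", 0, 6), ("B", 1, 10), ("A", 0, 7), ("B", 1, 11)]
commits, seeks = plan_commit_and_seek(batch, lambda t, p, o: (t, p, o) != ("B", 1, 10))
print(commits)
print(seeks)
```

With a real KafkaConsumer, commits would go to commitSync() and each entry in seeks to seek(); partitions that appear in neither map need no action.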


In my mind it should have worked, but then I experienced some data loss
along the way when I ran it from earliest (across 17 topics, 5 partitions).

The javadocs for seek say :

*If this API is invoked for the same partition more than once, the latest
offset will be used on the next poll(). Note that you may lose data if this
API is arbitrarily used in the middle of consumption, to reset the fetch
offsets*

I am committing offsets manually (sync). I just wanted to understand: is it
not safe to call this API this way?

If I want to reliably retry, then what patterns should be used? A couple of
other approaches I read about in Kafka: The Definitive Guide were to push
the retriable records to a buffer or to a separate topic. So, are those the
correct ways of retrying, and not this?

Thanks!
Sagar.


KAFKA-8238

2019-04-26 Thread Sagar
Hi all,

I would like to start contributing to the Kafka project. I looked at the
newbie tickets and thought I could start with this:(?)

https://issues.apache.org/jira/browse/KAFKA-8238

Or if there's anything else that I can look at, please let me know.

Sagar.


Re: Kafka Connect

2019-06-04 Thread Sagar
Hi meirkhan,

You can give Debezium a shot. It has a connector for MongoDB.


On Tue, 4 Jun 2019 at 6:56 PM,  wrote:

> Hello Kafka!
>
> Here is my case. I need the Kafka Connector to Mongodb which imitates JDBC
> Source Connector.
> So that data is loaded to Kafka by periodically executing a query. I could
> not find any such tool. Can you give any resources or advices on it?
>
> Thank you,
> Meirkhan
>


Need Help regarding Jenkins build

2016-10-23 Thread Sagar
Hi,

Not sure if this is the right forum, but I had raised a Pull request a
couple of days ago which enables the connect schema to handle signed
and unsigned values for tinyint values in mysql.

Here's the link to the PR:

https://github.com/apache/kafka/pull/2044

Apparently the jenkins build failed with the following error:

https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/6311/testReport/junit/kafka.admin/ReassignPartitionsClusterTest/shouldExecuteThrottledReassignment/

But this error has nothing to do with my changes. So, I wanted to
understand what the next steps are here. How can I get this PR
reviewed by someone?

Thanks!
Sagar.


Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-09 Thread Sagar
Hi John,

Thanks for the response. For starters, for our use case, we tweaked our
keys etc to avoid prefix scans. So, we are good there.

Regarding the KIP, I see what you mean when you say that the same key type
for prefix won't work. For example, continuing with the UUID example that
you gave, let's say one of the UUIDs
is 123e4567-e89b-12d3-a456-426614174000, and with a prefix scan we want to
fetch all keys starting with 123. There's already a UUIDSerde so if the
keys have been stored with that, then using UUIDSerde for prefixes won't
help. I am not sure if the UUIDSerializer would even work for 123.
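
A small Python analogy of the problem, using the standard uuid module as a stand-in for a UUID-typed key (this is not the Kafka UUIDSerde, and the helper name parses_as_uuid is made up): the full key is a valid UUID value, but the prefix 123 is not, so there is no UUID object that could be handed to a UUID serializer in the first place.

```python
import uuid


def parses_as_uuid(text):
    """Return True if the text is a well-formed UUID, False otherwise."""
    try:
        uuid.UUID(text)
        return True
    except ValueError:
        return False


full_key = "123e4567-e89b-12d3-a456-426614174000"
prefix = "123"

print(parses_as_uuid(full_key))
print(parses_as_uuid(prefix))
# The prefix only makes sense against the textual form of the key.
print(full_key.startswith(prefix))
```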

So, that indicates that we will need to provide a new prefix key type
serializer. Having said that, how it will be stitched together and finally
exposed through the APIs is still up for discussion. This is
something you have also brought up in the earlier emails. If it
makes sense, I can modify my PR to go along these lines. Please let me know
what you think.

Lastly, I didn't understand this line of yours: *It might help if there are
other typed key/value stores to compare APIs with.*

Thanks!
Sagar.


On Thu, Jun 4, 2020 at 6:03 AM John Roesler  wrote:

> Hi Sagar,
>
> Thanks for the question, and sorry for muddying the water.
>
> I meant the Bytes/byte[] thing as advice for how you all can solve your
> problem in the mean time, while we work through this KIP. I don’t think
> it’s relevant for the KIP itself.
>
> I think the big issue here is what the type of the prefix should be in the
> method signature. Using the same type as the key makes sense some times,
> but not other times. I’m not sure what the best way around this might be.
> It might help if there are other typed key/value stores to compare APIs
> with.
>
> Thanks,
> John
>
> On Mon, Jun 1, 2020, at 09:58, Sagar wrote:
> > Hi John,
> >
> > Just to add to my previous email(and sorry for the spam), if we consider
> > using Bytes/byte[] and manually invoke the serdes, if you could provide
> > examples where the same Serde for keys won't work for the prefix types.
> As
> > far as my understanding goes, the prefix seek would depend upon ordering
> of
> > the keys like lexicographic. As long as the binary format is consistent
> for
> > both the keys and the prefixes would it not ensure the ability to search
> in
> > that same ordering space? This is from my limited understanding so any
> > concrete examples would be helpful...
> >
> > Also, you mentioned about the creation of dummy values to indicate prefix
> > values, do you mean this line:
> >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L91
> > This
> > is where the prefix key is built and used for searching .
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Jun 1, 2020 at 11:42 AM Sagar  wrote:
> >
> > > Hi John,
> > >
> > > Thank you. I think it makes sense to modify the KIP to add the
> > > prefixScan() as part of the existing interfaces and add the new mixin
> > > behaviour as Rejected alternatives. I am not very aware of other stores
> > > apart from keyValueStore so is it fine if I keep it there for now?
> > >
> > > Regarding the type definition of types I will try and think about some
> > > alternatives and share if I get any.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Sun, May 31, 2020 at 1:55 AM John Roesler 
> wrote:
> > >
> > >> Hi Sagar,
> > >>
> > >> Thanks for the response. Your use case makes sense to me; I figured it
> > >> must be something like that.
> > >>
> > >> On a pragmatic level, in the near term, you might consider basically
> > >> doing the same thing we did in KIP-213. If you swap out the store
> types for
> > >> Byte/byte[] and “manually” invoke the serdes in your own logic, then
> you
> > >> can use the same algorithm we did to derive the range scan boundaries
> from
> > >> your desired prefix.
> > >>
> > >> For the actual KIP, it seems like we would need significant design
> > >> improvements to be able to do any mixins, so I think we should favor
> > >> proposing either to just add to the existing interfaces or to create
> brand
> > >> new interfaces, as appropriate, for now. Given that prefix can be
> converted
> > >> to a range query at a low level, I think we can probably explore
> adding
> > >> prefix to the existing inter

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-09 Thread Sagar
Hi John,

You rightly pointed out, the devil is in the detail :). I will start with
the implementation to get a sense.

Here are my thoughts on the core challenge that you pointed out. The key
value store abstractions that have been exposed via the state store DSL
APIs make it possible for the end user to define generic key types.
However, the Serdes are the ones that convert those generic keys/values
into the format in which the state store stores them, which for all
practical purposes is a byte array. I think that if the prefix type serde
converts the prefix to the same internal storage type (byte array) as
that of the keys, then we should be able to do a prefix scan.
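
A minimal Python sketch of that idea, assuming the store keeps keys as byte strings in lexicographic byte order. The function names are made up for illustration; this is not the code from the PR. Once both keys and prefix are bytes, a prefix scan becomes a range scan from the prefix up to the smallest byte string that is greater than every key with that prefix.

```python
import bisect


def prefix_upper_bound(prefix):
    """Smallest byte string greater than every key starting with prefix.

    Returns None when no finite bound exists (empty prefix or all bytes 0xFF).
    """
    trimmed = prefix.rstrip(b"\xff")
    if not trimmed:
        return None
    # Increment the last byte that is not 0xFF and drop everything after it.
    return trimmed[:-1] + bytes([trimmed[-1] + 1])


def prefix_scan(sorted_keys, prefix):
    """Return all keys in a sorted list of byte strings that start with prefix."""
    lo = bisect.bisect_left(sorted_keys, prefix)
    upper = prefix_upper_bound(prefix)
    hi = len(sorted_keys) if upper is None else bisect.bisect_left(sorted_keys, upper)
    return sorted_keys[lo:hi]


keys = sorted([b"122z", b"123", b"123e4567", b"123f", b"124", b"9"])
print(prefix_scan(keys, b"123"))
```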

Regarding other databases, I have worked a bit with Redis, which also
provides a scan operator using glob-style pattern matching (it's more
evolved than a prefix scan, but a prefix scan can be expressed with it easily):

https://redis.io/commands/scan#the-match-option

Typically Redis works with Binary Safe strings so the prefix key type and
the actual keys are of the same type.

Thanks!
Sagar.



On Wed, Jun 10, 2020 at 1:41 AM John Roesler  wrote:

> Hi Sagar,
>
> Thanks for the reply. I agree that your UUID example illustrates the
> problem I was pointing out.
>
> Yes, I think that experimenting with the API in the PR is probably the
> best way to make progress (as opposed to just thinking in terms of
> design on the wiki) because with this kind of thing, the devil is often
> in the details.
>
> To clarify what I meant by that last statement, I see the core challenge
> here as deriving from the fact that we have a key/value store with
> generically typed keys, with a separate component (the serde) that
> turns those typed keys into bytes for storage. In contrast, RocksDB
> can easily offer a "prefix scan" operation because the key type is
> always just a byte array, so "prefix" is a very natural concept to offer
> in the API. Other key/value stores force the keys to always be strings,
> which also makes it easy to define a prefix scan.
>
> My question is whether there are other databases that offer both:
> 1. generically typed keys (as opposed to just bytes, just strings, etc)
> 2. prefix scans
> And, if so, what the API looks like.
>
> Thanks,
> -John

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-28 Thread Sagar
Hi John,

I took some time out and, as we discussed, looked to implement these
changes. Most of these changes are for demonstrative purposes, but I thought
I would share them.

I added the new prefixSeek method at the KeyValueStore interface level:

https://github.com/confluentinc/kafka/pull/242/files#diff-5e92747b506c868db3948323478e1b07R74-R83

As you had pointed out, the prefix type can be different from the key type.
That's why this method takes 2 parameters: the prefix key and its serializer.

Then I added the implementation of this method in a couple of Stores.
RocksDBStore:

https://github.com/confluentinc/kafka/pull/242/commits#diff-046ca566243518c88e007b7499ec9f51R308-R320
and
InMemoryKVStore:

https://github.com/confluentinc/kafka/pull/242/commits#diff-4c685a32e765eab60bcb60097768104eR108-R120

I modified the older test cases for RocksDBStore. You can find them here:

https://github.com/confluentinc/kafka/pull/242/commits#diff-051439f56f0d6a12334d7e8cc4f66bf8R304-R415


I have added a test case where the keys are of type UUID while the prefix
is of type string. This seems to be working because the code is able to
pull in UUIDs with the provided prefix, even though their types are
different.
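
To illustrate the shape of that test outside of the PR, here is a minimal Python sketch. The class InMemoryStore and both serializer functions are made up for illustration, and it assumes the UUID keys are stored as the UTF-8 bytes of their string form, which is what lets a string prefix line up with them.

```python
import uuid


def uuid_serializer(key):
    """Assumed key format: UTF-8 bytes of the UUID's string form."""
    return str(key).encode("utf-8")


def string_serializer(text):
    """Serializer for the prefix, producing bytes in the same format."""
    return text.encode("utf-8")


class InMemoryStore:
    """Toy key/value store that keeps keys as serialized bytes."""

    def __init__(self, key_serializer):
        self._key_serializer = key_serializer
        self._data = {}  # serialized key bytes -> value

    def put(self, key, value):
        self._data[self._key_serializer(key)] = value

    def prefix_scan(self, prefix, prefix_serializer):
        """Return (raw_key, value) pairs whose raw key starts with the raw prefix."""
        raw_prefix = prefix_serializer(prefix)
        return [(k, self._data[k]) for k in sorted(self._data) if k.startswith(raw_prefix)]


store = InMemoryStore(uuid_serializer)
store.put(uuid.UUID("123e4567-e89b-12d3-a456-426614174000"), "a")
store.put(uuid.UUID("123f0000-0000-4000-8000-000000000000"), "b")
store.put(uuid.UUID("9b2c0000-0000-4000-8000-000000000000"), "c")

matches = store.prefix_scan("123", string_serializer)
print([value for _, value in matches])
```

The scan only works because both serializers agree on the byte format; with a different key encoding the same string prefix would match nothing.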

To address one of the gaps from my previous implementation, I have also
added a test case for the end-to-end flow using the state store supplier.
You can find it here:

https://github.com/confluentinc/kafka/pull/242/commits#diff-a94de5b2ec72d09ebac7183c31d7c906R269-R305

Note that for this to work, I needed to update MeteredKVstore and
ChangeLoggingKVStore.

Lastly, barring the 4 stores mentioned above, the rest of the implementers of
KVStore have the prefixSeek override as null. As I said, this is mainly for
demonstrative purposes and hence done this way.
If you get the chance, it would be great if you could provide some feedback
on this.

Thanks!
Sagar.



Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-28 Thread Sagar
Hi John,

Thank you for the positive feedback! The meaningful discussions we had on
the mailing list helped me understand what needed to be done.

I am definitely open to any further suggestions on this.

Before I update the KIP, I have one question: is it fine to have it
in KeyValueStore, or should I move it to ReadOnlyKeyValueStore, where
the range query also resides?

Regarding the 2 notes on UnsupportedOperationException and changing the
name to prefixScan, I will incorporate both of them into the KIP.
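
As a rough picture of the first note, here is a Python analogue (not the Java interface from the KIP). The class names are made up, and NotImplementedError stands in for UnsupportedOperationException: the base type ships a default prefix_scan that raises, so stores that do nothing keep working, and only stores that can support the scan override it.

```python
class KeyValueStore:
    """Base store type with an optional prefix_scan operation."""

    def prefix_scan(self, prefix, prefix_serializer):
        # Default behaviour: the operation is not supported by this store.
        raise NotImplementedError(f"{type(self).__name__} does not support prefix scans")


class LegacyStore(KeyValueStore):
    """An existing store that was written before prefix_scan existed."""


class ScanningStore(KeyValueStore):
    """A store that opts in by overriding prefix_scan."""

    def __init__(self, raw_keys):
        self._raw_keys = sorted(raw_keys)

    def prefix_scan(self, prefix, prefix_serializer):
        raw_prefix = prefix_serializer(prefix)
        return [k for k in self._raw_keys if k.startswith(raw_prefix)]


def to_bytes(text):
    return text.encode("utf-8")


print(ScanningStore([b"abc", b"abd", b"xyz"]).prefix_scan("ab", to_bytes))
try:
    LegacyStore().prefix_scan("ab", to_bytes)
except NotImplementedError as error:
    print(error)
```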

Thanks!
Sagar.

On Sun, Jun 28, 2020 at 11:55 PM John Roesler  wrote:

> Woah, this is great, Sagar!
>
> I think this API looks really good. I'm curious if anyone else has
> any concern. For my part, I think this will work just fine. People
> might face tricky bugs getting their key serde and their prefix
> serde "aligned", but I think the API makes it pretty obvious what
> has to happen to make this work. As long as the API isn't going
> to "trick" anyone by trying to abstract away things that can't be
> abstracted, this is the best we can do. In other words, I think
> your approach is ideal here.
>
> I also really appreciate that you took the time to do a full POC
> with end-to-end tests to show that the proposal is actually
> going to work.
>
> A couple of notes as you update the KIP:
>
> 1. I think that for "optional" state store features like this, we
> should add a default implementation to the interface that
> throws UnsupportedOperationException. That way,
> any existing store implementations won't fail to compile
> on the new version. And any store that just can't support
> a prefix scan would simply not override the method.
>
> 2. I think you meant "prefixScan", not "prefixSeek", since
> we're actually getting an iterator that only returns prefix-
> matching keys, as opposed to just seeking to that prefix.
>
> Thanks again for the work you've put into this. I look
> forward to reviewing the updated KIP.
>
> Thanks,
> -John
>
>
> On Sun, Jun 28, 2020, at 12:17, Sagar wrote:
> > Hi John,
> >
> > I took some time out and, as we discussed, looked into implementing these
> > changes. Most of these changes are for demonstrative purposes, but I
> > thought I would share them.
> >
> > I added the new prefixSeek method at the KeyValueStore interface level:
> >
> > https://github.com/confluentinc/kafka/pull/242/files#diff-5e92747b506c868db3948323478e1b07R74-R83
> >
> > As you had pointed out, the prefix type can be different from the key
> > type. That's why this method takes 2 parameters: the prefix and its
> > serializer.
> >
> > Then I added the implementation of this method in a couple of stores.
> > RocksDBStore:
> >
> > https://github.com/confluentinc/kafka/pull/242/commits#diff-046ca566243518c88e007b7499ec9f51R308-R320
> >
> > and InMemoryKVStore:
> >
> > https://github.com/confluentinc/kafka/pull/242/commits#diff-4c685a32e765eab60bcb60097768104eR108-R120
> >
> > I modified the older test cases for RocksDBStore. You can find them here:
> >
> > https://github.com/confluentinc/kafka/pull/242/commits#diff-051439f56f0d6a12334d7e8cc4f66bf8R304-R415
> >
> >
> > I have added a test case where the keys are of type UUID while the prefix
> > is of type String. This seems to work: the code is able to pull in the
> > UUIDs with the provided prefix, even though the types are different.
> >
> > To address one of the gaps from my previous implementation, I have also
> > added a test case for the end-to-end flow using the state store supplier.
> > You can find it here:
> >
> > https://github.com/confluentinc/kafka/pull/242/commits#diff-a94de5b2ec72d09ebac7183c31d7c906R269-R305
> >
> > Note that for this to work, I needed to update MeteredKVstore and
> > ChangeLoggingKVStore.
> >
> > Lastly, barring the 4 stores mentioned above, the rest of the implementers
> > of KVStore have the prefixSeek override as null. As I said, this is mainly
> > for demonstrative purposes and hence done this way.
> > If you get the chance, it would be great if you could provide some feedback
> > on this.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Jun 10, 2020 at 9:21 AM Sagar  wrote:
> >
> > > Hi John,
> > >
> > > You rightly pointed out, the devil is in the detail :). I will start
> > > with the implementation to get a sense.
> > >
> > > Here are my thoughts on the core challenge that you pointed out. The
> > > key
>

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-04 Thread Sagar
Hi John,

I have updated the KIP with all the new changes we discussed in this
discussion thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores

Request you to go through the same.

Thanks!
Sagar.

On Tue, Jun 30, 2020 at 8:09 AM John Roesler  wrote:

> Hi Sagar,
>
> That’s a good observation; yes, it should go in the ReadOnlyKeyValueStore
> interface.
>
> Thanks again for the great work,
> John
>

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-05 Thread Sagar
Hi John,

Thank you! A question on the comment: where should I add the default
implementation? I guess it needs to be added in the Proposal section of
the KIP.

Thanks!
Sagar.

On Sat, Jul 4, 2020 at 11:46 PM John Roesler  wrote:

> Thanks Sagar,
>
> That looks good to me! The only minor comment I’d make is that I think the
> method declaration should have a default implementation that throws an
> UnsupportedOperationException, for source compatibility with existing state
> stores.
>
> Otherwise, as far as I’m concerned, I’m ready to vote.
>
> Thanks,
> John
>

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-06 Thread Sagar
Hi John,

Thanks, I have updated the KIP.

Thanks!
Sagar.

On Mon, Jul 6, 2020 at 12:00 AM John Roesler  wrote:

> Hi Sagar,
>
> Sorry for the ambiguity. You could just mention it in the Public
> Interfaces section. Or, if you want to be more specific, you can show it in
> the method definition snippet. I don’t think it matters, as long as it’s
> clearly stated, since it affects backward compatibility with existing store
> implementations.
>
> Thanks,
> John
>

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-11 Thread Sagar
Thanks John,

Sorry, I’m new to this process. 😅 Does it mean I start a voting email?

Pardon my ignorance.

Sagar.


On Sat, 11 Jul 2020 at 8:06 PM, John Roesler  wrote:

> Hi Sagar,
>
> Thanks for the update. As far as I’m concerned, I’m ready to vote now.
>
> Thanks,
> John
>

[VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-07-12 Thread Sagar
Hi All,

I would like to start a new voting thread for the below KIP to add prefix
scan support to state stores:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores

Thanks!
Sagar.


Re: [VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-07-19 Thread Sagar
Hi All,

Bumping this thread to see if there is any feedback.

Thanks!
Sagar.

On Tue, Jul 14, 2020 at 9:49 AM John Roesler  wrote:

> Thanks for the KIP, Sagar!
>
> I’m +1 (binding)
>
> -John
>


Re: [VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-08-02 Thread Sagar
Hi All,

Just thought of bumping this voting thread again to see if we can form any
consensus around this.

Thanks!
Sagar.


On Mon, Jul 20, 2020 at 4:21 AM Adam Bellemare wrote:

> LGTM
> +1 non-binding
>


Re: [VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-09-01 Thread Sagar
Hi All,

Bumping the thread again!

Thanks!
Sagar.

On Wed, Aug 5, 2020 at 12:08 AM Sophie Blee-Goldman wrote:

> Thanks Sagar! +1 (non-binding)
>
> Sophie
>


Re: [VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-09-02 Thread Sagar
Thanks All!

I see 3 binding +1 votes and 2 non-binding +1s. Does it mean this KIP has
gained a lazy majority?

Thanks!
Sagar.

On Thu, Sep 3, 2020 at 6:51 AM Guozhang Wang  wrote:

> Thanks for the KIP Sagar. I'm +1 (binding) too.
>
>
> Guozhang
>
> On Tue, Sep 1, 2020 at 1:24 PM Bill Bejeck  wrote:
>
> > Thanks for the KIP! This is a great addition to the streams API.
> >
> > +1 (binding)
> >
> > -Bill
> >
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-09-03 Thread Sagar
Hi John,

Thank you! I have marked the KIP as Accepted :)

Regarding the point on InMemoryKeyValueStore, in the PR I had added the
implementation for InMemoryKeyValueStore as well. I hadn't mentioned it in
the KIP, which I have now done as you suggested.

Thanks!
Sagar.
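
As a rough illustration of what a prefix scan over an in-memory store can look
like when the prefix type differs from the key type (UUID keys, String prefix,
as in the earlier test case), here is a minimal sketch. It is not the
InMemoryKeyValueStore code from the PR: SketchInMemoryStore is a hypothetical
class, it assumes Java 9+ (for Arrays.compareUnsigned and the ranged
Arrays.equals), and it assumes that keys are serialized as the UTF-8 bytes of
their string form so that the key bytes and the prefix bytes line up.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical in-memory store: keys are held as serialized bytes,
// sorted in unsigned lexicographic order.
class SketchInMemoryStore<K, V> {
    private static final Comparator<byte[]> BYTE_ORDER = Arrays::compareUnsigned;

    private final TreeMap<byte[], V> data = new TreeMap<>(BYTE_ORDER);
    private final Function<K, byte[]> keySerializer;

    SketchInMemoryStore(Function<K, byte[]> keySerializer) {
        this.keySerializer = keySerializer;
    }

    void put(K key, V value) {
        data.put(keySerializer.apply(key), value);
    }

    // The prefix has its own type and serializer; it only has to produce bytes
    // that are a leading slice of the serialized keys.
    <P> List<V> prefixScan(P prefix, Function<P, byte[]> prefixSerializer) {
        byte[] prefixBytes = prefixSerializer.apply(prefix);
        List<V> result = new ArrayList<>();
        // Every key that starts with the prefix sorts at or after the prefix itself,
        // and all such keys are contiguous, so we can stop at the first mismatch.
        for (Map.Entry<byte[], V> entry : data.tailMap(prefixBytes, true).entrySet()) {
            if (!startsWith(entry.getKey(), prefixBytes)) {
                break;
            }
            result.add(entry.getValue());
        }
        return result;
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
            && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }
}

class PrefixScanDemo {
    public static void main(String[] args) {
        SketchInMemoryStore<UUID, String> store =
            new SketchInMemoryStore<>(u -> u.toString().getBytes(StandardCharsets.UTF_8));
        store.put(UUID.fromString("aaaa0000-0000-0000-0000-000000000001"), "a1");
        store.put(UUID.fromString("bbbb0000-0000-0000-0000-000000000001"), "b1");
        System.out.println(store.prefixScan("aaaa", p -> p.getBytes(StandardCharsets.UTF_8)));
    }
}
```

If the key serializer and the prefix serializer do not produce compatible
bytes, the scan silently returns nothing, which is the "aligned serdes"
pitfall raised earlier in the discussion.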

On Thu, Sep 3, 2020 at 8:10 PM John Roesler  wrote:

> Hi Sagar,
>
> Yes! Congratulations :)
>
> Now, you can mark the status of the KIP as "Accepted" and we
> can move on to reviewing your PRs.
>
> One quick note: Matthias didn't have time to review the KIP
> in full, but he did point out to me that there's a lot of
> information about the RocksDB implementation and no mention
> of the InMemory store. We both agree that we should
> implement the new method also for the InMemory store.
> Assuming you agree, note that we don't need to discuss any
> implementation details, so you could just update the KIP
> document to also mention, "We will also implement the new
> method in the InMemoryKeyValueStore."
>
> Thanks for your contribution to Apache Kafka!
> -John
>


Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-25 Thread Sagar
Hi Jack,

Thanks. I have a couple of final comments and then I am good.

1) Can you elaborate in the Javadocs of the partition method's headers
argument to specify that a null headers value is equivalent to invoking the
older partition method? It is apparent, but generally good to call out.
2) In the Compatibility section, you have written "backward comparable". I
believe it should be *backward compatible change*.

I don't have other comments. After this, someone else who has more context
on Clients can probably also chime in before we move this to Voting.

Thanks!
Sagar.
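
To illustrate comment 1, here is a minimal sketch of an overloaded partition
method whose default implementation delegates to the older method, so that
passing null headers behaves exactly like the old call. The names
(SketchPartitioner, HashPartitioner, HeaderAwarePartitioner, the
"partition-hint" header) and the simplified argument lists are assumptions
made for this example; they are not the real Kafka Partitioner signature or
the exact API proposed in the KIP.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified partitioner interface (the real one takes more arguments).
interface SketchPartitioner {
    // The older method, without headers.
    int partition(String topic, byte[] keyBytes, int numPartitions);

    // New overload. By default it ignores the headers and delegates to the
    // older method, so existing implementations keep their behaviour.
    default int partition(String topic, byte[] keyBytes,
                          Map<String, byte[]> headers, int numPartitions) {
        return partition(topic, keyBytes, numPartitions);
    }
}

// An existing partitioner that knows nothing about headers.
class HashPartitioner implements SketchPartitioner {
    @Override
    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}

// A partitioner that opts in to headers, falling back to the older method
// when headers are null or carry no usable hint.
class HeaderAwarePartitioner extends HashPartitioner {
    @Override
    public int partition(String topic, byte[] keyBytes,
                         Map<String, byte[]> headers, int numPartitions) {
        byte[] hint = headers == null ? null : headers.get("partition-hint");
        if (hint == null || hint.length == 0) {
            return partition(topic, keyBytes, numPartitions);
        }
        return Math.floorMod(hint[0], numPartitions);
    }
}

class PartitionerDemo {
    public static void main(String[] args) {
        HeaderAwarePartitioner partitioner = new HeaderAwarePartitioner();
        byte[] key = {1, 2, 3};
        Map<String, byte[]> headers =
            Collections.singletonMap("partition-hint", new byte[] {2});
        System.out.println(partitioner.partition("topic", key, null, 4));
        System.out.println(partitioner.partition("topic", key, headers, 4));
    }
}
```

Stating the null-headers equivalence in the Javadoc makes this delegation
contract explicit for anyone implementing the interface.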


On Sat, Jul 22, 2023 at 10:09 AM Jack Tomy  wrote:

> Hey @Sagar,
>
> Thank you again for the response and feedback.
>
>    1. Though the ask wasn't very clear to me, I have attached the Javadoc as
>    per your suggestion. Please have a look and let me know if this meets
>    the expectations.
>    2. Done.
>    3. Done
>    4. Done
>
> Hey @Sagar and everyone,
> Please have a look at the new version and share your thoughts.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
>
> On Thu, Jul 20, 2023 at 9:46 PM Sagar  wrote:
>
> > Thanks Jack for the updates.
> >
> > Some more feedback:
> >
> > 1) It would be better if you can add the Javadoc in the Public Interfaces
> > section. That is a general practice which gives the readers of the KIP
> > a high level idea of the Public Interfaces.
> >
> > 2) In the proposed section, the bit about marking headers as read only
> > seems like an implementation detail. This can generally be avoided in
> > KIPs.
> >
> > 3) Also, in the Deprecation section, can you mention again that this is a
> > backward compatible change and the reason for it (already done in the
> > Proposed Changes section).
> >
> > 4) In the Testing Plan section, there is still the KIP template bit
> > copied over. That can be removed.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, Jul 20, 2023 at 2:48 PM Jack Tomy  wrote:
> >
> > > Hey Everyone,
> > >
> > > Please consider this as a reminder and share your feedback. Thank you.
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > >
> > > On Tue, Jul 18, 2023 at 5:43 PM Jack Tomy 
> wrote:
> > >
> > > > Hey @Sagar,
> > > >
> > > > Thank you for the response and feedback.
> > > >
> > > >    1. Done
> > > >    2. Yeah, that was a mistake from my end. Corrected.
> > > >    3. Can you please elaborate on this? I have added the Javadoc along
> > > >    with the code changes. Should I paste the same in the KIP too?
> > > >    4. Moved.
> > > >    5. I have added one more use case. It is actually helpful in any
> > > >    situation where you want to pass some information to the partition
> > > >    method but don't have to have it in the key or value.
> > > >    6. Added.
> > > >
> > > >
> > > > Hey @Sagar and everyone,
> > > > Please have a look at the new version and share your thoughts.
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > > >
> > > >
> > > > On Tue, Jul 18, 2023 at 9:53 AM Sagar 
> > wrote:
> > > >
> > > >> Hi Jack,
> > > >>
> > > >> Thanks for the KIP! Seems like an interesting idea. I have some
> > > feedback:
> > > >>
> > > >> 1) It would be great if you could clean up the text that seems to
> > mimic
> > > >> the
> > > >> KIP template. It is generally not required in the KIP.
> > > >>
> > > >> 2) In the Public Interfaces where you mentioned *Partitioner method
> in
> > > >> **org/apache/kafka/clients/producer
> > > >> will have the following update*, I believe you meant the Partitioner
> > > >> *interface*?
> > > >>
> > > >> 3) Staying on Public Interface, it is generally preferable to add a
> > > >> Javadocs section along with the newly added method. You could also
> > > >> describe
> > > >> the behaviour of it invoking the default existing method.
> > > >>
> > > >> 4) The option that is mentioned in the Rejected Alternatives, seems
> > more
> > > >> like a workaround to the current problem that you are describing.
> That
>

Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-27 Thread Sagar
Hey Andrew,

Thanks for the review. Since I had reviewed the KIP I thought I would also
respond. Of course Jack has the final say on this since he wrote the KIP.

1) This is an interesting point and I hadn't considered it. The
comparison with KIP-848 is a valid one, but even that KIP still allows
client-side assignors for power users like Streams. So while we would
want to move away from client-side partitioners as much as possible, we
shouldn't do away with client-side partitioning completely and end up
inflexible for different kinds of use cases. This is only my opinion,
though, and you have more context on Clients, so I would like to know
your thoughts on this.

2) Regarding this, I assumed that since the headers are already part of the
consumer records, consumers have access to them, and that if there is a
contract between the producing and consuming applications, the partitioning
decision would be transparent. Was my assumption incorrect? But as you
rightly pointed out, header-based partitioning for records with keys is
going to lead to surprising results. Assuming there is merit in this
proposal, do you think we should ignore the keys in this case (similar to
the effect of setting the *partitioner.ignore.keys* config to true) and
document it appropriately?
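
To make the concern in point 2 concrete, here is a minimal sketch in plain Java with no Kafka classes. Everything in it is an assumption for illustration: the class name, the `region` header idea and the hash are made up, and this is not Kafka's Partitioner API. It only shows that once a header drives the choice, two records with the same key can land on different partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative stand-in only: this is neither Kafka's Partitioner API
// nor its hash function (the built-in partitioner hashes keys with murmur2).
final class HeaderVsKeyPartitioning {

    // Map arbitrary bytes to a partition in [0, numPartitions).
    static int partitionOf(byte[] bytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(bytes), numPartitions);
    }

    // Key-based choice: the same key always yields the same partition.
    static int byKey(String key, int numPartitions) {
        return partitionOf(key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    // Header-based choice: the key plays no part, only the header value does.
    static int byHeader(String headerValue, int numPartitions) {
        return partitionOf(headerValue.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6;
        String key = "order-42";
        // Same key, two hypothetical "region" header values.
        System.out.println("by key:    " + byKey(key, partitions));
        System.out.println("region=eu: " + byHeader("eu", partitions));
        System.out.println("region=us: " + byHeader("us", partitions));
    }
}
```

With this toy hash and six partitions, the two header values map to different partitions even though the key is identical, which is exactly the situation where per-key ordering and log compaction stop behaving as applications expect.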

Let me know what you think.

Thanks!
Sagar.


On Thu, Jul 27, 2023 at 9:41 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Jack,
> Thanks for the KIP. I have a few concerns about the idea.
>
> 1) I think that while a client-side partitioner seems like a neat idea and
> it’s an established part of Kafka,
> it’s one of the things which makes Kafka clients quite complicated. Just
> as KIP-848 is moving from
> client-side assignors to server-side assignors, I wonder whether really we
> should be looking to make
> partitioning a server-side capability too over time. So, I’m not convinced
> that making the Partitioner
> interface richer is moving in the right direction.
>
> 2) For records with a key, the partitioner usually calculates the
> partition from the key. This means
> that records with the same key end up on the same partition. Many
> applications expect this to give ordering.
> Log compaction expects this. There are situations in which records have to
> be repartitioned, such as
> sometimes happens with Kafka Streams. I think that a header-based
> partitioner for records which have
> keys is going to be surprising and only going to have limited
> applicability as a result.
>
> The tricky part about clever partitioning is that downstream systems have
> no idea how the partition
> number was arrived at, so they do not truly understand how the ordering
> was derived. I do think that
> perhaps there’s value to being able to influence the partitioning using
> the headers, but I wonder if actually
> transforming the headers into an “ordering context” that then flows with
> the record as it moves through
> the system would be a stronger solution. Today, the key is the ordering
> context. Maybe it should be a
> concept in its own right and the Producer could configure a converter from
> headers to ordering context.
> That is of course a much bigger change.
>
> In one of the examples you mention in the KIP, you mentioned using a
> header to control priority. I guess the
> idea is to preferentially process records off specific partitions so they
> can overtake lower priority records.
> I suggest just sending the records explicitly to specific partitions to
> achieve this.
>
> Sorry for the essay, but you did ask for people to share their thoughts :)
>
> Just my opinion. Let’s see what others think.
>
> Thanks,
> Andrew
>
> > On 25 Jul 2023, at 14:58, Jack Tomy  wrote:
> >
> > Hey @Sagar
> >
> > Thanks again for the review.
> > 1. "a null headers value is equivalent to invoking the older partition
> > method", this is not true. If someone makes an implementation and the
> > headers come as null, still the new implementation will take effect.
> > Instead I have added : "Not overriding this method in the Partitioner
> > interface has the same behaviour as using the existing method."
> > 2. Corrected.
> >
> > Hey @Sagar and everyone,
> > Please have a look at the new version and share your thoughts.
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > Like Sagar mentioned, I would also request more people who have more
> > context on clients to chime in.
> >
> >
> > On Tue, Jul 25, 2023 at 2:58 PM Sagar  wrote:
> >
> >> Hi Jack,
> >>
> >> Thanks I have a couple of final comments and then I am good.
> >>
> >> 1) Can you elaborate 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-28 Thread Sagar
Hey Yash,

Thanks for your comments.

1) Hmm, the question is how do you qualify a partition as stale or old?
Let's say a connector has implemented updateOffsets; for a partition
that receives no records, it will then update that partition's offsets.
So technically that offset can't be termed stale anymore. Even though I
can't think of a side effect at this point that would justify disallowing
offset deletion via this method, my opinion is to use a proper mechanism
like the ones introduced in KIP-875 to delete offsets. Moreover, if I also
consider the option presented in point #2, for simplicity's sake it seems
better not to add this feature at this point. If we feel it's really needed
and users are requesting it, we can add support for it later on.

2) I get the point now. I can't think of cases where updating offsets that
are already about to be committed would be needed. As with point #1, we can
always add it back later if needed. For now, I have removed that part from
the KIP.

3) Yes, because the offset commit happens on a different thread, ordering
guarantees might be harder to ensure if we do it from the other thread. The
currently proposed mechanism, even though it gets invoked multiple times,
keeps things simpler to reason about.

Let me know how things look now. If it's all looking ok, I would go ahead
and create a Vote thread for the same.

Thanks!
Sagar.

On Tue, Jul 25, 2023 at 5:15 PM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for the updates. I had a few more follow up questions:
>
> > I have added that a better way of doing that would be
> > via KIP-875. Also, I didn't want to include any mechamisms
> > for users to meddle with the offsets topic. Allowing tombstone
> > records via this method would be akin to publishing tombstone
> > records directly to the offsets topic which is not recommended
> > generally.
>
> KIP-875 would allow a way for cluster administrators and / or users to do
> so manually externally whereas allowing tombstones in
> SourceTask::updateOffsets would enable connectors to clean up offsets for
> old / stale partitions without user intervention right? I'm not sure I
> follow what you mean by "I didn't want to include any mechamisms for users
> to meddle with the offsets topic" here? Furthermore, I'm not sure why
> publishing tombstone records directly to the offsets topic would not be
> recommended? Isn't that currently the only way to manually clean up offsets
> for a source connector?
>
> > It could be useful in a scenario where the offset of a partition
> > doesn't update for some period of time. In such cases, the
> > connector can do some kind of state tracking and update the
> > offsets after the time period elapses.
>
> I'm not sure I follow? In this case, won't the offsets argument passed
> to SourceTask::updateOffsets *not *contain the source partition which
> hasn't had an update for a long period of time? Wouldn't it make more sense
> to reduce the surface of the API as Chris suggested and only allow adding
> new partition offset pairs to the about to be committed offsets (since
> there don't seem to be any use cases outlined for allowing connectors to
> update offsets for source partitions that are already about to have an
> offset be committed for)?
>
> > All the records returned by the previous poll invocation
> >  got processed successfully
>
> Thanks for this clarification in the KIP, it looks like it does address the
> offsets ordering issue. As to Chris' point about invoking
> SourceTask::updateOffsets less frequently by calling it before offsets are
> committed rather than in every poll loop iteration - I guess that would
> make it a lot more tricky to address the ordering issue?
>
>
> Thanks,
> Yash
>
> On Thu, Jul 20, 2023 at 9:50 PM Sagar  wrote:
>
> > Hey All,
> >
> > Please let me know how the KIP looks now. Is it at a stage where I can
> > start with the Voting phase? Of course I am still open to
> > feedback/suggestions but planning to start the Vote for it.
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Jul 11, 2023 at 10:00 PM Sagar 
> wrote:
> >
> > > Hi Yash/Chris,
> > >
> > > Thanks for the feedback! I have updated the KIP with the suggestions
> > > provided. I would also update the PR with the suggestions.
> > >
> > > Also, I was hoping that this could make it to the 3.6 release given
> that
> > > it would benefit source connectors which have some of the problems
> listed
> > > in the Motivation Section.
> > >
> > > Responses Inline:
> > >
> > > Yash:
> > >
> > > 1) In the proposed changes section where 

Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-29 Thread Sagar
Hi Andrew,

Thanks for your comments.

1) Yes, that makes sense, and that's what I would expect to see as well. I
just wanted to highlight that we might still need a way to keep client-side
partitioning logic available too. Anyway, I am good on this point.
2) The example provided does seem achievable by simply setting the
partition number in the ProducerRecord. I guess if we can't find any
further examples that strengthen the case for this partitioner, it might be
harder to justify adding it.
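
As a rough illustration of point 2, the priority use case can be handled by computing the partition in application code and passing it along with the record. In the Java client, `ProducerRecord` has constructors that accept an explicit partition. The sketch below is plain Java with no Kafka dependency, and the partition layout (partition 0 reserved for high priority) and the class name are assumptions made up for this example.

```java
// Hypothetical layout: partition 0 is reserved for high-priority records,
// everything else is spread over the remaining partitions by key hash.
final class PriorityPartitionChooser {

    static final int HIGH_PRIORITY_PARTITION = 0;

    // Returns the partition number the application would set on the record.
    static int choose(String priority, String key, int numPartitions) {
        if (numPartitions < 2) {
            throw new IllegalArgumentException("need at least 2 partitions");
        }
        if ("high".equals(priority)) {
            return HIGH_PRIORITY_PARTITION;
        }
        // Partitions 1..numPartitions-1 are shared by all other records.
        return 1 + Math.floorMod(key.hashCode(), numPartitions - 1);
    }

    public static void main(String[] args) {
        System.out.println("high: " + choose("high", "order-42", 4));
        System.out.println("low:  " + choose("low", "order-42", 4));
    }
}
```

The value returned by `choose` would be supplied as the partition when constructing the record, so no change to the Partitioner interface is needed for this particular case.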


Thanks!
Sagar.

On Fri, Jul 28, 2023 at 2:05 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Sagar,
> Thanks for your comments.
>
> 1) Server-side partitioning doesn’t necessarily mean that there’s only one
> way to do it. It just means that the partitioning logic runs on the broker
> and
> any configuration of partitioning applies to the broker’s partitioner. If
> we ever
> see a KIP for this, that’s the kind of thing I would expect to see.
>
> 2) In the priority example in the KIP, there is a kind of contract between
> the
> producers and consumers so that some records can be processed before
> others regardless of the order in which they were sent. The producer
> wants to apply special significance to a particular header to control which
> partition is used. I would simply achieve this by setting the partition
> number
> in the ProducerRecord at the time of sending.
>
> I don’t think the KIP proposes adjusting the built-in partitioner or
> adding to AK
> a new one that uses headers in the partitioning decision. So, any
> configuration
> for a partitioner that does support headers would be up to the
> implementation
> of that specific partitioner. Partitioner implements Configurable.
>
> I’m just providing an alternative view and I’m not particularly opposed to
> the KIP.
> I just don’t think it quite merits the work involved to get it voted and
> merged.
> As an aside, a long time ago, I created a small KIP that was never adopted
> and I didn’t push it because I eventually didn’t need it.
>
> Thanks,
> Andrew
>
> > On 28 Jul 2023, at 05:15, Sagar  wrote:
> >
> > Hey Andrew,
> >
> > Thanks for the review. Since I had reviewed the KIP I thought I would
> also
> > respond. Of course Jack has the final say on this since he wrote the KIP.
> >
> > 1) This is an interesting point and I hadn't considered it. The
> > comparison with KIP-848 is a valid one but even within that KIP, it
> allows
> > client side partitioning for power users like Streams. So while we would
> > want to move away from client side partitioner as much as possible, we
> > still shouldn't do away completely with Client side partitioning and end
> up
> > being in a state of inflexibility for different kinds of usecases. This
> is
> > my opinion though and you have more context on Clients, so would like to
> > know your thoughts on this.
> >
> > 2) Regarding this, I assumed that since the headers are already part of
> the
> > consumer records they should have access to the headers and if there is a
> > contract b/w the applications producing and the application consuming,
> that
> > decisioning should be transparent. Was my assumption incorrect? But as
> you
> > rightly pointed out header based partitioning with keys is going to lead
> to
> > surprising results. Assuming there is merit in this proposal, do you
> think
> > we should ignore the keys in this case (similar to the effect of
> > setting the *partitioner.ignore.keys*
> > config to true) and document it appropriately?
> >
> > Let me know what you think.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, Jul 27, 2023 at 9:41 PM Andrew Schofield <
> > andrew_schofield_j...@outlook.com> wrote:
> >
> >> Hi Jack,
> >> Thanks for the KIP. I have a few concerns about the idea.
> >>
> >> 1) I think that while a client-side partitioner seems like a neat idea
> and
> >> it’s an established part of Kafka,
> >> it’s one of the things which makes Kafka clients quite complicated. Just
> >> as KIP-848 is moving from
> >> client-side assignors to server-side assignors, I wonder whether really
> we
> >> should be looking to make
> >> partitioning a server-side capability too over time. So, I’m not
> convinced
> >> that making the Partitioner
> >> interface richer is moving in the right direction.
> >>
> >> 2) For records with a key, the partitioner usually calculates the
> >> partition from the key. This means
> >> that records with the same key end up on the sa

[VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-02 Thread Sagar
Hi All,

Calling a vote on KIP-910 [1]. I feel we have converged on a reasonable
design. Of course, I am open to any feedback/suggestions and will address
them.

Thanks!
Sagar.


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-02 Thread Sagar
Attaching the KIP link for reference:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records

Thanks!
Sagar.

On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:

> Hi All,
>
> Calling a vote on KIP-910 [1]. I feel we have converged on a reasonable
> design. Of course, I am open to any feedback/suggestions and will address
> them.
>
> Thanks!
> Sagar.
>


Re: [VOTE] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-11 Thread Sagar
Hey Jack,

+1 (non binding)

Sagar.

On Sat, 12 Aug 2023 at 8:04 AM, Jack Tomy  wrote:

> Hey everyone,
>
> Please consider this as a gentle reminder.
>
> On Mon, Aug 7, 2023 at 5:55 PM Jack Tomy  wrote:
>
> > Hey everyone.
> >
> > I would like to call for a vote on KIP-953: partition method to be
> > overloaded to accept headers as well.
> >
> > KIP :
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > Discussion thread :
> > https://lists.apache.org/thread/0f20kvfqkmhdqrwcb8vqgqn80szcrcdd
> >
> > Thanks
> > --
> > Best Regards
> > *Jack*
> >
>
>
> --
> Best Regards
> *Jack*
>


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-17 Thread Sagar
Hi All,

Bumping the voting thread again.

Thanks!
Sagar.

On Wed, Aug 2, 2023 at 4:43 PM Sagar  wrote:

> Attaching the KIP link for reference:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>
> Thanks!
> Sagar.
>
> On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:
>
>> Hi All,
>>
>> Calling a vote on KIP-910 [1]. I feel we have converged on a reasonable
>> design. Of course, I am open to any feedback/suggestions and will address
>> them.
>>
>> Thanks!
>> Sagar.
>>
>


Re: Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

2023-08-17 Thread Sagar
Hey Hudeqi,

I took some time to read through the PR link as well where you and Chris
had an informative discussion.

Both over there and in this discussion thread, it seems to me that the
consensus is to narrow the scope of the KIP to lowering the default value
of the segment.bytes config for the offsets topic. This will keep future
workers from having a long boot-up time. IMO, while this might not seem like
a high-impact thing, the configs that we are talking about here are advanced
ones which new Connect users might not immediately look into. So, if they
end up in a situation where there's a 23-min worker startup time, it might
not be a good overall experience for them.

Regarding the point Greg mentioned, we will have to think about a way
around it. The approach you suggested seems unclean to me. Since you have
been testing with this config in your cluster and you already have a large
offsets topic, have you noticed any discrepancies in the in-memory state
across workers in your cluster? Would it be possible for you to test that?
That might be a good starting point for understanding how we want to fix
this. Ideally we should have some kind of point of view (or even a
potential fix) on this before we go about implementing this change.
WDYT?

Thanks!
Sagar.

On Mon, Aug 14, 2023 at 6:09 PM hudeqi <16120...@bjtu.edu.cn> wrote:

> bump this discuss thread.
>
> best,
> hudeqi
>
> "hudeqi" <16120...@bjtu.edu.cn> wrote:
> > Sorry for getting back so late, Yash Mayya, Greg Harris, Sagar. I did
> not get email reminders and missed your replies.
> >
> > Thank you for your thoughts and suggestions. I learned a lot. I will
> give my thoughts and answers together below:
> > 1. The 50MB default is the configuration I actually used in production
> to solve this problem, and it improved things (see the description of the
> Jira:
> https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15086?filter=allopenissues
> ). In fact, I think it may be better to set this value even smaller, which
> is why I did not reuse the default used for __consumer_offsets, but I don't
> know what the best default value is. Secondly, I also set the 50MB default
> in production through ConfigDef#defineInternal, and if the value configured
> by the user is greater than the default, a warning is logged. The only
> difference from what you described is that I overwrote the user-configured
> value with the default (this was rejected by Chris Egerton:
> https://github.com/apache/kafka/pull/13852; you all agree that we should
> not directly override the user-configured value, and I now agree too).
> > 2. I think the potential bug that Greg mentioned, which may lead to
> inconsistent state between workers, is a great point. It is true that we
> cannot directly change the configuration of an existing internal topic.
> Perhaps a trickier and uglier approach is to manually confirm that
> the active segment sizes of all current partitions are relatively small,
> then stop all Connect instances, change the topic configuration, and
> finally start the instances again.
> >
> > To sum up, I wonder whether the scope of the KIP could be reduced to:
> only set the default value of 'segment.bytes' for the internal topics
> and log a warning when the user configures a larger value. What do you
> think? If there's a better way I'm all ears.
> >
> > best,
> > hudeqi
>


Re: [DISCUSS] KIP-970: Deprecate and remove Connect's redundant task configurations endpoint

2023-08-23 Thread Sagar
Thanks Yash. LGTM

Thanks!
Sagar.

On Tue, Aug 22, 2023 at 6:04 PM Chris Egerton 
wrote:

> Hi Yash,
>
> Thanks for driving this, and for putting out a well-written KIP. LGTM!
>
> Cheers,
>
> Chris
>
> On Tue, Aug 22, 2023 at 6:13 AM Yash Mayya  wrote:
>
> > Hi all,
> >
> > I'd like to start a discussion thread for this KIP -
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint
> > .
> >
> > It proposes the deprecation and eventual removal of Kafka Connect's
> > redundant task configurations endpoint.
> >
> > Thanks,
> > Yash
> >
>


Re: [VOTE] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-27 Thread Sagar
Hi Ismael,

Thanks for pointing us in the direction of a DTO-based approach. The
AdminClient examples seem very neat and extensible in that sense.
Personally, I was trying to think only along the lines of how the current
Partitioner interface has been designed, i.e. having all requisite
parameters as separate arguments (Topic, Key, Value, etc.).

Regarding this question of yours:

> A more concrete question: did we consider having the method `partition`
> take `ProducerRecord` as one of its parameters and `Cluster` as the other?


No, I don't think it was brought up in the discussion thread and, as I
said, that could be due to an attempt to keep the new method's signature
similar to the existing one within Partitioner. If I understood the intent
of the question correctly, are you hinting that `ProducerRecord` already
contains all the arguments that the `partition` method accepts and also has
a `headers` field? So, instead of adding another method for the `headers`
field, why not create a new method taking ProducerRecord directly?

If my understanding is correct, then it seems like a very clean way of
adding support for `headers`. Anyway, the partition method within
KafkaProducer already takes a ProducerRecord as an argument, so that makes
it easier. Keeping that in mind, should the default implementation of this
new method (which takes a ProducerRecord as input) invoke the existing
method? One challenge I see there is that the existing partition method
expects serialized keys and values, while ProducerRecord doesn't have access
to those (it directly operates on K, V).
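
To illustrate the challenge, here is one possible shape, sketched in plain Java with stand-in types. `SketchRecord` and `SketchPartitioner` are hypothetical names and are not Kafka's classes or the design in the KIP. The assumption is that the caller passes the serialized key and value alongside the typed record, so that a default method can still delegate to the existing bytes-based method.

```java
import java.util.Arrays;
import java.util.Map;

// Stand-in for a producer record: it carries the typed key/value and headers,
// but not their serialized forms.
final class SketchRecord<K, V> {
    final String topic;
    final K key;
    final V value;
    final Map<String, byte[]> headers;

    SketchRecord(String topic, K key, V value, Map<String, byte[]> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

interface SketchPartitioner {
    // Existing-style method: operates on serialized bytes only.
    int partition(String topic, byte[] keyBytes, byte[] valueBytes, int numPartitions);

    // New-style method: receives the whole record. The serialized bytes are
    // passed separately so the default can delegate to the existing method.
    default <K, V> int partition(SketchRecord<K, V> record, byte[] keyBytes,
                                 byte[] valueBytes, int numPartitions) {
        return partition(record.topic, keyBytes, valueBytes, numPartitions);
    }
}

// An implementation written before the new method existed keeps working unchanged.
final class KeyHashPartitioner implements SketchPartitioner {
    @Override
    public int partition(String topic, byte[] keyBytes, byte[] valueBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}

// A header-aware implementation overrides the new method and falls back
// to the key when the (hypothetical) "route" header is absent.
final class HeaderAwarePartitioner implements SketchPartitioner {
    @Override
    public int partition(String topic, byte[] keyBytes, byte[] valueBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    @Override
    public <K, V> int partition(SketchRecord<K, V> record, byte[] keyBytes,
                                byte[] valueBytes, int numPartitions) {
        byte[] route = record.headers.get("route");
        if (route == null) {
            return partition(record.topic, keyBytes, valueBytes, numPartitions);
        }
        return Math.floorMod(Arrays.hashCode(route), numPartitions);
    }
}
```

Whether the serialized bytes should travel as extra arguments, inside a wrapper object, or not at all is a design question for the KIP. The sketch only shows that delegation needs them from somewhere.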

Thanks!
Sagar.


On Sun, Aug 27, 2023 at 8:51 AM Ismael Juma  wrote:

> A more concrete question: did we consider having the method `partition`
> take `ProducerRecord` as one of its parameters and `Cluster` as the other?
>
> Ismael
>
> On Sat, Aug 26, 2023 at 12:50 PM Greg Harris wrote:
>
> > Hey Ismael,
> >
> > > The mention of "runtime" is specific to Connect. When it comes to
> > clients,
> > one typically compiles and runs with the same version or runs with a
> newer
> > version than the one used for compilation. This is standard practice in
> > Java and not something specific to Kafka.
> >
> > When I said "older runtimes" I was being lazy, and should have said
> > "older versions of clients at runtime," thank you for figuring out
> > what I meant.
> >
> > I don't know how common it is to compile a partitioner against one
> > version of clients, and then distribute and run the partitioner with
> > older versions of clients and expect graceful degradation of features.
> > If you say that it is very uncommon and not something that we should
> > optimize for, then I won't suggest otherwise.
> >
> > > With regards to the Admin APIs, they have been extended several times
> > since introduction (naturally). One of them is:
> > >
> >
> https://github.com/apache/kafka/commit/1d22b0d70686aef5689b775ea2ea7610a37f3e8c
> >
> > Thanks for the example. I also see that includes a migration from
> > regular arguments to the DTO style, consistent with your
> > recommendation here.
> >
> > I think the DTO style and the proposed additional argument style are
> > both reasonable.
> >
> > Thanks,
> > Greg
> >
> > On Sat, Aug 26, 2023 at 9:46 AM Ismael Juma  wrote:
> > >
> > > Hi Greg,
> > >
> > > The mention of "runtime" is specific to Connect. When it comes to
> > clients,
> > > one typically compiles and runs with the same version or runs with a
> > newer
> > > version than the one used for compilation. This is standard practice in
> > > Java and not something specific to Kafka.
> > >
> > > With regards to the Admin APIs, they have been extended several times
> > since
> > > introduction (naturally). One of them is:
> > >
> > >
> >
> https://github.com/apache/kafka/commit/1d22b0d70686aef5689b775ea2ea7610a37f3e8c
> > >
> > > Ismael
> > >
> > > On Sat, Aug 26, 2023 at 8:29 AM Greg Harris wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > Thank you for clarifying where the DTO pattern is used already, I did
> > > > not have the admin methods in mind.
> > > >
> > > > > With the DTO approach, you don't create a new DTO, you simply add a
> > new
> > > > overloaded constructor and accessor to the DTO.
> > > >
> > > > With this variant, partitioner implementations would receive a
> > >

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-28 Thread Sagar
Hey Yash,

Thanks for your further comments. Here are my responses:

1) Deleting offsets via updateOffsets.

Hmm, I am not sure this really needs to be part of the KIP at this
point, and we can always add it later on if needed. I say this for the
following reasons:


   - The size of offsets topic can be controlled by setting appropriate
   topic retention values and that is a standard practice in Kafka. Sure it's
   not always possible to get the right values but as I said it is a standard
   practice. For Connect specifically, there is also a KIP (KIP-943
   <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073470>)
   which is trying to solve the problem of a large connect-offsets topic. So,
   if that is really the motivation, then these are being addressed separately
   anyways.
   - Deleting offsets is not something which should be done very frequently
   and should be handled with care. That is why KIP-875's mechanism to have
   users/cluster admins do this externally is the right thing to do. Agreed,
   this involves some toil, but it's not something that should be done on a
   very regular basis.
   - Nothing stops connector implementations from sending tombstone
   records as offsets, but in practice how many connectors actually do it?
   Maybe 1 or 2 from what we discussed.
   - The usecases you highlighted are edge cases at best. As I have been
   saying, if it is needed we can always add it in the future but that doesn't
   look like a problem we need to solve upfront.

For these reasons, I don't think this is a point that we need to stress
so much. I say this because purging/clean-up of the offsets topic can be
handled either via standard Kafka techniques (point #1 above) or via
Connect runtime techniques (point #2 above). IMO the problem we are trying
to solve via this KIP has so far been solved by connectors using techniques
which have been described as having a higher maintenance cost or a high
cognitive load (i.e. a separate topic), and that needs to be addressed
upfront. And since you yourself termed it a nice-to-have feature, we can
leave it at that and take it up as Future Work. Hope that's ok with you and
other community members.

2) Purpose of offsets parameter in updateOffsets

The main purpose is to give the task visibility into which partitions
are getting their offsets committed. A task need not update offsets
every time it sees that a given source partition is missing from the
about-to-be-committed offsets. Maybe it chooses to wait for some X
iterations or X amount of time and send out an updated offset for a
partition only when such thresholds are breached. Even here we could argue
that since it's sending the partitions/offsets it can do the tracking on its
own, but IMO that is too much work given that the information is already
available via the offsets to be committed.
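
The thresholding idea above could look roughly like the following sketch. It is plain Java with no Connect dependency; `MissedCommitTracker` is a hypothetical helper, and source partitions are simplified to strings here (in Connect they are maps). A task would call it from the proposed updateOffsets hook with the partitions found in the offsets about to be committed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: counts, per source partition, how many consecutive
// commit rounds it has been missing from the offsets about to be committed.
final class MissedCommitTracker {

    private final int threshold;
    private final Map<String, Integer> misses = new HashMap<>();

    MissedCommitTracker(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    // knownPartitions: every source partition the task reads from.
    // aboutToCommit: partitions present in the offsets passed to the hook.
    // Returns the partitions for which the task should now emit an updated offset.
    List<String> partitionsNeedingUpdate(Set<String> knownPartitions, Set<String> aboutToCommit) {
        List<String> due = new ArrayList<>();
        for (String partition : knownPartitions) {
            if (aboutToCommit.contains(partition)) {
                misses.remove(partition); // it made progress, so reset its counter
                continue;
            }
            int count = misses.merge(partition, 1, Integer::sum);
            if (count >= threshold) {
                due.add(partition);
                misses.remove(partition); // start counting afresh after an update
            }
        }
        return due;
    }
}
```

A time-based threshold would work the same way, storing the last-seen timestamp per partition instead of a counter.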

Thanks!
Sagar.


Disabling Test: org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()

2023-08-28 Thread Sagar
Hi All,

Should we disable this test:
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()?

I just did a quick search of my mailbox for this test and it has been
failing for a while. I will go ahead and create a ticket to get it
fixed.

Let me know if disabling it doesn't sound like a good idea.

Thanks!
Sagar.


Re: Disabling Test: org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()

2023-08-28 Thread Sagar
Hey Greg,

Aah ok, I wasn't aware there existed a JIRA for this already. I did see
your attempt to fix this but it seems to be failing still.

Sagar.

On Mon, Aug 28, 2023 at 10:30 PM Greg Harris 
wrote:

> Hey Sagar,
>
> The JIRA for this flaky test is here:
> https://issues.apache.org/jira/browse/KAFKA-8115
>
> Rather than disabling the test, I think we should look into the cause
> of the flakiness.
>
> Thanks!
> Greg
>
> On Mon, Aug 28, 2023 at 2:49 AM Sagar  wrote:
> >
> > Hi All,
> >
> > Should we disable this test:
> >
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()?
> >
> > I just did a quick search on my mailbox for this test and it has been
> > failing for a while. I will go ahead and create a ticket for this for
> > fixing this.
> >
> > Let me know if disabling it doesn't sound like a good idea.
> >
> > Thanks!
> > Sagar.
>


Re: [VOTE] KIP-970: Deprecate and remove Connect's redundant task configurations endpoint

2023-08-30 Thread Sagar
+1 (non-binding).

Thanks!
Sagar.

On Wed, 30 Aug 2023 at 11:09 PM, Chris Egerton 
wrote:

> +1 (binding), thanks Yash!
>
> On Wed, Aug 30, 2023 at 1:34 PM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
>
> > Thanks for the KIP. Looks good to me.
> >
> > +1 (non-binding).
> >
> > Andrew
> >
> > > On 30 Aug 2023, at 18:07, Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> > hgerald...@bloomberg.net> wrote:
> > >
> > > This makes sense to me, +1 (non-binding)
> > >
> > > From: dev@kafka.apache.org At: 08/30/23 02:58:59 UTC-4:00 To: dev@kafka.apache.org
> > > Subject: [VOTE] KIP-970: Deprecate and remove Connect's redundant task
> > configurations endpoint
> > >
> > > Hi all,
> > >
> > > This is the vote thread for KIP-970 which proposes deprecating (in the
> > > Apache Kafka 3.7 release) and eventually removing (in the next major
> > Apache
> > > Kafka release - 4.0) Connect's redundant task configurations endpoint.
> > >
> > > KIP -
> > >
> >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint
> > >
> > > Discussion thread -
> > > https://lists.apache.org/thread/997qg9oz58kho3c19mdrjodv0n98plvj
> > >
> > > Thanks,
> > > Yash
> > >
> > >
> >
> >
>


Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-05 Thread Sagar
Hey Chris,

Thanks for the KIP. Seems like a useful feature. I have some more
questions/comments:

1) You have now clarified here that the last_modified field would be stored
in-memory, but that is not mentioned in the KIP. It is understandable, but
it wasn't apparent when reading the KIP, so we should probably mention it.
Also, what happens if a worker restarts? In that case, since the log level
update message would predate the startup of the worker, would it be
ignored? We should probably mention that behaviour as well IMO.

2) Staying on the last_modified field, its utility is also not too clear
to me. Can you add some examples of how it can be useful for debugging etc.?

3) In the test plan, should we also add a test for when the scope parameter
passed is non-null and neither worker nor cluster? In this case the
behaviour should be similar to the default case.

4) I had the same question as Yash regarding persistent cluster-wide
logging level. I think you have explained it well and we can skip it for
now.
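
To make points 1 and 2 a bit more concrete, here is a minimal sketch of how
per-namespace timestamps could be tracked in memory. It is only a toy model
under stated assumptions, not the Connect implementation: the class name
DynamicLogLevels, its methods and the injectable clock are all hypothetical,
and it assumes the timestamp is null until a level is changed and that a
restart falls back to the configured levels.

```python
import time


class DynamicLogLevels:
    """Toy in-memory model of one worker's dynamically adjusted log levels."""

    def __init__(self, configured_levels, clock=time.time):
        # Levels the worker starts with, standing in for its Log4j config.
        self._configured = dict(configured_levels)
        self._clock = clock
        self._levels = dict(configured_levels)
        # Nothing has been modified since startup, so there are no timestamps.
        self._last_modified = {}

    def set_level(self, namespace, level):
        """Apply a level and stamp the time at which it was applied here."""
        self._levels[namespace] = level
        self._last_modified[namespace] = self._clock()

    def describe(self, namespace):
        """Return the level and last-modified time (None if never changed)."""
        return {
            "level": self._levels.get(namespace),
            "last_modified": self._last_modified.get(namespace),
        }

    def restart(self):
        """Simulate a restart: dynamic levels and timestamps are discarded."""
        return DynamicLogLevels(self._configured, self._clock)
```

Because the state lives only in the object, a restart naturally loses both
the adjusted level and its timestamp, which is the behaviour asked about in
point 1.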

Thanks!
Sagar.

On Tue, Sep 5, 2023 at 8:49 PM Chris Egerton 
wrote:

> Hi all,
>
> Thank you so much for the generous review comments! Happy to see interest
> in this feature. Inline responses follow.
>
>
> Ashwin:
>
> > Don't you foresee a future scope type which may require cluster metadata
> ?
> In that case, isn't it better to forward the requests to the leader in the
> initial implementation ?
>
> I agree with Yash here: we can cross that bridge when we come to it, unless
> there are problems that we'd encounter then that would be better addressed
> by adding request forwarding now. One potential argument I can think of is
> that the UX would be a little strange if the semantics for this endpoint
> differ depending on the value of the scope parameter (some values would be
> affected by in-progress rebalances, and some would not), but I don't know
> if making scope=cluster more brittle in the name of consistency with, e.g.,
> scope=connectorType:foo is really a worthwhile tradeoff. Thoughts?
>
> > I would also recommend an additional system test for Standalone herder to
> ensure that the new scope parameter is honored and the response contains
> the last modified time.
>
> Ah, great call! I love the new testing plan section. I also share Yash's
> concerns about adding a new system test though (at this point, they're so
> painful to write, test, and debug that in most circumstances I consider
> them a last resort). Do you think it'd be reasonable to add end-to-end
> verification for this with an integration test instead?
>
>
> Yash:
>
> > From the proposed changes section, it isn't very clear to me how we'll be
> > tracking this last modified timestamp to be returned in responses for the
> > *GET /admin/loggers* and *GET /admin/loggers/{logger}* endpoints. Could
> > you please elaborate on this? Also, will we track the last modified
> > timestamp even for worker scoped modifications where we won't write any
> > records to the config topic and the requests will essentially be
> > processed synchronously?
>
> RE timestamp tracking: I was thinking we'd store the timestamp for each
> level in-memory and, whenever we change the level for a namespace, update
> its timestamp to the current wallclock time. Concretely, this means we'd
> want the timestamp for some logger `logger` to be as soon as possible after
> the call to `logger.setLevel(level)` for some level `level`. I'm honestly
> unsure how to clarify this further in the KIP; is there anything in there
> that strikes you as particularly ambiguous that we can tweak to be more
> clear?
>
> RE scope distinction for timestamps: I've updated the KIP to clarify this
> point, adding this sentence: "Timestamps will be updated regardless of
> whether the namespace update was applied using scope=worker or
> scope=cluster.". Let me know what you think
>
> > In the current synchronous implementation for the
> > *PUT /admin/loggers/{logger}* endpoint, we return a 404 error if the
> > level is invalid (i.e. not one of TRACE, DEBUG, WARN etc.). Since the new
> > cluster scoped variant of the endpoint will be asynchronous, can we also
> > add a validation to synchronously surface erroneous log levels to users?
>
> Good call! I think we don't have to be explicit about this in the proposed
> changes section, but it's a great fit for the testing plan, where I've
> added it: "Ensure that cluster-scoped requests with invalid logging levels
> are rejected with a 404 response"
>
> > I'm curious to know what the rationale here is? In KIP-745, the stated
> reasoning behind ignoring restart requests during wor

Re: [VOTE] KIP-953: partition method to be overloaded to accept headers as well.

2023-09-05 Thread Sagar
Hey Jack,

The way I interpreted this thread, it seems like there's more alignment on
the DTO based approach. I spent some time on the suggestion that Ismael had
regarding the usage of ProducerRecord. Did you get a chance to look at the
reply I had posted and whether that makes sense? Also, checking out the
AdminClient APIs examples provided by Ismael will give you more context.
Let me know what you think.
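
For context, the ProducerRecord-based idea from the quoted messages can be
sketched roughly as follows. This is written in Python purely as a
language-neutral illustration of the Java design question; RecordView,
partition_record, the "partition-hint" header and the CRC-based hash are all
hypothetical stand-ins, and num_partitions stands in for the Cluster argument.
None of this is taken from the KIP or from the Kafka clients.

```python
from dataclasses import dataclass
from typing import Optional
import zlib


@dataclass(frozen=True)
class RecordView:
    """Hypothetical DTO: the record plus the bytes the producer serialized."""
    topic: str
    key: Optional[str]
    value: Optional[str]
    serialized_key: Optional[bytes]
    serialized_value: Optional[bytes]
    headers: tuple = ()  # (name, bytes) pairs


class Partitioner:
    def partition(self, topic, key, key_bytes, value, value_bytes, num_partitions):
        """Stand-in for the existing method, which takes separate arguments."""
        if key_bytes is None:
            return 0
        return zlib.crc32(key_bytes) % num_partitions  # toy hash, not Kafka's

    def partition_record(self, record, num_partitions):
        """New-style method that, by default, delegates to the existing one.

        Delegation only works because the DTO carries the serialized bytes.
        """
        return self.partition(record.topic, record.key, record.serialized_key,
                              record.value, record.serialized_value,
                              num_partitions)


class HeaderAwarePartitioner(Partitioner):
    def partition_record(self, record, num_partitions):
        # Route on a "partition-hint" header when present, else fall back.
        for name, raw in record.headers:
            if name == "partition-hint":
                return int(raw.decode("utf-8")) % num_partitions
        return super().partition_record(record, num_partitions)
```

The sketch also shows the challenge raised in the thread: a default
implementation can only call the existing method if the object passed in
exposes the serialized key and value, which a plain ProducerRecord does not.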

Thanks!
Sagar.

On Thu, Aug 31, 2023 at 12:49 PM Jack Tomy  wrote:

> Hey everyone,
>
> As I see it, devs favour the current style of implementation, which is
> in line with the existing code. I would like to go ahead with the same
> approach as mentioned in the KIP.
> Can I get a few more votes so that I can take the KIP forward?
>
> Thanks
>
>
>
> On Sun, Aug 27, 2023 at 1:38 PM Sagar  wrote:
>
> > Hi Ismael,
> >
> > Thanks for pointing us towards the direction of a DTO based approach. The
> > AdminClient examples seem very neat and extensible in that sense.
> > Personally, I was trying to think only along the lines of how the current
> > Partitioner interface has been designed, i.e having all requisite
> > parameters as separate arguments (Topic, Key, Value etc).
> >
> > Regarding this question of yours:
> >
> > > A more concrete question: did we consider having the method `partition`
> > > take `ProducerRecord` as one of its parameters and `Cluster` as the
> > > other?
> >
> >
> > No, I don't think it was brought up in the discussion thread and, as I
> > said, that could be due to an attempt to keep the new method's
> > signature similar to the existing one within Partitioner. If I understood
> > the intent of the question correctly, are you hinting that
> > `ProducerRecord` already contains all the arguments that the `partition`
> > method accepts and also has a `headers` field within it? So, instead of
> > adding another method for the `headers` field, why not create a new
> > method taking ProducerRecord directly?
> >
> > If my understanding is correct, then it seems like a very clean way of
> > adding support for `headers`. Anyway, the partition method within
> > KafkaProducer already takes a ProducerRecord as an argument, so that makes
> > it easier. Keeping that in mind, should the default implementation of this
> > new method (which takes a ProducerRecord as an input) invoke the existing
> > method? One challenge I see there is that the existing partition method
> > expects serialized keys and values, while ProducerRecord doesn't have
> > access to those (it directly operates on K, V).
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Sun, Aug 27, 2023 at 8:51 AM Ismael Juma  wrote:
> >
> > > A more concrete question: did we consider having the method `partition`
> > > take `ProducerRecord` as one of its parameters and `Cluster` as the
> > > other?
> > >
> > > Ismael
> > >
> > > On Sat, Aug 26, 2023 at 12:50 PM Greg Harris wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > > The mention of "runtime" is specific to Connect. When it comes to
> > > > clients,
> > > > one typically compiles and runs with the same version or runs with a
> > > newer
> > > > version than the one used for compilation. This is standard practice
> in
> > > > Java and not something specific to Kafka.
> > > >
> > > > When I said "older runtimes" I was being lazy, and should have said
> > > > "older versions of clients at runtime," thank you for figuring out
> > > > what I meant.
> > > >
> > > > I don't know how common it is to compile a partitioner against one
> > > > version of clients, and then distribute and run the partitioner with
> > > > older versions of clients and expect graceful degradation of
> features.
> > > > If you say that it is very uncommon and not something that we should
> > > > optimize for, then I won't suggest otherwise.
> > > >
> > > > > With regards to the Admin APIs, they have been extended several
> times
> > > > since introduction (naturally). One of them is:
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/commit/1d22b0d70686aef5689b775ea2ea7610a37f3e8c
> > > >
> > > > Thanks for the example. I also see that includes a migration from
> > > > regular arguments to the DTO style, consiste

Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-05 Thread Sagar
Hey Chris,

Thanks for making the updates.

The updated definition of last_modified looks good to me. As a continuation
of point number 2, could we also mention that this could be used to get
insights into the propagation of cluster-wide log level updates? It is
implicit, but I feel it is probably better to add it.

Regarding

> It's a little indirect on the front of worker restart behavior, so let me
> know if that especially should be fleshed out a bit more (possibly by
> calling it out in the "Config topic records" section).


Yeah, I would lean towards calling it out explicitly. Since the behaviour
is similar to that of the current dynamically set log levels (i.e.
resetting to the levels in the log4j config files), we can call out that
similarity just for completeness' sake. It would be useful info for
new/medium level users reading the KIP, considering worker restarts are not
uncommon.


> Thanks, I'm glad that this seems reasonable. I've tentatively added this to
> the rejected alternatives section, but am happy to continue the
> conversation if anyone wants to explore that possibility further.


+1

I had a nit-level suggestion; I'm not sure it makes sense, but I would still
call it out. The entire namespace name in the config record's key (along
with the logger-cluster prefix) seems a bit too verbose. My first thought
was to not have the prefix org.apache.kafka.connect in the keys considering
it is the root namespace. But since logging can be enabled at a root level,
can we just use initials (like o.a.k.c), which is also a standard practice?
Let me know what you think.

Lastly, I was also wondering if we could introduce a new parameter which
takes a subset of worker ids and enables logging for them in one go. This
is already achievable by invoking the scope=worker endpoint n times for n
workers, so maybe it is not a necessary change, but it could be useful on a
large cluster. Do you think this is worth listing in the Future Work
section? It's not important, so it can be ignored as well.
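
For reference, the "invoke the scope=worker endpoint n times" workaround
could look roughly like the sketch below. It only builds the requests and
does not send them. The helper name build_log_level_requests and the worker
URLs are made up for illustration, and the scope query parameter is the one
proposed in the KIP, so treat the exact request shape as an assumption.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def build_log_level_requests(worker_urls, logger, level):
    """Build one PUT /admin/loggers/{logger}?scope=worker request per worker."""
    body = json.dumps({"level": level}).encode("utf-8")
    requests = []
    for base in worker_urls:
        # Percent-encode the logger name so it is safe as a path segment.
        path = f"/admin/loggers/{quote(logger, safe='')}?scope=worker"
        requests.append(
            Request(
                base.rstrip("/") + path,
                data=body,
                method="PUT",
                headers={"Content-Type": "application/json"},
            )
        )
    return requests
```

Each returned object could then be passed to urllib.request.urlopen to
actually apply the level on that worker. The operator still has to know the
address of every worker in the subset, which is the toil a dedicated
parameter would remove.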

Thanks!
Sagar.


On Wed, Sep 6, 2023 at 12:08 AM Chris Egerton 
wrote:

> Hi Sagar,
>
> Thanks for your thoughts! Responses inline:
>
> > 1) Considering that you have now clarified here that last_modified field
> would be stored in-memory, it is not mentioned in the KIP. Although that's
> something that's understandable, it wasn't apparent when reading the KIP.
> Probably, we could mention it? Also, what happens if a worker restarts? In
> that case, since the log level update message would be pre-dating the
> startup of the worker, it would be ignored? We should probably mention that
> behaviour as well IMO.
>
> I've tweaked the second paragraph of the "Last modified timestamp" section
> to try to clarify this without getting too verbose: "Modification times
> will be tracked in-memory and determined by when they are applied by the
> worker, as opposed to when they are requested by the user or persisted to
> the config topic (details below). If no modifications to the namespace have
> been made since the worker finished startup, they will be null." Does that
> feel sufficient? It's a little indirect on the front of worker restart
> behavior, so let me know if that especially should be fleshed out a bit
> more (possibly by calling it out in the "Config topic records" section).
>
> > 2) Staying on the last_modified field, it's utility is also not too clear
> to me. Can you add some examples of how it can be useful for debugging etc?
>
> The cluster-wide API relaxes the consistency guarantees of the existing
> worker-local API. With the latter, users can be certain that once they
> receive a 2xx response, the logging level on that worker has been updated.
> With the former, users know that the logging level will eventually be
> updated, but insight into the propagation of that update across the cluster
> is limited. Although it's a little primitive, I'm hoping that the last
> modified timestamp will be enough to help shed some light on this process.
> We could also explore exposing the provenance of logging levels (which maps
> fairly cleanly to the scope of the request from which they originated), but
> that feels like overkill at the moment.
>
> > 3) In the test plan, should we also add a test when the scope parameter
> passed is non-null and neither worker nor cluster? In this case the
> behaviour should be similar to the default case.
>
> Good call! Done.
>
> > 4) I had the same question as Yash regarding persistent cluster-wide
> logging level. I think you have explained it well and we can skip it for
> now.
>
> Thanks, I'm glad that this seems reasonable. I've tentatively added this to
> the rejected alternatives section, but am happy to

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-09-06 Thread Sagar
Hey All,

I had an offline discussion with Yash on this. So far there didn't seem to
be a pressing need to introduce the delete offsets mechanism via the
updateOffsets method, but Yash brought up an interesting point: if we
don't introduce the deletion of offsets in this KIP but do it in a future
version, then connector developers and users would get different behaviour
on tombstone offsets depending on the runtime version being run. This could
lead to confusion.

Considering this, I have updated the KIP to also allow deleting offsets via
tombstone records. Thanks Yash for closing out on this one!
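
As a rough illustration of the tombstone semantics, modelled with plain
dictionaries rather than the actual Connect types: a null offset for a source
partition means "forget this partition". The helper name
apply_updated_offsets and the tuple-shaped partition keys are hypothetical
and only serve the example.

```python
def apply_updated_offsets(committed, updates):
    """Merge offsets returned by a task into the committed offsets.

    Both arguments map a source partition (a hashable key here) to an offset
    dict. A value of None is treated as a tombstone and removes the partition.
    The input mappings are left untouched; a new dict is returned.
    """
    result = dict(committed)
    for partition, offset in updates.items():
        if offset is None:
            result.pop(partition, None)  # tombstone: drop this partition
        else:
            result[partition] = offset
    return result
```

A tombstone for a partition that was never committed is simply a no-op in
this model.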

Hopefully all open questions have now been addressed.

Thanks!
Sagar.

On Tue, Aug 29, 2023 at 3:33 PM Yash Mayya  wrote:

> Hi Sagar,
>
> > The size of offsets topic can be controlled by
> > setting appropriate topic retention values and
> > that is a standard practice in Kafka
>
> Kafka Connect enforces the `cleanup.policy` configuration for the offsets
> topic to be "compact" only (references - [1], [2]), so the topic retention
> related configurations won't be relevant right?
>
> > Deleting offsets is not something which should
> > be done very frequently and should be handled
> > with care
>
> > Agreed this involves some toil but it's not something
> > that should be done on a very regular basis.
>
> I'm not sure I follow how we came to this conclusion, could you please
> expand on the pitfalls of allowing connector plugins to wipe the offsets
> for source partitions that they no longer care about?
>
> > The usecases you highlighted are edge cases at
> > best. As I have been saying, if it is needed we can
> > always add it in the future but that doesn't look like
> > a problem we need to solve upfront.
>
> I agree that these cases might not be too common, but I'm just trying to
> understand the reasoning behind preventing this use case since null offsets
> don't require any separate handling from the Connect runtime's point of
> view (and wouldn't need any additional implementation work in this KIP).
>
> Thanks,
> Yash
>
> [1] - https://kafka.apache.org/documentation/#connect_running
> [2] -
>
> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java#L47
>
> On Mon, Aug 28, 2023 at 1:38 PM Sagar  wrote:
>
> > Hey Yash,
> >
> > Thanks for your further comments. Here are my responses:
> >
> > 1) Deleting offsets via updateOffsets.
> >
> > Hmm, I am not sure this is really necessary to be part of the KIP at this
> > point, and we can always add it later on if needed. I say this for the
> > following reasons:
> >
> >
> >- The size of offsets topic can be controlled by setting appropriate
> >topic retention values and that is a standard practice in Kafka. Sure
> > it's
> >not always possible to get the right values but as I said it is a
> > standard
> >practice. For Connect specifically, there is also a KIP (KIP-943
> ><https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073470>)
> >which is trying to solve the problem of a large connect-offsets topic.
> > So,
> >if that is really the motivation, then these are being addressed
> > separately
> >anyways.
> >- Deleting offsets is not something which should be done very
> frequently
> >and should be handled with care. That is why KIP-875's mechanism to
> have
> >users/ cluster admin do this externally is the right thing to do.
> Agreed
> >this involves some toil but it's not something that should be done on
> a
> >very regular basis.
> >- There is no stopping connector implementations to send tombstone
> >records as offsets but in practice how many connectors actually do it?
> >Maybe 1 or 2 from what we discussed.
> >- The usecases you highlighted are edge cases at best. As I have been
> >saying, if it is needed we can always add it in the future but that
> > doesn't
> >look like a problem we need to solve upfront.
> >
> > Due to these reasons, I don't think this is a point that we need to
> stress
> > so much upon. I say this because offsets topic's purging/clean up can be
> > handled either via standard Kafka techniques (point #1 above) or via
> > Connect runtime techniques (Pt #2  above). IMO the problem we are trying
> to
> > solve via this KIP has been solved by connectors using tech

Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-09-06 Thread Sagar
Hi All,

Based on the latest discussion thread, it appears that all open questions
have been answered.

Hopefully we are now in a state where we can close out the voting
process.

Thanks everyone for the great feedback.

Thanks!
Sagar.

On Fri, Aug 18, 2023 at 9:00 AM Sagar  wrote:

> Hi All,
>
> Bumping the voting thread again.
>
> Thanks!
> Sagar.
>
> On Wed, Aug 2, 2023 at 4:43 PM Sagar  wrote:
>
>> Attaching the KIP link for reference:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>
>> Thanks!
>> Sagar.
>>
>> On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:
>>
>>> Hi All,
>>>
>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a reasonable
>>> design. Of course, I am open to any feedback/suggestions and would address
>>> them.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>


Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-10 Thread Sagar
Thanks Chris,

The changes look good to me.

1) Regarding the suggestion to reduce the key sizes, the only intent was to
make them easier to read. But I missed the fact that
"org.apache.kafka.connect" isn't always going to be the prefix for these
keys. We can live with whatever we have.

2) Hmm, I think it just felt like a useful extension to the current
mechanism of changing log levels per worker. One place where it might come
in handy, and which can't be handled by any of the options listed in the
Future Work section, is if somebody wants to observe the rebalance-related
activities on a subset of workers using finer-grained logs. I am not sure
if it's a strong enough motivation but, as I said, it just felt like a
useful extension. I will leave it to you whether you want to add it or not
(I am ok either way).

Thanks!
Sagar.

On Thu, Sep 7, 2023 at 9:26 PM Chris Egerton 
wrote:

> Hi all,
>
> Thanks again for the reviews!
>
>
> Sagar:
>
> > The updated definition of last_modified looks good to me. As a
> continuation
> to point number 2, could we also mention that this could be used to get
> insights into the propagation of the cluster wide log level updates. It is
> implicit but probably better to add it I feel.
>
> Sure, done. Added to the end of the "Config topic records" section: "There
> may be some delay between when a REST request with scope=cluster is
> received and when all workers have read the corresponding record from the
> config topic. The last modified timestamp (details above) can serve as a
> rudimentary tool to provide insight into the propagation of a cluster-wide
> log level adjustment."
>
> > Yeah I would lean on the side of calling it out explicitly. Since the
> behaviour is similar to the current dynamically set log levels (i.e
> resetting to the log4j config files levels) so we can call it out stating
> that similarity just for completeness sake. It would be useful info for
> new/medium level users reading the KIP considering worker restarts is not
> uncommon.
>
> Alright, did this too. Added near the end of the "Config topic records"
> section: "Restarting a worker will cause it to discard all cluster-wide
> dynamic log level adjustments, and revert to the levels specified in its
> Log4j configuration. This mirrors the current behavior with per-worker
> dynamic log level adjustments."
>
> > I had a nit level suggestion but not sure if it makes sense but would
> still
> call it out. The entire namespace name in the config records key (along
> with logger-cluster prefix) seems to be a bit too verbose. My first thought
> was to not have the prefix org.apache.kafka.connect in the keys considering
> it is the root namespace. But since logging can be enabled at a root level,
> can we just use initials like (o.a.k.c) which is also a standard practice.
> Let me know what you think?
>
> Considering these records aren't meant to be user-visible, there doesn't
> seem to be a pressing need to reduce their key sizes (though I'll happily
> admit that to human eyes, the format is a bit ugly). IMO the increase in
> implementation complexity isn't quite worth it, especially considering
> there are plenty of logging namespaces that won't begin with
> "org.apache.kafka.connect" (likely including all third-party connector
> code), like Yash mentions. Is there a motivation for this suggestion that
> I'm missing?
>
> > Lastly, I was also thinking if we could introduce a new parameter which
> takes a subset of worker ids and enables logging for them in one go. But
> this is already achievable by invoking scope=worker endpoint n times to
> reflect on n workers so maybe not a necessary change. But this could be
> useful on a large cluster. Do you think this is worth listing in the Future
> Work section? It's not important so can be ignored as well.
>
> Hmmm... I think I'd rather leave this out for now because I'm just not
> certain enough it'd be useful. The one advantage I can think of is
> targeting specific workers that are behind a load balancer, but being able
> to identify those workers would be a challenge in that scenario anyways.
> Besides that, are there any cases that couldn't be addressed more
> holistically by targeting based on connector/connector type, like Yash
> asks?
>
>
> Ashwin:
>
> Glad we're on the same page RE request forwarding and integration vs.
> system tests! Let me know if anything else comes up that you'd like to
> discuss.
>
>
> Yash:
>
> Glad that it makes sense to keep these changes ephemeral. I'm not quite
> confident enough to put persistent updates in the "Future work" 

Re: [VOTE] 3.6.0 RC0

2023-09-19 Thread Sagar
Hey Satish,

I have commented on KAFKA-15473. I think the changes in the PR look fine. I
also feel this need not be a release blocker, given there are other ways in
which duplicates can show up in the response of the endpoint in question
(although we may see more of them due to this).

Would like to hear others' thoughts as well.

Thanks!
Sagar.


On Tue, Sep 19, 2023 at 3:14 PM Satish Duggana 
wrote:

> Hi Greg,
> Thanks for reporting the KafkaConnect issue. I replied to this issue
> on "Apache Kafka 3.6.0 release" email thread and on
> https://issues.apache.org/jira/browse/KAFKA-15473.
>
> I would like to hear other KafkaConnect experts' opinions on whether
> this issue is really a release blocker.
>
> Thanks,
> Satish.
>
>
>
>
> On Tue, 19 Sept 2023 at 00:27, Greg Harris 
> wrote:
> >
> > Hey all,
> >
> > I noticed this regression in RC0:
> > https://issues.apache.org/jira/browse/KAFKA-15473
> > I've mentioned it in the release thread, and I'm working on a fix.
> >
> > I'm -1 (non-binding) until we determine if this regression is a blocker.
> >
> > Thanks!
> >
> > On Mon, Sep 18, 2023 at 10:56 AM Josep Prat 
> wrote:
> > >
> > > Hi Satish,
> > > Thanks for running the release.
> > >
> > > I ran the following validation steps:
> > > - Built from source with Java 11 and Scala 2.13
> > > - Verified Signatures and hashes of the artifacts generated
> > > - Navigated through Javadoc including links to JDK classes
> > > - Run the unit tests
> > > - Run integration tests
> > > - Run the quickstart in KRaft and Zookeeper mode
> > > - Checked License-binary against libs and matched them
> > >
> > > I +1 this release (non-binding)
> > >
> > > Best,
> > >
> > > On Mon, Sep 18, 2023 at 6:02 PM David Arthur  wrote:
> > >
> > > > Hey Satish, thanks for getting the RC underway!
> > > >
> > > > I noticed that the PR for the 3.6 blog post is merged. This makes
> the blog
> > > > post live on the Kafka website https://kafka.apache.org/blog.html.
> The
> > > > blog
> > > > post (along with other public announcements) is usually the last
> thing we
> > > > do as part of the release. I think we should probably take this down
> until
> > > > we're done with the release, otherwise users stumbling on this post
> could
> > > > get confused. It also contains some broken links.
> > > >
> > > > Thanks!
> > > > David
> > > >
> > > > On Sun, Sep 17, 2023 at 1:31 PM Satish Duggana <
> satish.dugg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Kafka users, developers and client-developers,
> > > > >
> > > > > This is the first candidate for the release of Apache Kafka 3.6.0.
> Some
> > > > of
> > > > > the major features include:
> > > > >
> > > > > * KIP-405 : Kafka Tiered Storage
> > > > > * KIP-868 : KRaft Metadata Transactions
> > > > > * KIP-875: First-class offsets support in Kafka Connect
> > > > > * KIP-898: Modernize Connect plugin discovery
> > > > > * KIP-938: Add more metrics for measuring KRaft performance
> > > > > * KIP-902: Upgrade Zookeeper to 3.8.1
> > > > > * KIP-917: Additional custom metadata for remote log segment
> > > > >
> > > > > Release notes for the 3.6.0 release:
> > > > >
> https://home.apache.org/~satishd/kafka-3.6.0-rc0/RELEASE_NOTES.html
> > > > >
> > > > > *** Please download, test and vote by Wednesday, September 21,
> 12pm PT
> > > > >
> > > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > > https://kafka.apache.org/KEYS
> > > > >
> > > > > * Release artifacts to be voted upon (source and binary):
> > > > > https://home.apache.org/~satishd/kafka-3.6.0-rc0/
> > > > >
> > > > > * Maven artifacts to be voted upon:
> > > > >
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > > >
> > > > > * Javadoc:
> > > > > https://home.apache.org/~satishd/kafka-3.6.0-rc0/javadoc/
> > > > >
> > > > > * Tag to be voted upon (off 3.6 branch) is the 3.6.0 tag:
> > > > > https://github.com/apache/k

Re: [ANNOUNCE] New committer: Yash Mayya

2023-09-21 Thread Sagar
Congrats Yash!

On Thu, 21 Sep 2023 at 9:38 PM, Ashwin wrote:

> Awesome ! Congratulations Yash !!
>
> On Thu, Sep 21, 2023 at 9:25 PM Edoardo Comar 
> wrote:
>
> > Congratulations Yash
> >
> > On Thu, 21 Sept 2023 at 16:28, Bruno Cadonna  wrote:
> > >
> > > Hi all,
> > >
> > > The PMC of Apache Kafka is pleased to announce a new Kafka committer
> > > Yash Mayya.
> > >
> > > Yash's major contributions are around Connect.
> > >
> > > Yash authored the following KIPs:
> > >
> > > KIP-793: Allow sink connectors to be used with topic-mutating SMTs
> > > KIP-882: Kafka Connect REST API configuration validation timeout
> > > improvements
> > > KIP-970: Deprecate and remove Connect's redundant task configurations
> > > endpoint
> > > KIP-980: Allow creating connectors in a stopped state
> > >
> > > Overall, Yash is known for insightful and friendly input to discussions
> > > and his high quality contributions.
> > >
> > > Congratulations, Yash!
> > >
> > > Thanks,
> > >
> > > Bruno (on behalf of the Apache Kafka PMC)
> >
>


Re: [ANNOUNCE] New committer: Lucas Brutschy

2023-09-21 Thread Sagar
Congrats Lucas!

On Thu, 21 Sep 2023 at 9:15 PM, Bruno Cadonna  wrote:

> Hi all,
>
> The PMC of Apache Kafka is pleased to announce a new Kafka committer
> Lucas Brutschy.
>
> Lucas' major contributions are around Kafka Streams.
>
> Lucas significantly contributed to the state updater
> (https://issues.apache.org/jira/browse/KAFKA-10199) and he drives the
> implementation of the new threading model for Kafka Streams
> (https://issues.apache.org/jira/browse/KAFKA-15326).
>
> Lucas' contributions to KIP discussions and PR reviews are very thoughtful.
>
> Congratulations, Lucas!
>
> Thanks,
>
> Bruno (on behalf of the Apache Kafka PMC)
>


Re: [DISCUSS] KIP-987: Connect Static Assignments

2023-10-18 Thread Sagar
Hi Greg,

Thanks for the KIP. I have a few questions/comments:

1) You mentioned that during a rebalance, if all the members of the cluster
support the static protocol, then it would use the steps outlined in the
Proposed Changes section to do the assignments. In those steps, the leader
identifies the static/wildcard jobs. It is not clear to me how the leader
makes that distinction. Are we going to enhance the embedded protocol to
also write the static jobs that the worker owns as part of its assignment?
As of today, the workers write only the owned/revoked connectors/tasks
in the case of incremental and above, and only owned connectors/tasks in
the case of eager.

2) Could you also elaborate on this statement a bit:

> A cluster with both static and wildcard workers can use wildcard workers
> as backups for disaster recovery.


I see in the Strimzi proposal you have explained this scenario (the case
where shared workers are empty as pods aren't created yet etc IIUC), but
reading the KIP doesn't make it too obvious.

3) A nit comment but we should probably call out that there is no need to
assign the connector and task to the same worker when configuring them. I
guess for new users of Connect this could be a source of confusion. WDYT?

4) Staying on the above, I also wanted to know why we need to set
static.connectors. As such, connectors generally aren't resource intensive.
Can we not let connectors be assigned using the same algorithm as today and
let only tasks be pinned to workers?

5) In the Proposed Changes section you also mentioned that

> If static assignments are not specified, or at least one worker in the
> cluster is not using the static protocol, they are ignored and the
> worker may receive an arbitrary assignment.


The arbitrary assignment is driven by the minimum supported protocol
(sessioned/eager), right?

6) Just for my understanding, because now the onus is on the connect admin
to specify which connector/task should be running where, can there be a
situation where there could be an imbalance in terms of number of
connectors/tasks running on workers (due to some oversight on part of admin
or some bug in the tooling used to generate worker configs if any)? There
could be an imbalance even today as well but the protocol tries to balance
as much as it can in an automated fashion.

7) Lastly, in a cluster where all workers support the static protocol,
there is still no guarantee that all connectors/tasks would be static. In
such a case, what happens when one of the workers is restarted or is being
rolled? Would the assignment eventually be sticky when the worker comes
back? Does this new protocol also abide by the scheduled rebalance delays?
Maybe this has been explained in the KIP but it wasn't clear to me when I
read it.

Thanks!
Sagar.

On Tue, Oct 10, 2023 at 11:22 PM Greg Harris 
wrote:

> Hi Mickael,
>
> I'm not Chris but I hope I can still respond to your questions :)
>
> 1a. This system behaves best when connectors and tasks are known in
> advance, but I don't think that is a requirement. I clarified that the
> worker configurations are permissive of non-existent jobs, which
> allows for bootstrapping an empty cluster and concurrent worker & job
> provisioning.
> 1b. I think the wildcard suggestion is similar to the rejected
> alternative "implement a more complex worker selector...". While we
> could add that sort of functionality, I think it is more difficult to
> express and reason about. For example, with the current configuration,
> users can ensure that at most one job is assigned to a JVM. If a user
> wanted to use wildcards with the same constraint, those wildcards
> would need to express "at most one of ".
> 1c. The javadoc doesn't say what happens to the additional tasks, but
> I think we could start enforcing it if it makes the
> fixed-static-assignment use-case more reliable. I have only ever seen
> additional tasks emitted due to rather serious bugs in connector
> implementations.
>
> 2. Yes, I've added that information to the proposal. I used the `GET
> /connectors/{connector}` API as it doesn't list the task configs, but
> it performs the same function.
>
> 3. Do you mean the resource limits and requests? This is the rejected
> alternative "Model per-job resource constraints and measure resource
> utilization within a single JVM", let me know if the reasoning there
> is not convincing.
>
> 4. I have not explored rack awareness for Connect in detail. I expect
> that rack awareness would require some compromise on the "least
> disruption" property of incremental cooperative rebalancing for shared
> JVMs. We could discuss that in a future KIP.
> As for this proposal, because the management layer is responsible for
> placing workers and placing tasks on those workers, it would 

Re: Security for Kafka

2023-10-18 Thread Sagar
Hey Walchester,

There's a Confluent Community Slack workspace with a #security channel
where you can post your question. Also, have you filed a bug in the AK
JIRA? That could also help you get traction from community members who
have expertise in this area.

Thanks!
Sagar.

On Thu, Oct 19, 2023 at 11:52 AM Walchester Gaw 
wrote:

> Hello.
>
> Is there something like a community page for Kafka where I can reach out to
> the community where hopefully someone with a similar setup can help?
>
> Thanks,
> Chester
>
>
> On Thu, Oct 12, 2023 at 10:48 AM Walchester Gaw 
> wrote:
>
> > Hello.
> >
> > I am trying to implement Quorum TLS by following the instructions in
> > https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#Quorum+TLS,
> > but I keep on encountering the following errors after doing the second
> > rolling restart where sslQuorum is set to true.
> >
> >- [2023-10-11 05:46:03,250] WARN Cannot open channel to 3 at election
> >address /xxx.xx.xx.xxx: (
> >org.apache.zookeeper.server.quorum.QuorumCnxManager)
> >javax.net.ssl.SSLHandshakeException: Received fatal alert:
> >handshake_failure
> >- [2023-10-11 05:47:12,513] WARN Closing connection to /xxx.xx.xx.
> >xxx: (org.apache.zookeeper.server.NettyServerCnxn)
> >java.io.IOException: ZK down
> >
> > Our current Cluster setup consists of 3 Linux servers (Amazon EC2
> > instances) which contains one Zookeeper and Broker for each server. I
> have
> > tried using Private IP DNS name and Public IPv4 DNS as the alias and
> > distinguished name when generating the self signed certificate for each
> of
> > the servers. For the generation of CA key and CA certificate, I used the
> > Private IP DNS name and Public IPv4 DNS of one of the servers as the common
> > name respectively. Do note I am generating all keystores/truststore in
> just
> > one server (this server's IP is indicated in CA key and CA cert) and
> > distributing them accordingly.
> >
> > I made sure that all ZK is up and running when I am getting the ZK down
> > issue and I am getting that error for all three ZKs. I can also confirm
> > that the file path indicated in the zookeeper.properties where the
> keystore
> > and truststore is located is correct.
> >
> > Can someone assist regarding this? What am I missing here?  Let me know
> if
> > you need more information.
> >
> > I am also unsure if there is something like a community page for Kafka
> > where I can reach out to the community where hopefully someone with a
> > similar setup can help.
> >
> > Thanks,
> > Chester
> >
>


Re: [DISCUSS] KIP-987: Connect Static Assignments

2023-10-27 Thread Sagar
Hi Greg,

Thanks for the response.

> This is detailed in the KIP two sentences earlier: "If the
> connect.protocol is set to static, each worker will send it's
> static.connectors and static.tasks to the coordinator during
> rebalances."


Yes I saw that. My point was if this is going to need a change in the
Connect Protocol format, then those changes should be outlined in the KIP.


> Yes. Arbitrary here just means that the assignment is not
> influenced by the static assignment.


Okay, the usage of the word arbitrary is a bit confusing but I would leave
it at that.

Also, when I read Mickael's suggestion above about using a taint/selector
mechanism, it seems cleaner. The main reason IMO is that we are letting
connectors and tasks define where and how they want to be placed and we let
the runtime decide it (aided by a ResourceManager if needed). With the
approach proposed in the KIP, anytime any changes are needed to the
placements, we need to modify worker configs and add/edit/remove
connectors/tasks on it. This hasn't generally been the case and in my
experience worker configs are not really modified that often. Moreover,
administrators will also need to choose where to place the connectors/tasks
based on the load on the cluster which again is an added overhead.

However you brought up a good point in the KIP

> However, because tasks within a single connector can be heterogeneous, it
> is necessary to attach a different selector/affinity declaration to each
> task. But because tasks are dynamically created by the connector, either
> every task's selector must be added manually after defaulting to a wildcard
> affinity, or the Connect REST API would need a way to template multiple
> task affinities (parameterized by task-id).


as well as in the thread above. Suppose we take the idea proposed by Mickael
and consider just the isolated config value for the selector, which means
that any task spun up for this connector would run on a dedicated worker.
On its own, this might lead to a proliferation of workers on the cluster.
If we also add a worker-level config called max.tasks or something similar
and couple the two, we can in effect achieve the same thing as the KIP
proposes (obviously I might be over-simplifying it a bit here, but I hope
the idea is clear). The config values suggested above should cover all
cases, but I do agree that setting the selector config value at a task
level is a challenge.

Thanks!
Sagar.


On Fri, Oct 20, 2023 at 10:43 PM Greg Harris 
wrote:

> Hey Hector,
>
> That's a cool idea for the ConnectAssignor plugin.
>
> While this proposal could be viewed as an "assignor" problem that a
> custom assignor could solve, it's really about providing additional
> context to the assignor which isn't present currently. This lack of
> context would prevent a custom assignor from solving the resource
> utilization problem adequately.
>
> Thanks!
> Greg
>
> On Fri, Oct 20, 2023 at 9:58 AM Greg Harris  wrote:
> >
> > Mickael,
> >
> > Thank you for discussing that rejected alternative, I was almost going
> > to propose it.
> >
> > > I still find the proposed mechanism limited and I'm not sure it really
> addressed the pain points I've experienced with Connect.
> >
> > I think that this KIP on its own is insufficient to solve the
> > operational difficulties of Connect, and an external management layer
> > is necessary. In this KIP I'm trying to find the minimum viable
> > abstraction to allow a management layer to make decisions about
> > placement, knowing that the abstraction may be non-ergonomic for
> > "direct users" without a management layer mediating.
> >
> > > Connectors may also change the assignment of tasks at runtime so for
> example if task-2 is really busy (because it's assigned a partition with
> high throughput), this may not be true in 10 minutes as this partition is
> now assigned to task-1
> >
> > I think this is similar to a concern (#5) that Tom raised, and a
> > limitation of the "task index" abstraction. I don't know if there is a
> > way for us to manage this sort of fine-grained dynamic utilization of
> > tasks. Once we start a task, it has some static resources assigned to
> > it (via the JVM). If you notice the resource requirements expand, it
> > will need to stop in order to move JVMs and change its resource
> > allocation, and stopping the task may cause assignments to change and
> > the workload to be distributed elsewhere.
> >
> > > I think the "hints" where to place a connector/tasks should come from
> the connector configuration as it's the engine

Re: [DISCUSS] Should we continue to merge without a green build? No!

2023-11-11 Thread Sagar
Hi David,

Thanks for bringing this point and also for creating the revert PRs. I
think there has been an effort in the community to fix a lot of flaky
tests (like around MirrorMaker). I also agree that we shouldn't merge PRs
without green builds and should look to ignore flaky tests. For example, I
did a quick search for some of the common (and sometimes longstanding)
flaky tests and this is a brief list. Some of them have JIRAs associated
with them as well =>

1)
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated
=> https://issues.apache.org/jira/browse/KAFKA-8115

2) kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize =>
https://issues.apache.org/jira/browse/KAFKA-15421

3)
org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers
(Couldn't find JIRA).

4)
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor
 => https://issues.apache.org/jira/browse/KAFKA-15020

5) EosIntegrationTest => https://issues.apache.org/jira/browse/KAFKA-15690

The only thing is where do we draw the line between disabling a genuinely
flaky test v/s looking to fix it. I feel that could get confusing at times
especially if the flaky test involved is on an unrelated part of code
(like a flaky Connect test on Group Coordinator or Streams).

Thanks!
Sagar.


On Sat, Nov 11, 2023 at 3:31 PM David Jacot 
wrote:

> Hi all,
>
> The state of our CI worries me a lot. Just this week, we merged two PRs
> with compilation errors and one PR introducing persistent failures. This
> really hurts the quality and the velocity of the project and it basically
> defeats the purpose of having a CI because we tend to ignore it nowadays.
>
> Should we continue to merge without a green build? No! We should not so I
> propose to prevent merging a pull request without a green build. This is a
> really simple and bold move that will prevent us from introducing
> regressions and will improve the overall health of the project. At the same
> time, I think that we should disable all the known flaky tests, raise jiras
> for them, find an owner for each of them, and fix them.
>
> What do you think?
>
> Best,
> David
>


Re: [DISCUSS] Should we continue to merge without a green build? No!

2023-11-13 Thread Sagar
Hi Divij,

I think this proposal overall makes sense. My only nit, more of a
suggestion, is that we also consider a label called newbie++[1] for flaky
tests if we are considering adding newbie as a label. I think some of the
flaky tests need familiarity with the codebase or the test setup, so for a
first-time contributor, it might be difficult. newbie++ IMO covers that
aspect.

[1]
https://issues.apache.org/jira/browse/KAFKA-15406?jql=project%20%3D%20KAFKA%20AND%20labels%20%3D%20%22newbie%2B%2B%22

Let me know what you think.

Thanks!
Sagar.

On Mon, Nov 13, 2023 at 9:11 PM Divij Vaidya 
wrote:

> >  Please, do it.
> We can use specific labels to effectively filter those tickets.
>
> We already have a label and a way to discover flaky tests. They are tagged
> with the label "flaky-test" [1]. There is also a label "newbie" [2] meant
> for folks who are new to Apache Kafka code base.
> My suggestion is to send a broader email to the community (since many will
> miss details in this thread) and call for action for committers to
> volunteer as "shepherds" for these tickets. I can send one out once we have
> some consensus wrt next steps in this thread.
>
>
> [1]
>
> https://issues.apache.org/jira/browse/KAFKA-13421?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20resolution%20%3D%20Unresolved%20AND%20labels%20%3D%20flaky-test%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC
>
>
> [2] https://kafka.apache.org/contributing -> Finding a project to work on
>
>
> Divij Vaidya
>
>
>
> On Mon, Nov 13, 2023 at 4:24 PM Николай Ижиков 
> wrote:
>
> >
> > > To kickstart this effort, we can publish a list of such tickets in the
> > community and assign one or more committers the role of a "shepherd" for
> > each ticket.
> >
> > Please, do it.
> > We can use specific label to effectively filter those tickets.
> >
> > > On 13 Nov 2023, at 15:16, Divij Vaidya
> > > wrote:
> > >
> > > Thanks for bringing this up David.
> > >
> > > My primary concern revolves around the possibility that the currently
> > > disabled tests may remain inactive indefinitely. We currently have
> > > unresolved JIRA tickets for flaky tests that have been pending for an
> > > extended period. I am inclined to support the idea of disabling these
> > tests
> > > temporarily and merging changes only when the build is successful,
> > provided
> > > there is a clear plan for re-enabling them in the future.
> > >
> > > To address this issue, I propose the following measures:
> > >
> > > 1\ Foster a supportive environment for new contributors within the
> > > community, encouraging them to take on tickets associated with flaky
> > tests.
> > > This initiative would require individuals familiar with the relevant
> code
> > > to offer guidance to those undertaking these tasks. Committers should
> > > prioritize reviewing and addressing these tickets within their
> available
> > > bandwidth. To kickstart this effort, we can publish a list of such
> > tickets
> > > in the community and assign one or more committers the role of a
> > "shepherd"
> > > for each ticket.
> > >
> > > 2\ Implement a policy to block minor version releases until the Release
> > > Manager (RM) is satisfied that the disabled tests do not result in gaps
> > in
> > > our testing coverage. The RM may rely on Subject Matter Experts (SMEs)
> in
> > > the specific code areas to provide assurance before giving the green
> > light
> > > for a release.
> > >
> > > 3\ Set a community-wide goal for 2024 to achieve a stable Continuous
> > > Integration (CI) system. This goal should encompass projects such as
> > > refining our test suite to eliminate flakiness and addressing
> > > infrastructure issues if necessary. By publishing this goal, we create
> a
> > > shared vision for the community in 2024, fostering alignment on our
> > > objectives. This alignment will aid in prioritizing tasks for community
> > > members and guide reviewers in allocating their bandwidth effectively.
> > >
> > > --
> > > Divij Vaidya
> > >
> > >
> > >
> > > On Sun, Nov 12, 2023 at 2:58 AM Justine Olshan
> > 
> > > wrote:
> > >
> > >> I will say that I have also seen tests that seem to be more flaky
> > >> intermittently. It may be ok for some time and suddenly the CI is

Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-11-15 Thread Sagar
Hey all,

Bumping this vote thread again after quite a while.

Thanks!
Sagar.

On Wed, Sep 6, 2023 at 3:58 PM Sagar  wrote:

> Hi All,
>
> Based on the latest discussion thread, it appears as if all open questions
> have been answered.
>
> Hopefully now we are in a state where we can close out on the Voting
> process.
>
> Thanks everyone for the great feedback.
>
> Thanks!
> Sagar.
>
> On Fri, Aug 18, 2023 at 9:00 AM Sagar  wrote:
>
>> Hi All,
>>
>> Bumping the voting thread again.
>>
>> Thanks!
>> Sagar.
>>
>> On Wed, Aug 2, 2023 at 4:43 PM Sagar  wrote:
>>
>>> Attaching the KIP link for reference:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>>
>>> Thanks!
>>> Sagar.
>>>
>>> On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:
>>>
>>>> Hi All,
>>>>
>>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a reasonable
>>>> design. Of course I am open to any feedback/suggestions and would address
>>>> them.
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>


[DISCUSS] KIP-1083: Increase default value of task.shutdown.graceful.timeout.ms in Connect

2024-08-14 Thread Sagar
Hey everyone,

I created KIP-1083
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1083%3A+Increase+default+value+of+task.shutdown.graceful.timeout.ms+in+Connect>
to increase the default value of task.shutdown.graceful.timeout.ms config
in Connect. Please review.

Thanks!
Sagar.


Re: [ANNOUNCE] New Kafka PMC Member: Divij Vaidya

2023-12-27 Thread Sagar
Congrats Divij! Absolutely well deserved!

Thanks!
Sagar.

On Wed, Dec 27, 2023 at 5:15 PM Luke Chen  wrote:

> Hi, Everyone,
>
> Divij has been a Kafka committer since June, 2023. He has remained very
> active and instructive in the community since becoming a committer. It's my
> pleasure to announce that Divij is now a member of Kafka PMC.
>
> Congratulations Divij!
>
> Luke
> on behalf of Apache Kafka PMC
>


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2024-02-02 Thread Sagar
Thanks Yash!

I am hoping to have this released in 3.8 so it would be good to get the
remaining 2 votes.

Thanks!
Sagar.


On Tue, Jan 30, 2024 at 3:18 PM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for the KIP and apologies for the extremely long delay here! I think
> we could do with some wordsmithing on the Javadoc for the new
> `SourceTask::updateOffsets` method but that can be taken care of in the PR.
>
> +1 (binding)
>
> Thanks,
> Yash
>
> On Wed, Nov 15, 2023 at 11:43 PM Sagar  wrote:
>
> > Hey all,
> >
> > Bumping this vote thread again after quite a while.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, Sep 6, 2023 at 3:58 PM Sagar  wrote:
> >
> > > Hi All,
> > >
> > > Based on the latest discussion thread, it appears as if all open
> > questions
> > > have been answered.
> > >
> > > Hopefully now we are in a state where we can close out on the Voting
> > > process.
> > >
> > > Thanks everyone for the great feedback.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Fri, Aug 18, 2023 at 9:00 AM Sagar 
> wrote:
> > >
> > >> Hi All,
> > >>
> > >> Bumping the voting thread again.
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> > >> On Wed, Aug 2, 2023 at 4:43 PM Sagar 
> wrote:
> > >>
> > >>> Attaching the KIP link for reference:
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > >>>
> > >>> Thanks!
> > >>> Sagar.
> > >>>
> > >>> On Wed, Aug 2, 2023 at 4:37 PM Sagar 
> > wrote:
> > >>>
> > >>>> Hi All,
> > >>>>
> > >>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a
> > reasonable
> > >>>> design. Of course I am open to any feedback/suggestions and would
> > address
> > >>>> them.
> > >>>>
> > >>>> Thanks!
> > >>>> Sagar.
> > >>>>
> > >>>
> >
>


Re: [ANNOUNCE] New committer: Christo Lolov

2024-03-26 Thread Sagar
Congrats Christo!

Sagar.

On Tue, 26 Mar 2024 at 6:04 PM, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Congrats Christo!!
>
> On Tue 26. Mar 2024 at 14.33, Apoorv Mittal 
> wrote:
>
> > Congrats Christo!
> >
> > Regards,
> > Apoorv Mittal
> > +44 7721681581
> >
> >
> > On Tue, Mar 26, 2024 at 12:05 PM Luke Chen  wrote:
> >
> > > Hi, Everyone,
> > >
> > > The PMC of Apache Kafka is pleased to announce a new Kafka committer:
> > > Christo Lolov.
> > >
> > > Christo has been a Kafka contributor since 2021. He has made over 50
> > > commits. He authored KIP-902, KIP-963, and KIP-1005, as well as many
> > tiered
> > > storage related tasks. He also co-drives the migration from EasyMock to
> > > Mockito and from Junit 4 to JUnit 5.
> > >
> > > Congratulations, Christo!
> > >
> > > Thanks,
> > > Luke (on behalf of the Apache Kafka PMC)
> > >
> >
>


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2024-04-25 Thread Sagar
Hey All,

Bumping the vote thread after a long time!

Thanks!
Sagar.

On Fri, Feb 2, 2024 at 4:24 PM Sagar  wrote:

> Thanks Yash!
>
> I am hoping to have this released in 3.8 so it would be good to get the
> remaining 2 votes.
>
> Thanks!
> Sagar.
>
>
> On Tue, Jan 30, 2024 at 3:18 PM Yash Mayya  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the KIP and apologies for the extremely long delay here! I
>> think
>> we could do with some wordsmithing on the Javadoc for the new
>> `SourceTask::updateOffsets` method but that can be taken care of in the
>> PR.
>>
>> +1 (binding)
>>
>> Thanks,
>> Yash
>>
>> On Wed, Nov 15, 2023 at 11:43 PM Sagar  wrote:
>>
>> > Hey all,
>> >
>> > Bumping this vote thread again after quite a while.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Wed, Sep 6, 2023 at 3:58 PM Sagar  wrote:
>> >
>> > > Hi All,
>> > >
>> > > Based on the latest discussion thread, it appears as if all open
>> > questions
>> > > have been answered.
>> > >
>> > > Hopefully now we are in a state where we can close out on the Voting
>> > > process.
>> > >
>> > > Thanks everyone for the great feedback.
>> > >
>> > > Thanks!
>> > > Sagar.
>> > >
>> > > On Fri, Aug 18, 2023 at 9:00 AM Sagar 
>> wrote:
>> > >
>> > >> Hi All,
>> > >>
>> > >> Bumping the voting thread again.
>> > >>
>> > >> Thanks!
>> > >> Sagar.
>> > >>
>> > >> On Wed, Aug 2, 2023 at 4:43 PM Sagar 
>> wrote:
>> > >>
>> > >>> Attaching the KIP link for reference:
>> > >>>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>> > >>>
>> > >>> Thanks!
>> > >>> Sagar.
>> > >>>
>> > >>> On Wed, Aug 2, 2023 at 4:37 PM Sagar 
>> > wrote:
>> > >>>
>> > >>>> Hi All,
>> > >>>>
>> > >>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a
>> > reasonable
>> > >>>> design. Of course I am open to any feedback/suggestions and would
>> > address
>> > >>>> them.
>> > >>>>
>> > >>>> Thanks!
>> > >>>> Sagar.
>> > >>>>
>> > >>>
>> >
>>
>


[DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-11 Thread Sagar
Hi All,

I would like to start a discussion on the KIP that I created below to add
prefix scan support in State Stores:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores

Thanks!
Sagar.


Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-14 Thread Sagar
Hey @Adam,

Thanks for sharing your experience with using prefix seek. I did look at
your code for RocksDBPrefixIterator; in fact, I have repurposed that class
itself since it wasn't being used elsewhere. Regarding how I plan to
expose it throughout the state stores, what I have tried to do is add it
as a separate interface. So, basically, it is not at the same level as the
*range function so to speak. The reason I did that is that currently I feel
not all state stores are a natural fit for prefix seek. As I mentioned in
the KIP as well, the current equivalent to it could be BulkLoadingStore (not
in terms of functionality but in terms of how it is also not implemented by
all of them). So, that way I don't need to stub it across all the
state stores and we can implement it only where needed. For example, in the
PR that I have put for reference in the KIP, you can see that I have it
implemented only for RocksDB.
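
To illustrate the separate-interface idea described above, here is a rough
sketch. It is not the code from the PR: MixinSketch, SimpleStore,
PrefixSeekable, SortedStore, HashStore and tryPrefixSeek are all
hypothetical names, and plain Java collections stand in for the real state
stores. It only shows that a store which cannot support prefix seek needs
no stub, while callers check for the capability before using it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class MixinSketch {

    // Hypothetical base contract that every store implements.
    interface SimpleStore {
        void put(String key, String value);
        String get(String key);
    }

    // Hypothetical optional capability, implemented only where it fits.
    interface PrefixSeekable {
        List<String> prefixSeek(String prefix);
    }

    // A sorted store: keys sharing a prefix are contiguous, so it opts in.
    static final class SortedStore implements SimpleStore, PrefixSeekable {
        private final TreeMap<String, String> data = new TreeMap<>();

        public void put(String key, String value) { data.put(key, value); }
        public String get(String key) { return data.get(key); }

        public List<String> prefixSeek(String prefix) {
            List<String> keys = new ArrayList<>();
            // Start at the first key >= prefix and stop at the first non-match.
            for (Map.Entry<String, String> e : data.tailMap(prefix, true).entrySet()) {
                if (!e.getKey().startsWith(prefix)) {
                    break;
                }
                keys.add(e.getKey());
            }
            return keys;
        }
    }

    // An unsorted store: no natural prefix seek, and no stub is required.
    static final class HashStore implements SimpleStore {
        private final Map<String, String> data = new HashMap<>();

        public void put(String key, String value) { data.put(key, value); }
        public String get(String key) { return data.get(key); }
    }

    // Callers probe for the capability instead of assuming it.
    static Optional<List<String>> tryPrefixSeek(SimpleStore store, String prefix) {
        if (store instanceof PrefixSeekable) {
            return Optional.of(((PrefixSeekable) store).prefixSeek(prefix));
        }
        return Optional.empty();
    }
}
```

The trade-off is that callers have to do the instanceof check themselves.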

@Guozhang,

Thanks for the feedback. Those are very interesting questions and I will
try my best to answer based upon whatever limited understanding I have
developed so far :)

1) Regarding the usage of useFixedLengthPrefixExtractor, honestly, I hadn't
looked at that config. I did look it up after you pointed it out and it
seems it's more for hash-based memtables? I may be wrong though. But what I
would say is that the changes I had made were not exactly from a
correctness standpoint but more from trying to showcase how we can
implement these changes. The idea was that once we see the merit in this
approach then we can add some of the tunings (and I would need your team's
assistance there :D).

2) Regarding the similarity of `RocksDBPrefixIterator` and
`RocksDBRangeIterator`, yes, the implementations look more or less similar.
So, in terms of performance, they might be similar. But semantically, they
solve 2 different use-cases. The range seek is useful when we know both
from and to. But if we consider use-cases where we want to find keys with a
certain prefix, but we don't know what its start and end are, then prefix
seek would come in more handy. The point that I am trying to make is that
it can extend the scope of state stores from just point lookups to
somewhat speculative queries, whereby users can search if a certain
pattern exists. I can vouch for this personally because I wanted to use
state stores for one such use case and since this option wasn't there, I
had to do some other things. An equivalent to this could be the SCAN
operator in Redis. (Not trying to compare Redis and state stores but
trying to give some context).

Regarding the point on bloom filter, I think there are certain
optimisations that are being talked about in case of prefix seek here:

https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key

Again, this isn't something that I have explored fully. Also, on the prefix
seek page on RocksDB they mention that there's a prefix iterating technique
called Prefix Bloom Filter.

3) Regarding the question on length of bytes for seek v/s prefix seek, I am
not entirely sure about that scenario. What I have understood is that,
at least for RocksDB, it is more performant for short iterator queries
than longer ones.

4) Regarding the last question on placing it within Segment, the reason I
didn't do it that way is that I thought we shouldn't tie this feature only
to RocksDB. I agree that I got this idea while looking/reading about
RocksDB but if we keep it outside the purview of RocksDB and keep it as a
pluggable entity, then a) it remains generic by not being tied to any
specific store and b) no change is needed at all for any of the other
stores which haven't implemented it.
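
To make the range seek v/s prefix seek distinction from point 2 a bit more
concrete, here is a minimal sketch. It is not the RocksDB iterator code:
PrefixScanSketch and its methods are hypothetical names, and a TreeMap
ordered by unsigned byte comparison stands in for a byte-ordered store. The
point is that a range seek needs both bounds from the caller, whereas a
prefix seek derives the upper bound from the prefix itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PrefixScanSketch {

    // Lexicographic ordering over raw bytes (assumption: the store sorts keys this way).
    private static final Comparator<byte[]> LEXICOGRAPHIC = Arrays::compareUnsigned;

    private final NavigableMap<byte[], String> store = new TreeMap<>(LEXICOGRAPHIC);

    void put(String key, String value) {
        store.put(bytes(key), value);
    }

    // Range seek: the caller must know both from and to (inclusive here).
    List<String> range(String from, String to) {
        return new ArrayList<>(store.subMap(bytes(from), true, bytes(to), true).values());
    }

    // Prefix seek: the caller supplies only the prefix.
    List<String> prefixScan(String prefix) {
        byte[] from = bytes(prefix);
        byte[] to = upperBound(from);
        NavigableMap<byte[], String> view = (to == null)
                ? store.tailMap(from, true)            // no finite upper bound exists
                : store.subMap(from, true, to, false); // [prefix, upperBound)
        return new ArrayList<>(view.values());
    }

    // Smallest byte string greater than every key that starts with prefix,
    // or null when there is none (empty prefix or all bytes are 0xFF).
    static byte[] upperBound(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] bound = Arrays.copyOf(prefix, i + 1);
                bound[i]++;
                return bound;
            }
        }
        return null;
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

With keys "user-1", "user-2", "usez" and "order-1", a prefixScan("user-")
covers only the first two, without the caller having to work out an end key.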

I am not sure if any of the above points make sense but as I said, this is
based on my limited understanding of the codebase. So, pardon any
incorrect/illogical statements plz!

@Sophie,

Thanks for bringing that point up! I have mentioned that PR in the
KIP under a section called Other considerations. Nonetheless, thanks for
pointing it out!

Thanks!
Sagar.


On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman 
wrote:

> Not to derail this KIP discussion, but to leave a few notes on some of the
> RocksDB points that have come up:
>
> Someone actually merged some long overdue performance improvements to
> the RocksJava implementation (the PR was opened back in 2017! yikes).
> I haven't looked into the prefix seek API closely enough to know how
> relevant
> this particular change is, and they are still improving things, but it
> gives me some
> faith.
>
> There are some pretty promising results reported on the PR:
> https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
>
> Regarding the custom comparator, they also recently merged this performance
> <https://github.com/facebook/rocksdb/pull/6252>
> improvement <https://g

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-26 Thread Sagar
Hi John,

Thanks for the detailed reply. I was a bit crammed with work last week so
couldn't respond earlier so apologies for that.

First of all, thanks for the context that both you and Adam have
provided me on the issues faced previously. As I can clearly see, while I
was able to cut some corners while writing some test cases or benchmarks,
to be able to stitch together a store with prefix scan into an actual
topology needs more work. I am sorry for the half baked tests that I wrote
without realising and you have rightly put it when you said these
challenges aren't obvious up front.

Now, coming back to the other points, I spent some time going through
KIP-213 and also some of the code snippets that are talked about in that
KIP. With the detailed explanation that you provided, it is now obvious to
me that keeping a generic type for keys like K won't work out of the box
and hence a decision was made to use Bytes as the key type.

I just had another thought on this though. I looked at the range function
that was added in the ReadOnlyKeyValueStore. While the key and the value
types in that method are generic, internally almost all queries end up
querying using Bytes in some form or the other. I looked at not just
the RocksDB store but other stores like the InMemory store or MemoryLRU and
this seems to be the pattern. I think this stems from the fact that these
stores, while implementing KeyValueStore, pass Bytes, byte[] as the K and V
types. Classes like MeteredKeyValueStore which don't do this still use
Bytes.wrap to wrap the passed keys and values and invoke the range method.

So, the point I am trying to make is, with the same behaviour - and
ignoring for a moment that it's a separate interface which I am trying to
"mix-in" - the issues with the key types could be resolved. I may be wrong
though so would like to know your thoughts on this. In fact, unknowingly,
the interface implementation of PrefixSeekableType in RockDBStateStore also
passes Bytes and byte[] as K and V.
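
A rough sketch of what I mean by "the same behaviour", under my own
assumptions: TypedOverBytesSketch is a hypothetical class, plain Function
objects stand in for serializers, and a TreeMap stands in for the inner
byte-keyed store. The typed outer layer serializes keys before delegating,
and the prefix gets its own type P and serializer, so a prefix never has to
be a valid K.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;

final class TypedOverBytesSketch<K, V> {

    private static final Comparator<byte[]> LEXICOGRAPHIC = Arrays::compareUnsigned;

    // Inner store only ever sees raw bytes (stand-in for a byte-keyed store).
    private final NavigableMap<byte[], V> inner = new TreeMap<>(LEXICOGRAPHIC);
    private final Function<K, byte[]> keySerializer;

    TypedOverBytesSketch(Function<K, byte[]> keySerializer) {
        this.keySerializer = keySerializer;
    }

    void put(K key, V value) {
        inner.put(keySerializer.apply(key), value);
    }

    // Typed range query: both bounds are serialized, then delegated as bytes.
    List<V> range(K from, K to) {
        byte[] lo = keySerializer.apply(from);
        byte[] hi = keySerializer.apply(to);
        return new ArrayList<>(inner.subMap(lo, true, hi, true).values());
    }

    // Prefix query: the prefix has its own type and serializer, independent of K.
    <P> List<V> prefixScan(P prefix, Function<P, byte[]> prefixSerializer) {
        byte[] raw = prefixSerializer.apply(prefix);
        List<V> values = new ArrayList<>();
        for (Map.Entry<byte[], V> e : inner.tailMap(raw, true).entrySet()) {
            if (!startsWith(e.getKey(), raw)) {
                break; // keys are sorted, so matches are contiguous
            }
            values.add(e.getValue());
        }
        return values;
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
```

This only works if the serialized prefix really is a byte-prefix of the
serialized keys, which depends entirely on the key serialization format.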

The second part is exposing it via the publicly accessible interfaces to
which we downcast while building the topology (like KeyValueStore). I can
clearly see now that mixing-in the way I tried to won't work. My intention
all along was not to hamper the flow of those stores which don't support
prefix scan as yet and hence the separate interface. But, I agree that for
this to work, it needs to be part of some pre-defined store types like
KVStore etc. Right now, I don't have an answer to this but mostly it would
have to be moved there and implemented across all stores (if we see the
worth in prefix scans :) )

Regarding the motivation, I am sorry if I wasn't clear. This originated
from one of my own use cases with Kafka Streams where I needed to find some
keys based upon a certain prefix. In fact, it's similar to the
RangeScanCombinedKeyUsage diagram in KIP-213 where the otherTable tries to
find entries in the state store based upon the FK. I was using
KeyValueStore to be precise. I also remember having a Slack conversation on
this, and I was told that this isn't supported right now, but some other
users shared their experiences on how, with some hacks, they are able to
perform prefix scans even though their use case fits the bill for a prefix
scan. That kind of motivated me to take a stab at it. Unfortunately, I have
lost the Slack chat because of some cleanup at the Slack channel level. I
will try and update the ambiguous motivation statement in the near future.

Lastly, I would like to point out, that your response was not at all
discouraging. On the contrary it was really insightful and it's always good
to learn/discover new things :)

Thanks!
Sagar.

On Fri, May 15, 2020 at 7:37 AM John Roesler wrote:

> Hi, Sagar!
>
> Thanks for this KIP. I'm sorry it took me so long to reply. I'll number my
> points differently to avoid confusion.
>
> I can provide some additional context on the difficulties we previously
> faced in KIP-213 (which you and Adam have already discussed).
>
> J1) In your KIP, you propose the following interface:
>
> public interface PrefixSeekableStore<K, V> {
>     KeyValueIterator<K, V> prefixSeek(K prefix);
> }
>
> This is roughly the same thing that Adam and I were considering
> before. It has a hidden problem, that it assumes that prefixes of
> keys in the key space are also in the key space. In other words, this
> is a store with key type K, and the API assumes that prefixes are also
> of type K. This is true for some key types, like String or Bytes, but not
> for others.
>
> For example, if the keys are UUIDs, then no prefix is also a UUID. If the
> key is a complex data type, like Windowed<K> in our own DSL, then
> we would absolutely want to query all keys with the same record key
> (the K part), or the same window start time, but in neither case is the
> prefix actually a Windowed<K>.

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-31 Thread Sagar
Hi John,

Thank you. I think it makes sense to modify the KIP to add prefixScan()
as part of the existing interfaces and to list the new mixin behaviour under
Rejected Alternatives. I am not very familiar with stores other than
KeyValueStore, so is it fine if I keep it there for now?

Regarding the type definition of the prefix, I will try to think about some
alternatives and share them if I come up with any.

Thanks!
Sagar.


On Sun, May 31, 2020 at 1:55 AM John Roesler wrote:

> Hi Sagar,
>
> Thanks for the response. Your use case makes sense to me; I figured it
> must be something like that.
>
> On a pragmatic level, in the near term, you might consider basically doing
> the same thing we did in KIP-213. If you swap out the store types for
> Bytes/byte[] and “manually” invoke the serdes in your own logic, then you
> can use the same algorithm we did to derive the range scan boundaries from
> your desired prefix.
>
> For the actual KIP, it seems like we would need significant design
> improvements to be able to do any mixins, so I think we should favor
> proposing either to just add to the existing interfaces or to create brand
> new interfaces, as appropriate, for now. Given that prefix can be converted
> to a range query at a low level, I think we can probably explore adding
> prefix to the existing interfaces with a default implementation.
>
> It seems like that just leaves the question of how to define the type of
> the prefix. To be honest, I don’t have any great ideas here. Are you able
> to generate some creative solutions, Sagar?
>
> Thanks,
> John
>
> On Tue, May 26, 2020, at 06:42, Sagar wrote:
> > Hi John,
> >
> > Thanks for the detailed reply. I was a bit crammed with work last week so
> > couldn't respond earlier so apologies for that.
> >
> > First of all, thanks for the context that both you and Adam have
> > provided me on the issues faced previously. As I can clearly see, while I
> > was able to cut some corners while writing some test cases or benchmarks,
> > to be able to stitch together a store with prefix scan into an actual
> > topology needs more work. I am sorry for the half baked tests that I
> wrote
> > without realising and you have rightly put it when you said these
> > challenges aren't obvious up front.
> >
> > Now, coming back to the other points, I spent some time going through the
> > KIP-213 and also some of the code snippets that are talked about in that
> > KIP. With the detailed explanation that you provided, it is now obvious
> to
> > me that keeping a generic type for keys like K won't work oob and hence a
> > decision was made to use Bytes as the key type.
> >
> > I just had another thought on this though. I looked at the range function
> > that was added in the ReadOnlyKeyValueStore. While the Key and the Value
> > mentioned in that method is generic, internally almost all queries end up
> > querying using Bytes in some or the other form. I looked at not just
> > RocksDb Store but other stores like InMemory store or MemoryLRU and this
> > seems to be the pattern. I think this stems from the fact that these
> stores
> > while implementing KeyValueStore pass Bytes, byte[] as the K and V
> values.
> > Classes like MeteredKeyValueStore which don't do this, still use
> Bytes.wrap
> > to wrap the passed keys and values and invoke the range method.
> >
> > So, the point I am trying to make is, with the same behaviour - and
> > ignoring for a moment that it's a separate interface which I am trying to
> > "mix-in"- the issues with the key types could be resolved. I may be wrong
> > though so would like to know your thoughts on this. Infact unknowingly
> the
> > interface implementation of PrefixSeekableType in RockDBStateStore also
> > passes Bytes and bytes[] as K and V.
> >
> > The second part of exposing it via the publically accessible interfaces
> to
> > which we downcast while building the topology (like KeyValueStore), I can
> > clearly see now that mixing-in the way I tried to won't work. My
> intention
> > all along was not to hamper the flow of those stores which don't support
> > prefix scan as yet and hence the separate interface. But, I agree that
> for
> > this to work, it needs to be part of some pre-defined store types like
> > KVStore etc. Right now, I don't have an answer to this but mostly it
> would
> > have to be moved there and implemented across all stores(if we see the
> > worth in prefix scans :) )
> >
> > Regarding the motivation, I am sorry if I wasn't clear. This originated
> > from one of my own use cases with kafka str

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-01 Thread Sagar
Hi John,

Just to add to my previous email (and sorry for the spam): if we consider
using Bytes/byte[] and manually invoking the serdes, could you provide
examples where the same Serde for keys won't work for the prefix types? As
far as my understanding goes, the prefix seek would depend upon the ordering
of the keys, e.g. lexicographic. As long as the binary format is consistent
for both the keys and the prefixes, would it not ensure the ability to
search in that same ordering space? This is from my limited understanding,
so any concrete examples would be helpful.
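
To make the ordering argument concrete, here is a minimal sketch of turning
a byte prefix into a range scan, assuming keys are compared as unsigned
lexicographic bytes. It uses only a TreeMap as a stand-in for a sorted
store; PrefixRangeSketch, upperBound, prefixScan and newStore are names I
made up for illustration and are not part of Kafka Streams.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper, not Kafka code: a TreeMap stands in for a sorted store.
final class PrefixRangeSketch {

    // Smallest byte string strictly greater than every key that starts with
    // the prefix: drop trailing 0xFF bytes, then increment the last byte.
    // Returns null when no such bound exists (empty or all-0xFF prefix).
    static byte[] upperBound(byte[] prefix) {
        int end = prefix.length;
        while (end > 0 && prefix[end - 1] == (byte) 0xFF) {
            end--;
        }
        if (end == 0) {
            return null;
        }
        byte[] bound = Arrays.copyOf(prefix, end);
        bound[end - 1]++;
        return bound;
    }

    // A sorted map ordered by unsigned lexicographic byte comparison.
    static NavigableMap<byte[], String> newStore() {
        Comparator<byte[]> unsignedLexicographic = Arrays::compareUnsigned;
        return new TreeMap<>(unsignedLexicographic);
    }

    // Returns the keys (decoded as UTF-8) that start with the prefix, by
    // scanning the half-open range [prefix, upperBound(prefix)).
    static List<String> prefixScan(NavigableMap<byte[], String> store, byte[] prefix) {
        byte[] to = upperBound(prefix);
        NavigableMap<byte[], String> range = (to == null)
                ? store.tailMap(prefix, true)
                : store.subMap(prefix, true, to, false);
        List<String> keys = new ArrayList<>();
        for (byte[] key : range.keySet()) {
            keys.add(new String(key, StandardCharsets.UTF_8));
        }
        return keys;
    }

    public static void main(String[] args) {
        NavigableMap<byte[], String> store = newStore();
        for (String k : new String[] {"order-9", "use", "user-1", "user-2", "usez"}) {
            store.put(k.getBytes(StandardCharsets.UTF_8), "value");
        }
        System.out.println(prefixScan(store, "user-".getBytes(StandardCharsets.UTF_8)));
    }
}
```

This only works when the serialized form of a prefix is itself a byte
prefix of the serialized keys, which is the assumption I am asking about.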

Also, you mentioned the creation of dummy values to indicate prefix
values. Do you mean this line?

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L91

This is where the prefix key is built and used for searching.

Thanks!
Sagar.

On Mon, Jun 1, 2020 at 11:42 AM Sagar wrote:

> Hi John,
>
> Thank you. I think it makes sense to modify the KIP to add the
> prefixScan() as part of the existing interfaces and add the new mixin
> behaviour as Rejected alternatives. I am not very aware of other stores
> apart from keyValueStore so is it fine if I keep it there for now?
>
> Regarding the type definition of types I will try and think about some
> alternatives and share if I get any.
>
> Thanks!
> Sagar.
>
>
> On Sun, May 31, 2020 at 1:55 AM John Roesler  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the response. Your use case makes sense to me; I figured it
>> must be something like that.
>>
>> On a pragmatic level, in the near term, you might consider basically
>> doing the same thing we did in KIP-213. If you swap out the store types for
>> Bytes/byte[] and “manually” invoke the serdes in your own logic, then you
>> can use the same algorithm we did to derive the range scan boundaries from
>> your desired prefix.
>>
>> For the actual KIP, it seems like we would need significant design
>> improvements to be able to do any mixins, so I think we should favor
>> proposing either to just add to the existing interfaces or to create brand
>> new interfaces, as appropriate, for now. Given that prefix can be converted
>> to a range query at a low level, I think we can probably explore adding
>> prefix to the existing interfaces with a default implementation.
>>
>> It seems like that just leaves the question of how to define the type of
>> the prefix. To be honest, I don’t have any great ideas here. Are you able
>> to generate some creative solutions, Sagar?
>>
>> Thanks,
>> John
>>
>> On Tue, May 26, 2020, at 06:42, Sagar wrote:
>> > Hi John,
>> >
>> > Thanks for the detailed reply. I was a bit crammed with work last week
>> so
>> > couldn't respond earlier so apologies for that.
>> >
>> > First of all, thanks for the context that both you and Adam have
>> > provided me on the issues faced previously. As I can clearly see, while
>> I
>> > was able to cut some corners while writing some test cases or
>> benchmarks,
>> > to be able to stitch together a store with prefix scan into an actual
>> > topology needs more work. I am sorry for the half baked tests that I
>> wrote
>> > without realising and you have rightly put it when you said these
>> > challenges aren't obvious up front.
>> >
>> > Now, coming back to the other points, I spent some time going through
>> the
>> > KIP-213 and also some of the code snippets that are talked about in that
>> > KIP. With the detailed explanation that you provided, it is now obvious
>> to
>> > me that keeping a generic type for keys like K won't work oob and hence
>> a
>> > decision was made to use Bytes as the key type.
>> >
>> > I just had another thought on this though. I looked at the range
>> function
>> > that was added in the ReadOnlyKeyValueStore. While the Key and the Value
>> > mentioned in that method is generic, internally almost all queries end
>> up
>> > querying using Bytes in some or the other form. I looked at not just
>> > RocksDb Store but other stores like InMemory store or MemoryLRU and this
>> > seems to be the pattern. I think this stems from the fact that these
>> stores
>> > while implementing KeyValueStore pass Bytes, byte[] as the K and V
>> values.
>> > Classes like MeteredKeyValueStore which don't do this, still use
>> Bytes.wrap
>> > to wrap the passed keys and values and invoke the range method.
>> >
>> > So, the

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-12 Thread Sagar
Thank you Guozhang/David for the feedback. It looks like there's agreement
on using separate APIs for Connect. I will revisit the doc and see what
changes need to be made.

Thanks!
Sagar.

On Tue, Aug 9, 2022 at 7:11 PM David Jacot wrote:

> Hi Sagar,
>
> Thanks for the feedback and the document. That's really helpful. I
> will take a look at it.
>
> Overall, it seems to me that both Connect and the Consumer could share
> the same underlying "engine". The main difference is that the Consumer
> assigns topic-partitions to members whereas Connect assigns tasks to
> workers. I see two ways to move forward:
> 1) We extend the new proposed APIs to support different resource types
> (e.g. partitions, tasks, etc.); or
> 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> similar to the new ones but different on the content/resources and
> they would rely on the same engine on the coordinator side.
>
> I personally lean towards 2) because I am not a fan of overloading
> APIs to serve different purposes. That being said, I am not opposed to
> 1) if we can find an elegant way to do it.
>
> I think that we can continue to discuss it here for now in order to
> ensure that this KIP is compatible with what we will do for Connect in
> the future.
>
> Best,
> David
>
> On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
> >
> > Hi all,
> >
> > I am back from vacation. I will go through and address your comments
> > in the coming days. Thanks for your feedback.
> >
> > Cheers,
> > David
> >
> > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris 
> wrote:
> > >
> > > Hey All!
> > >
> > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
> making it
> > > down the stack!
> > >
> > > I had a few questions:
> > >
> > > 1. The 'Rejected Alternatives' section describes how member epoch
> should
> > > advance in step with the group epoch and assignment epoch values. I
> think
> > > that this is a good idea for the reasons described in the KIP. When the
> > > protocol is incrementally assigning partitions to a worker, what member
> > > epoch does each incremental assignment use? Are member epochs re-used,
> and
> > > a single member epoch can correspond to multiple different
> (monotonically
> > > larger) assignments?
> > >
> > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> > > not, should custom client-side assignor implementations interact with
> the
> > > Reason field, and how is its common meaning agreed upon? If so, what
> is the
> > > benefit of a distinct Reason field over including such functionality
> in the
> > > opaque metadata?
> > >
> > > 3. The following is included in the KIP: "Thanks to this, the input of
> the
> > > client side assignor is entirely driven by the group coordinator. The
> > > consumer is no longer responsible for maintaining any state besides its
> > > assigned partitions." Does this mean that the client-side assignor MAY
> > > incorporate additional non-Metadata state (such as partition
> throughput,
> > > cpu/memory metrics, config topics, etc), or that additional
> non-Metadata
> > > state SHOULD NOT be used?
> > >
> > > 4. I see that there are separate classes
> > > for org.apache.kafka.server.group.consumer.PartitionAssignor
> > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> > > overlap significantly. Is it possible for these two implementations to
> be
> > > unified? This would serve to promote feature parity of server-side and
> > > client-side assignors, and would also facilitate operational
> flexibility in
> > > certain situations. For example, if a server-side assignor has some
> poor
> > > behavior and needs a patch, deploying the patched assignor to the
> client
> > > and switching one consumer group to a client-side assignor may be
> faster
> > > and less risky than patching all of the brokers. With the currently
> > > proposed distinct APIs, a non-trivial reimplementation would have to be
> > > assembled, and if the two APIs have diverged significantly, then it is
> > > possible that a reimplementation would not be possible.
> > >
> > > --
> > > Greg Harris
> > > gharris1...@gmail.com
> > > github.com/gharris1727
> > >
> > > On Wed, Aug 3, 2022 at 8:39 AM Sagar 
> wrote:
> > >
> > > > Hi 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-08-12 Thread Sagar
Hey John,

Thanks for the vote. I added the reason for the rejection of the
alternatives. The first one is basically an option to broadcast to all
partitions, which I felt was restrictive. Instead, the KIP allows
multicasting to 0-N partitions based upon the partitioner implementation.
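
To illustrate what "0-N partitions" means, here is a rough sketch under my
own assumptions. MulticastPartitioner and send below are hypothetical
stand-ins written for this email, not the Kafka Streams interfaces touched
by the KIP: an empty set drops the record, and each returned partition gets
one write.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not Kafka's StreamPartitioner or RecordCollector.
final class MulticastSketch {

    // Stand-in partitioner that may return zero, one, or many partitions.
    interface MulticastPartitioner<K, V> {
        Set<Integer> partitions(String topic, K key, V value, int numPartitions);
    }

    // Simulated send loop: records one "write" per partition the partitioner
    // returned. An empty set produces no writes, i.e. the record is dropped.
    static <K, V> List<String> send(String topic, K key, V value, int numPartitions,
                                    MulticastPartitioner<K, V> partitioner) {
        List<String> writes = new ArrayList<>();
        for (int partition : partitioner.partitions(topic, key, value, numPartitions)) {
            writes.add(topic + "-" + partition + ":" + key + "=" + value);
        }
        return writes;
    }

    // Example policy (made up): empty values are dropped, values starting
    // with "all:" go to every partition, everything else goes to one
    // partition chosen by key hash.
    static final MulticastPartitioner<String, String> BY_VALUE = (topic, key, value, n) -> {
        Set<Integer> targets = new LinkedHashSet<>();
        if (value.isEmpty()) {
            return targets;
        }
        if (value.startsWith("all:")) {
            for (int p = 0; p < n; p++) {
                targets.add(p);
            }
            return targets;
        }
        targets.add(Math.floorMod(key.hashCode(), n));
        return targets;
    };

    public static void main(String[] args) {
        System.out.println(send("out", "k", "all:hello", 3, BY_VALUE));
        System.out.println(send("out", "k", "", 3, BY_VALUE));
        System.out.println(send("out", "k", "hello", 3, BY_VALUE));
    }
}
```

A broadcast-only option would correspond to always returning every
partition, which is why I felt it was the more restrictive alternative.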

Thanks!
Sagar.

On Sat, Aug 13, 2022 at 7:35 AM John Roesler wrote:

> Thanks, Sagar!
>
> I’m +1 (binding)
>
> Can you add a short explanation to each rejected alternative? I was
> wondering why we wouldn’t provide an overloaded to()/addSink() (the first
> rejected alternative), and I had to look back at the Streams code to see
> that they both already accept the partitioner (I thought it was a config).
>
> Thanks!
> -John
>
> On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > +1 (non binding)
> >
> > Walker
> >
> > On Tue, May 31, 2022 at 4:44 AM Sagar  wrote:
> >
> >> Hi All,
> >>
> >> I would like to start a voting thread on
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >> .
> >>
> >> I am just starting this as the discussion thread has been open for 10+
> >> days. In case there are some comments, we can always discuss them over
> >> there.
> >>
> >> Thanks!
> >> Sagar.
> >>
>


Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-17 Thread Sagar
Hi Alex,

I went through the KIP again and it looks good to me. I just had a question
on the secondary state stores:

*All writes and deletes go to the temporary store. Reads query the
temporary store; if the data is missing, query the regular store. Range
reads query both stores and return a KeyValueIterator that merges the
results. On crash
failure, ProcessorStateManager calls StateStore#recover(offset) that
truncates the temporary store.*

I guess this matches today's behaviour, where state store reads can fetch
uncommitted data? Also, if the query is for a recent write (what counts as
recent is also debatable, TBH), then it makes sense to go to the secondary
store. But if not, i.e. if it's a read of a key which is already committed,
the lookup in the secondary store would always be a miss and we would end
up making 2 reads. I don't know if it's a performance bottleneck, but is it
possible to figure out beforehand which store is relevant and query only
that one? If you think it's a case of over-optimisation, then you can
ignore it.

I think we can do something similar for range queries as well, and look to
merge the results from the secondary and primary stores only if there is an
overlap.
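
To check my understanding of the read path quoted from the KIP, here is a
small sketch. It is not the KIP's implementation: TwoTierStoreSketch is a
name I made up, two in-memory TreeMaps stand in for the temporary and
regular stores, an empty Optional marks an uncommitted delete (my
assumption), recover() takes no offset, and the range read materialises the
merge instead of returning a lazy KeyValueIterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch of a temporary (uncommitted) store layered over a
// regular (committed) store. Not Kafka Streams code.
final class TwoTierStoreSketch {

    // Optional.empty() records an uncommitted delete of that key.
    private final TreeMap<String, Optional<String>> temporary = new TreeMap<>();
    private final TreeMap<String, String> regular = new TreeMap<>();

    // All writes and deletes go to the temporary store.
    void put(String key, String value) {
        temporary.put(key, Optional.of(value));
    }

    void delete(String key) {
        temporary.put(key, Optional.empty());
    }

    // Point read: temporary store first, regular store only on a miss.
    String get(String key) {
        Optional<String> pending = temporary.get(key);
        if (pending != null) {
            return pending.orElse(null);
        }
        return regular.get(key);
    }

    // Range read over [from, to]: start from the committed entries and let
    // uncommitted writes and deletes override them.
    List<String> range(String from, String to) {
        TreeMap<String, String> merged = new TreeMap<>(regular.subMap(from, true, to, true));
        for (Map.Entry<String, Optional<String>> e
                : temporary.subMap(from, true, to, true).entrySet()) {
            if (e.getValue().isPresent()) {
                merged.put(e.getKey(), e.getValue().get());
            } else {
                merged.remove(e.getKey());
            }
        }
        List<String> result = new ArrayList<>();
        merged.forEach((k, v) -> result.add(k + "=" + v));
        return result;
    }

    // Commit: apply pending changes to the regular store, then clear them.
    void commit() {
        for (Map.Entry<String, Optional<String>> e : temporary.entrySet()) {
            if (e.getValue().isPresent()) {
                regular.put(e.getKey(), e.getValue().get());
            } else {
                regular.remove(e.getKey());
            }
        }
        temporary.clear();
    }

    // Crash recovery: discard everything that was not committed.
    void recover() {
        temporary.clear();
    }

    public static void main(String[] args) {
        TwoTierStoreSketch store = new TwoTierStoreSketch();
        store.put("a", "1");
        store.commit();
        store.put("b", "2");
        System.out.println(store.get("a") + " " + store.get("b"));
        store.recover();
        System.out.println(store.range("a", "z"));
    }
}
```

In this sketch a get() for a committed key such as "a" always misses in the
temporary map before hitting the regular one, which is the double read I am
asking about.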

Also, staying on this, I still wanted to ask: in Kafka Streams, from my
limited knowledge, if EOS is enabled, reads would return only committed data.
So I'm still curious about the choice of going to the secondary store
first. Maybe there's something fundamental that I am missing.

Thanks for the KIP again!

Thanks!
Sagar.



On Mon, Aug 15, 2022 at 8:25 PM Alexander Sorokoumov wrote:

> Hey Guozhang,
>
> Thank you for elaborating! I like your idea to introduce a StreamsConfig
> specifically for the default store APIs. You mentioned Materialized, but I
> think changes in StreamJoined follow the same logic.
>
> I updated the KIP and the prototype according to your suggestions:
> * Add a new StoreType and a StreamsConfig for transactional RocksDB.
> * Decide whether Materialized/StreamJoined are transactional based on the
> configured StoreType.
> * Move RocksDBTransactionalMechanism to
> org.apache.kafka.streams.state.internals to remove it from the proposal
> scope.
> * Add a flag in new Stores methods to configure a state store as
> transactional. Transactional state stores use the default transactional
> mechanism.
> * The changes above allowed to remove all changes to the StoreSupplier
> interface.
>
> I am not sure about marking StateStore#transactional() as evolving. As long
> as we allow custom user implementations of that interface, we should
> probably either keep that flag to distinguish between transactional and
> non-transactional implementations or change the contract behind the
> interface. What do you think?
>
> Best,
> Alex
>
> On Thu, Aug 11, 2022 at 1:00 AM Guozhang Wang  wrote:
>
> > Hello Alex,
> >
> > Thanks for the replies. Regarding the global config v.s. per-store spec,
> I
> > agree with John's early comments to some degrees, but I think we may well
> > distinguish a couple scenarios here. In sum we are discussing about the
> > following levels of per-store spec:
> >
> > * Materialized#transactional()
> > * StoreSupplier#transactional()
> > * StateStore#transactional()
> > * Stores.persistentTransactionalKeyValueStore()...
> >
> > And my thoughts are the following:
> >
> > * In the current proposal users could specify transactional as either
> > "Materialized.as("storeName").withTransantionsEnabled()" or
> > "Materialized.as(Stores.persistentTransactionalKeyValueStore(..))", which
> > seems not necessary to me. In general, the more options the library
> > provides, the messier for users to learn the new APIs.
> >
> > * When using built-in stores, users would usually go with
> > Materialized.as("storeName"). In such cases I feel it's not very
> meaningful
> > to specify "some of the built-in stores to be transactional, while others
> > be non transactional": as long as one of your stores are
> non-transactional,
> > you'd still pay for large restoration cost upon unclean failure. People
> > may, indeed, want to specify if different transactional mechanisms to be
> > used across stores; but for whether or not the stores should be
> > transactional, I feel it's really an "all or none" answer, and our
> built-in
> > form (rocksDB) should support transactionality for all store types.
> >
> > * When using customized stores, users would usually go with
> > Materialized.as(StoreSupplier). And it's possible if users would choose
> > some to be transactional while others non-transactional (e.g. if their
> > customized store only 

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-17 Thread Sagar
Hi Alex,

Thanks for your response. Yeah I kind of mixed the EOS behaviour with state
stores. Thanks for clarifying.

I think what you are suggesting wrt querying secondary stores makes sense.
What I had imagined was trying to leverage the fact that the keys are
sorted in a known order. So, before querying we should be able to figure
out which store to hit. But that seems to be a case of over optimisation
upfront. Either way, bloom filters should be able to help us out as you
pointed out.

Thanks!
Sagar.


On Wed, Aug 17, 2022 at 6:05 PM Alexander Sorokoumov wrote:

> Hey Sagar,
>
> I'll start from the end.
>
> if EOS is enabled, it would return only committed data.
>
> I think you might refer to Kafka's consumer isolation levels. To my
> knowledge, they only work for consuming data from a topic. For example,
> READ_COMMITTED prevents reading data from an ongoing Kafka transaction.
> Because of these isolation levels, we can ensure EOS for stateful tasks by
> wiping local state stores and replaying only committed entries from the
> changelog topic. However, we do have to wipe the local stores now because
> there is no way to distinguish between committed and uncommitted entries;
> therefore, these isolation levels do not affect reads from the state stores
> during regular operation. This is why currently reads from RocksDB or
> in-memory stores always return the latest write. By deprecating
> StateStore#flush and introducing StateStore#commit/StateStore#recover, this
> proposal adds a way to adopt these isolation levels in the future, say, for
> interactive queries.
>
>
> I don't know if it's a performance bottleneck, but is it possible to figure
> > out beforehand and query only the relevant store?
>
>
> You are correct that the secondary store implementation introduces
> performance overhead of an additional read from the uncommitted store. My
> reasoning about it is the following. The worst case for performance here is
> when a key is available only in the main store because we'd have to check
> the uncommitted store first. If the key is present in the uncommitted
> store, we can return it right away without checking the main store. There
> are 2 situations to consider for the worst case scenario where we do the
> unnecessary read from the uncommitted store:
>
> 1. If the uncommitted data set fits into memory (this should be the most
>    common case in practice), then the extra read from the uncommitted
>    store is as cheap as copying key bytes off JVM plus RocksDB in-memory
>    read.
> 2. If the uncommitted data set does not fit into memory, RocksDB already
>    implements Bloom Filters that filter out SSTs that definitely do not
>    contain the key.
>
> In principle, we can implement an additional Bloom Filter on top of the
> uncommitted RocksDB, but it will only save us the JNI call overhead. I
> might do that if the performance overhead of that extra read turns out to
> be significant.
>
> The range queries follow similar logic. I implemented the merge by creating
> range iterators for both stores and peeking at the next available key after
> consuming the previous one. In that case, the unnecessary overhead that
> comes from the secondary store is a JNI call to the uncommitted store for a
> non-existing range that should be cheap relative to the overall cost of a
> range query.
>
> Best,
> Alex
>
> On Wed, Aug 17, 2022 at 12:37 PM Sagar  wrote:
>
> > Hi Alex,
> >
> > I went through the KIP again and it looks good to me. I just had a
> question
> > on the secondary state stores:
> >
> > *All writes and deletes go to the temporary store. Reads query the
> > temporary store; if the data is missing, query the regular store. Range
> > reads query both stores and return a KeyValueIterator that merges the
> > results. On crash
> > failure, ProcessorStateManager calls StateStore#recover(offset) that
> > truncates the temporary store.*
> >
> > I guess the reads go to the temp store today as well, the state stores
> read
> > can fetch uncommitted data? Also, if the query is for a recent(what is
> > recent is also debatable TBH). write, then it makes sense to go to the
> > secondary store. But, if not, i.e if it's a read to a key which is
> > committed, it would always be a miss and we would end up making 2 reads.
> I
> > don't know if it's a performance bottleneck, but is it possible to figure
> > out beforehand and query only the relevant store? If you think it's a
> case
> > of over optimisation, then you can ignore it.
> >
> > I think we can do something similar to range queries as well and  look to
> > merge the result

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-08-18 Thread Sagar
Hello Sophie,

Thanks for your feedback. I have made all the suggested changes.

One note: for the example of how users can accomplish this in today's world,
I made it up and have never tried it myself, but I am assuming it will
work.

Let me know what you think.

Thanks!
Sagar.


On Thu, Aug 18, 2022 at 7:17 AM Sophie Blee-Goldman wrote:

> Hey Sagar, thanks for the KIP!
>
> Just some cosmetic points to make it absolutely clear what this KIP is
> doing:
> 1) could you clarify up front in the Motivation section that this is
> focused on Kafka Streams applications, and not the plain Producer client?
> 2) you included the entire implementation of the `#send` method to
> demonstrate the change in logic, but can you either remove the parts of
> the implementation that aren't being touched here or at least highlight in
> some way the specific lines that have changed?
> 3) In general the implementation is, well, an implementation detail that
> doesn't need to be included in the KIP, but it's ok -- always nice to get a
> sense of how things will work internally. But what I think would be more
> useful to show in the KIP is how things will work with the new public
> interface -- ie, can you provide a brief example of how a user would go
> about taking advantage of this new interface? Even better, include an
> example of what it takes for a user to accomplish this behavior before this
> KIP. It would help showcase the concrete benefit this KIP is bringing and
> anchor the motivation section a bit better.
>
> Also nit: in the 2nd sentence under Public Interfaces I think it should say
> "it would invoke the partition()  method" -- ie this should be
> "partition()" not "partition*s*()"
>
> Other than that this looks good, but I'll wait until you've addressed the
> above to cast a vote.
>
> Thanks!
> Sophie
>
> On Fri, Aug 12, 2022 at 10:36 PM Sagar  wrote:
>
> > Hey John,
> >
> > Thanks for the vote. I added the reason for the rejection of the
> > alternatives. The first one is basically an option to broadcast to all
> > partitions which I felt was restrictive. Instead the KIP allows
> > multicasting to 0-N partitions based upon the partitioner implementation.
> >
> > Thanks!
> > Sagar.
> >
> > On Sat, Aug 13, 2022 at 7:35 AM John Roesler 
> wrote:
> >
> > > Thanks, Sagar!
> > >
> > > I’m +1 (binding)
> > >
> > > Can you add a short explanation to each rejected alternative? I was
> > > wondering why we wouldn’t provide an overloaded to()/addSink() (the
> first
> > > rejected alternative), and I had to look back at the Streams code to
> see
> > > that they both already accept the partitioner (I thought it was a
> > config).
> > >
> > > Thanks!
> > > -John
> > >
> > > On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > > > +1 (non binding)
> > > >
> > > > Walker
> > > >
> > > > On Tue, May 31, 2022 at 4:44 AM Sagar 
> > wrote:
> > > >
> > > >> Hi All,
> > > >>
> > > >> I would like to start a voting thread on
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> > > >> .
> > > >>
> > > >> I am just starting this as the discussion thread has been open for
> 10+
> > > >> days. In case there are some comments, we can always discuss them
> over
> > > >> there.
> > > >>
> > > >> Thanks!
> > > >> Sagar.
> > > >>
> > >
> >
>


Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-08-19 Thread Sagar
Thanks Sophie for the review. I see the confusion. As you pointed out, the
problem the KIP is trying to solve is not the avoidance of a custom
partitioner. Instead, it is the process of sending or replicating the
message N times and then having each copy wired through a custom
partitioner. That's what I tried to convey in the motivation section. I
updated the motivation slightly; let me know if that sounds ok.

Also, yes, the dropping of records using a custom partitioner is an added
benefit that we get. I think the custom partitioner bit is important, as one
can always filter the records out initially.

Let me know if this looks ok.

Thanks!
Sagar.

On Fri, Aug 19, 2022 at 10:17 AM Sophie Blee-Goldman wrote:

> Thanks Sagar -- one thing I'm still confused about, and sorry to keep
> pushing on this, but the example
> you gave for how this works in today's world seems not to correspond to the
> method described in the
> text of the Motivation section, i.e.
>
> Currently, if a user wants to replicate a message into N partitions, the
> > only way of doing that is to replicate the message N times and then
> plug-in
> > a custom partitioner to write the message N times into N different
> > partitions.
>
>
>
>  This seems a little cumbersome way to broadcast. Also, there seems to be
> > no way of dropping a record within the partitioner. This KIP aims to make
> > this process simpler in Kafka Streams.
>
>
> It sounds like you're saying the problem this KIP is fixing is that the
> only way to do this is by implementing
> a custom partitioner and that this is cumbersome, but that's actually
> exactly what this KIP is doing: providing
> a method of multi-casting via implementing a custom partitioner (as seen in
> the example usage you provided).
> Thanks to your examples I think I now understand better what the KIP is
> doing, and assume what's written in
> the motivation section is just a typo/mistake -- can you confirm?
>
> That said, the claim about having "no way of dropping a record within the
> partitioner" does actually seem to be
> correct, that is you couldn't do it with a custom partitioner prior to this
> KIP and now you can. I would consider
> that a secondary/additional improvement that these changes provide, but
> it's not strictly speaking related to
>  multi-casting, right? (Just checking my understanding, not challenging
> anything about this)
>
> Cheers,
> Sophie
>
> On Thu, Aug 18, 2022 at 7:27 AM Sagar  wrote:
>
> > Hello Sophie,
> >
> > Thanks for your feedback. I have made all the suggested changes.
> >
> > One note, on how users can accomplish this in today's world , I have made
> > up this example and have never tried myself before. But I am assuming it
> > will work.
> >
> > Let me know what you think.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, Aug 18, 2022 at 7:17 AM Sophie Blee-Goldman
> >  wrote:
> >
> > > Hey Sagar, thanks for the KIP!
> > >
> > > Just some cosmetic points to make it absolutely clear what this KIP is
> > > doing:
> > > 1) could you clarify up front in the Motivation section that this is
> > > focused on Kafka Streams applications, and not the plain Producer
> client?
> > > 2) you included the entire implementation of the `#send` method to
> > > demonstrate the change in logic, but can you either remove the parts of
> > > the implementation that aren't being touched here or at least highlight
> > in
> > > some way the specific lines that have changed?
> > > 3) In general the implementation is, well, an implementation detail
> that
> > > doesn't need to be included in the KIP, but it's ok -- always nice to
> > get a
> > > sense of how things will work internally. But what I think would be
> more
> > > useful to show in the KIP is how things will work with the new public
> > > interface -- ie, can you provide a brief example of how a user would go
> > > about taking advantage of this new interface? Even better, include an
> > > example of what it takes for a user to accomplish this behavior before
> > this
> > > KIP. It would help showcase the concrete benefit this KIP is bringing
> and
> > > anchor the motivation section a bit better.
> > >
> > > Also nit: in the 2nd sentence under Public Interfaces I think it should
> > say
> > > "it would invoke the partition()  method" -- ie this should be
> > > "partition()" not "partition*s*()"
> > >
> > > Oth

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-22 Thread Sagar
Hi All,

As per the suggestions from David/Guozhang above, I updated the page for
Connect to have its own set of APIs and not extend the ones from consumer.
Please review again.

@Luke,

Thank you. I am actually looking for review comments/thoughts/suggestions,
as the Connect changes are still in the draft stage. Since this is a
wholesale change, I am sure I have missed many cases. Also, David suggested
that we continue the discussion on this very thread to ensure Connect is
compatible with whatever is done in KIP-848. I am ok either way.

@Guozhang,

Regarding the migration plan and the link to KIP-415 that you had shared, I
had taken a look at it. From what I understood, that migration/downgrade
is about switching the assignment protocol from eager -> incremental or vice
versa. In this case, the migration is to the new rebalance protocol, which
would also depend on the group coordinators and whether they support the new
rebalance protocol or not. I have tried to incorporate this aspect in the
draft page.

Thanks!
Sagar.




On Mon, Aug 22, 2022 at 1:00 PM Luke Chen  wrote:

> Hi David,
>
> Thanks for the update.
>
> Some more questions:
> 1. In Group Coordinator section, you mentioned:
> > The new group coordinator will have a state machine per
> *__consumer_offsets* partitions, where each state machine is modelled as an
> event loop. Those state machines will be executed in
> *group.coordinator.threads* threads.
>
> 1.1. I think the state machine are: "Empty, assigning, reconciling, stable,
> dead" mentioned in Consumer Group States section, right?
> 1.2. What do you mean "each state machine is modelled as an event loop"?
> 1.3. Why do we need a state machine per *__consumer_offsets* partitions?
> Not a state machine "per consumer group" owned by a group coordinator? For
> example, if one group coordinator owns 2 consumer groups, and both exist in
> *__consumer_offsets-0*, will we have 1 state machine for it, or 2?
> 1.4. I know the "*group.coordinator.threads" *is the number of threads used
> to run the state machines. But I'm wondering if the purpose of the threads
> is only to keep the state of each consumer group (or *__consumer_offsets*
> partitions?), and no heavy computation, why should we need multi-threads
> here?
>
> 2. For the default value in the new configs:
> 2.1. The consumer session timeout: why does the default session timeout not
> fall between min (45s) and max (60s)? I thought the min/max session
> timeout is to define the lower/upper bound of it, no?
>
> group.consumer.session.timeout.ms (int, default 30s): The timeout to detect
> client failures when using the consumer group protocol.
> group.consumer.min.session.timeout.ms (int, default 45s): The minimum session
> timeout.
> group.consumer.max.session.timeout.ms (int, default 60s): The maximum session
> timeout.
>
>
>
> 2.2. The default server side assignors are [range, uniform], which means
> we'll default to the "range" assignor. I'd like to know why not the uniform
> one? I thought users will usually choose the uniform assignor (former sticky
> assignor) for a more even distribution. Is there any other reason we choose
> the range assignor as the default?
> group.consumer.assignors (List, default "range, uniform"): The server side
> assignors.
>
>
>
>
>
>
> Thank you.
> Luke
>
>
>
>
>
>
> On Mon, Aug 22, 2022 at 2:10 PM Luke Chen  wrote:
>
> > Hi Sagar,
> >
> > I have some thoughts about Kafka Connect integrating with KIP-848, but I
> > think we should have a separate discussion thread for the Kafka Connect
> > KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1],
> > and let this discussion thread focus on consumer rebalance protocol,
> WDYT?
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> >
> > Thank you.
> > Luke
> >
> >
> >
> > On Fri, Aug 12, 2022 at 9:31 PM Sagar  wrote:
> >
> >> Thank you Guozhang/David for the feedback. Looks like there's agreement
> on
> >> using separate APIs for Connect. I would revisit the doc and see what
> >> changes are to be made.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Tue, Aug 9, 2022 at 7:11 PM David Jacot
> >> wrote:
> >>
> >> > Hi Sagar,
> >> >
> >> > Thanks for the feedback and the document. That's really helpful. I
> >> > will take a look at it.
> >> >
> >> > Overall, it seems to me that both Connect and the Consumer could share
> >> > the same underlying "engine". The main diff

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-08-24 Thread Sagar
Thank you Bruno/Matthew for your comments.

I agree using null does seem error prone. However I think using a singleton
list of [-1] might be better in terms of usability, I am saying this
because the KIP also has a provision to return an empty list to refer to
dropping the record. So, an empty optional and an empty list have totally
different meanings which could get confusing.
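
To make the difference concrete, here is a minimal Java sketch of how the two list-based conventions could be read on the calling side. The class name, the method, and the BROADCAST constant are hypothetical and only for illustration; they are not taken from the KIP or from Kafka Streams.

```java
import java.util.List;

// Hypothetical illustration only; not code from the KIP or from Kafka Streams.
class SentinelSemanticsSketch {
    // Assumed sentinel value meaning "send to every partition".
    static final int BROADCAST = -1;

    // Describes what a caller would do with the list returned by a partitioner.
    static String interpret(List<Integer> partitions) {
        if (partitions.isEmpty()) {
            return "drop"; // empty list: the record is dropped
        }
        if (partitions.size() == 1 && partitions.get(0) == BROADCAST) {
            return "broadcast"; // singleton [-1]: send to all partitions
        }
        return "send to " + partitions; // explicit partition numbers
    }

    public static void main(String[] args) {
        System.out.println(interpret(List.of()));
        System.out.println(interpret(List.of(BROADCAST)));
        System.out.println(interpret(List.of(0, 2)));
    }
}
```

With this reading, both cases stay plain list values, so there is only one kind of "empty" to reason about.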

Let me know what you think.

Thanks!
Sagar.


On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
 wrote:

> I also concur with this, having an Optional in the type makes it very
> clear what’s going on and better signifies an absence of value (or in this
> case the broadcast value).
>
> --
> Matthew de Detrich
> Aiven Deutschland GmbH
> Immanuelkirchstraße 26, 10405 Berlin
> Amtsgericht Charlottenburg, HRB 209739 B
>
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> m: +491603708037
> w: aiven.io e: matthew.dedetr...@aiven.io
> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >
> > 2.
> > I would prefer changing the return type of partitions() to
> > Optional<List<Integer>> and using Optional.empty() as the broadcast
> > value. IMO, The chances that an implementation returns null due to a bug
> > is much higher than that an implementation returns an empty Optional due
> > to a bug.
>


Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-08-30 Thread Sagar
Thanks Bruno/Chris,

I agree that it might be better to keep it simple, the way Chris
suggested. I have updated the KIP accordingly. I made a couple of minor
changes to the KIP:

1) One of them is changing the return type of the partitions method from
List to Set. This ensures that if the implementation of
StreamPartitioner is buggy and ends up returning duplicate
partition numbers, we won't have duplicates, and so won't try to send to
the same partition multiple times.
2) I also added a check to send the record only to valid partition numbers
and log and drop when the partition number is invalid. This is again to
prevent errors for cases when the StreamPartitioner implementation has some
bugs (since there are no validations as such).
3) I also updated the Test Plan section based on the suggestion from Bruno.
4) I updated the default implementation of partitions method based on the
great catch from Chris!
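
As a rough illustration of points 1) and 2), and of leaving broadcasting to the implementation as Chris suggested, a minimal Java sketch could look like the following. The class and method names are hypothetical; only the IntStream one-liner comes from this thread (collected into a Set here rather than a List).

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helpers for illustration; not code from the KIP.
class BroadcastSketch {
    // Broadcast: every partition number in [0, numPartitions).
    static Set<Integer> allPartitions(int numPartitions) {
        return IntStream.range(0, numPartitions)
                .boxed()
                .collect(Collectors.toSet());
    }

    // Keeps only partition numbers that exist for the topic.
    // Because the input is a Set, duplicates are already impossible.
    static Set<Integer> validOnly(Set<Integer> requested, int numPartitions) {
        return requested.stream()
                .filter(p -> p >= 0 && p < numPartitions)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(allPartitions(3));
        System.out.println(validOnly(Set.of(0, 5, -1), 3));
    }
}
```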

Let me know if it looks fine now.

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna  wrote:

> Hi,
>
> I am in favour of discarding the sugar for broadcasting and leaving the
> broadcasting to the implementation as Chris suggests. I think that is
> the cleanest option.
>
> Best,
> Bruno
>
> On 29.08.22 19:50, Chris Egerton wrote:
> > Hi all,
> >
> > I think it'd be useful to be more explicit about broadcasting to all
> topic
> > partitions rather than add implicit behavior for empty cases (empty
> > optional, empty list, etc.). The suggested enum approach would address
> that
> > nicely.
> >
> > It's also worth noting that there's no hard requirement to add sugar for
> > broadcasting to all topic partitions since the API already provides the
> > number of topic partitions available when calling a stream partitioner.
> If
> > we can't find a clean way to add this support, it might be better to
> leave
> > it out and just let people implement this themselves with a line of Java
> 8
> > streams:
> >
> >  return IntStream.range(0,
> > numPartitions).boxed().collect(Collectors.toList());
> >
> > Also worth noting that there may be a bug in the default implementation
> for
> > the new StreamPartitioner::partitions method, since it doesn't appear to
> > correctly pick up on null return values from the partition method and
> > translate them into empty lists.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna 
> wrote:
> >
> >> Hi Sagar,
> >>
> >> I do not see an issue with using an empty list together with an empty
> >> Optional. A non-empty Optional that contains an empty list would just
> >> say that there is no partition to which the record should be sent. If
> >> there is no partition, the record is dropped.
> >>
> >> An empty Optional might be a way to say, broadcast to all partitions.
> >>
> >> Alternatively -- to make it more explicit -- you could return an object
> >> with an enum and a list of partitions. The enum could have values SOME,
> >> ALL, and NONE. If the value is SOME, the list of partitions contains the
> >> partitions to which to send the record. If the value of the enum is ALL
> >> or NONE, the list of partitions is not used and might be even null
> >> (since in that case the list should not be used and it would be a bug to
> >> use it).
> >>
> >> Best,
> >> Bruno
> >>
> >> On 24.08.22 20:11, Sagar wrote:
> >>> Thank you Bruno/Matthew for your comments.
> >>>
> >>> I agree using null does seem error prone. However I think using a
> >> singleton
> >>> list of [-1] might be better in terms of usability, I am saying this
> >>> because the KIP also has a provision to return an empty list to refer
> to
> >>> dropping the record. So, an empty optional and an empty list have
> totally
> >>> different meanings which could get confusing.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>>
> >>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>  wrote:
> >>>
> >>>> I also concur with this, having an Optional in the type makes it very
> >>>> clear what’s going on and better signifies an absence of value (or in
> >> this
> >>>> case the broadcast value).
> >>>>
> >>>> --
> >>>> Matthew de Detrich
> >>>> Aiven Deutschland GmbH
> >>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>
> >>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>> m: +491603708037
> >>>> w: aiven.io e: matthew.dedetr...@aiven.io
> >>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>>>
> >>>>> 2.
> >>>>> I would prefer changing the return type of partitions() to
> >>>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
> >>>>> value. IMO, The chances that an implementation returns null due to a
> >> bug
> >>>>> is much higher than that an implementation returns an empty Optional
> >> due
> >>>>> to a bug.
> >>>>
> >>>
> >>
> >
>


Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-08-31 Thread Sagar
Thanks Bruno for the great points.

I see 2 options here =>

1) As Chris suggested, drop the support for dropping records in the
partitioner. That way, an empty list could signify the usage of a default
partitioner. Also, if the deprecated partition() method returns null
thereby signifying the default partitioner, the partitions() can return an
empty list i.e default partitioner.

2) OR we treat a null return type of partitions() method to signify the
usage of the default partitioner. In the default implementation of
partitions() method, if partition() returns null, then even partitions()
can return null (instead of an empty list). The RecordCollectorImpl code can
also be modified accordingly. @Chris, to your point, we can even drop the
support of dropping of records. It came up during KIP discussion, and I
thought it might be a useful feature. Let me know what you think.

3) Lastly about the partition number check. I wanted to avoid the throwing
of exception so I thought adding it might be a useful feature. But as you
pointed out, if it can break backwards compatibility, it's better to remove
it.
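
To compare options 1) and 2) side by side, here is a minimal Java sketch of the two possible default implementations. The interface below is a hypothetical stand-in written for this mail; the method names partitionsOption1 and partitionsOption2 and the exact signatures are assumptions, not the KIP text.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the partitioner interface; signatures are assumptions.
interface PartitionerSketch<K, V> {
    // The existing single-partition method; null means "use default partitioning".
    Integer partition(String topic, K key, V value, int numPartitions);

    // Option 1: an empty list signals "use the default partitioner".
    default List<Integer> partitionsOption1(String topic, K key, V value, int numPartitions) {
        Integer p = partition(topic, key, value, numPartitions);
        return p == null ? Collections.emptyList() : Collections.singletonList(p);
    }

    // Option 2: null signals "use the default partitioner".
    default List<Integer> partitionsOption2(String topic, K key, V value, int numPartitions) {
        Integer p = partition(topic, key, value, numPartitions);
        return p == null ? null : Collections.singletonList(p);
    }
}
```

Under option 1 the empty list is no longer free to mean "drop the record", which is why it is tied to removing that feature.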

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton 
wrote:

> +1 to Bruno's concerns about backward compatibility. Do we actually need
> support for dropping records in the partitioner? It doesn't seem necessary
> based on the motivation for the KIP. If we remove that feature, we could
> handle null and/or empty lists by using the default partitioning,
> equivalent to how we handle null return values from the existing partition
> method today.
>
> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna  wrote:
>
> > Hi Sagar,
> >
> > Thank you for the updates!
> >
> > I do not intend to prolong this vote thread more than needed, but I
> > still have some points.
> >
> > The deprecated partition method can return null if the default
> > partitioning logic of the producer should be used.
> > With the new method partitions() it seems that it is not possible to use
> > the default partitioning logic, anymore.
> >
> > Also, in the default implementation of method partitions(), a record
> > that would use the default partitioning logic in method partition()
> > would be dropped, which would break backward compatibility since Streams
> > would always call the new method partitions() even though the users
> > still implement the deprecated method partition().
> >
> > I have a last point that we should probably discuss on the PR and not on
> > the KIP but since you added the code in the KIP I need to mention it. I
> > do not think you should check the validity of the partition number since
> > the ProducerRecord does the same check and throws an exception. If
> > Streams adds the same check but does not throw, the behavior is not
> > backward compatible.
> >
> > Best,
> > Bruno
> >
> >
> > On 30.08.22 12:43, Sagar wrote:
> > > Thanks Bruno/Chris,
> > >
> > > Even I agree that might be better to keep it simple like the way Chris
> > > suggested. I have updated the KIP accordingly. I made couple of minor
> > > changes to the KIP:
> > >
> > > 1) One of them being the change of return type of partitions method
> from
> > > List to Set. This is to ensure that in case the implementation of
> > > StreamPartitoner is buggy and ends up returning duplicate
> > > partition numbers, we won't have duplicates thereby not trying to send
> to
> > > the same partition multiple times due to this.
> > > 2) I also added a check to send the record only to valid partition
> > numbers
> > > and log and drop when the partition number is invalid. This is again to
> > > prevent errors for cases when the StreamPartitioner implementation has
> > some
> > > bugs (since there are no validations as such).
> > > 3) I also updated the Test Plan section based on the suggestion from
> > Bruno.
> > > 4) I updated the default implementation of partitions method based on
> the
> > > great catch from Chris!
> > >
> > > Let me know if it looks fine now.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna 
> > wrote:
> > >
> > >> Hi,
> > >>
> > >> I am favour of discarding the sugar for broadcasting and leave the
> > >> broadcasting to the implementation as Chris suggests. I think that is
> > >> the cleanest option.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On 29.08.22 19:50, Chris Egerton wrot

Re: [DISCUSS] KIP-864: Add End-To-End Latency Metrics to Connectors

2022-09-01 Thread Sagar
Hi Jorge,

Thanks for the KIP. It looks like a very good addition. I skimmed through it
once and had a couple of questions =>

1) I am assuming the new metrics would be task-level metrics. Could you
specify them the way it's done for the other sink/source connector metrics?
2) I am slightly confused about the e2e latency metric. Let's consider the
sink connector metric. If I look at the way it's supposed to be calculated,
i.e. the difference between the record timestamp and the wall clock time, it
looks like a per-record metric. However, the put-batch time measures the
time to put a batch of records to the external sink. So, I would assume the 2
can't be added as is to compute the e2e latency. Maybe I am missing
something here. Could you please clarify this?
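
To show what I mean by a per-record metric, here is a minimal Java sketch of the calculation as I understand it from the KIP. The class and method names are hypothetical and the aggregation (max/avg over a batch) is my assumption, not the actual Connect implementation.

```java
// Hypothetical illustration; names are not from the KIP or from Kafka Connect.
class SinkLatencySketch {
    // Per-record latency: wall-clock time minus the record timestamp.
    static long recordLatencyMs(long recordTimestampMs, long wallClockMs) {
        return wallClockMs - recordTimestampMs;
    }

    // Largest per-record latency in a batch observed at one wall-clock instant.
    // Returns Long.MIN_VALUE for an empty batch.
    static long maxLatencyMs(long[] recordTimestampsMs, long wallClockMs) {
        long max = Long.MIN_VALUE;
        for (long ts : recordTimestampsMs) {
            max = Math.max(max, recordLatencyMs(ts, wallClockMs));
        }
        return max;
    }

    // Average per-record latency in a batch; 0.0 for an empty batch.
    static double avgLatencyMs(long[] recordTimestampsMs, long wallClockMs) {
        long sum = 0;
        for (long ts : recordTimestampsMs) {
            sum += recordLatencyMs(ts, wallClockMs);
        }
        return recordTimestampsMs.length == 0 ? 0.0 : (double) sum / recordTimestampsMs.length;
    }

    public static void main(String[] args) {
        long[] batch = {1000L, 1100L, 1200L};
        System.out.println(maxLatencyMs(batch, 1250L));
        System.out.println(avgLatencyMs(batch, 1250L));
    }
}
```

Each record in the batch gets its own value here, whereas put-batch yields a single duration for the whole batch, which is why the two do not simply add up.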

Thanks!
Sagar.

On Tue, Aug 30, 2022 at 8:43 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi all,
>
> I'd like to start a discussion thread on KIP-864: Add End-To-End Latency
> Metrics to Connectors.
> This KIP aims to improve the metrics available on Source and Sink
> Connectors to measure end-to-end latency, including source and sink record
> conversion time, and sink record e2e latency (similar to KIP-613 for
> Streams).
>
> The KIP is here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Add+End-To-End+Latency+Metrics+to+Connectors
>
> Please take a look and let me know what you think.
>
> Cheers,
> Jorge.
>


Re: [DISCUSS] KIP-864: Add End-To-End Latency Metrics to Connectors

2022-09-02 Thread Sagar
Hi Jorge,

Thanks for the changes.

Regarding the metrics, I meant something like this:
kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"

the way it's defined in
https://kafka.apache.org/documentation/#connect_monitoring for the metrics.
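
As a small illustration of that naming pattern, the MBean name can be built with the standard javax.management API. This is only a sketch: the class and method names are hypothetical, and the connector/task values are made up.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for illustration; not code from Kafka Connect.
class SinkTaskMetricNameSketch {
    // Builds kafka.connect:type=sink-task-metrics,connector="...",task="..."
    static ObjectName sinkTaskMetrics(String connector, String task) {
        try {
            return new ObjectName("kafka.connect:type=sink-task-metrics"
                    + ",connector=" + ObjectName.quote(connector)
                    + ",task=" + ObjectName.quote(task));
        } catch (MalformedObjectNameException e) {
            // Rethrown unchecked to keep the sketch easy to call.
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sinkTaskMetrics("my-sink", "0"));
    }
}
```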

I see what you mean by the 3 metrics and how they can be interpreted. The
only thing I would question is whether we need sink-record-latency-min. Maybe
we could remove this min metric as well and make all 3 of the e2e metrics
consistent (since put-batch also doesn't expose a min, which makes sense to
me). I think this is in contrast to what Yash pointed out above, so I would
like to hear his thoughts as well.

The other point Yash mentioned about the slightly flawed definition of e2e
is also true in a sense. But I have a feeling that once the records are
polled by the connector tasks, it would be difficult to track the final leg
via the framework. Probably users can track the metrics at their end to
figure that out. Do you think that makes sense?

Thanks!
Sagar.




On Thu, Sep 1, 2022 at 11:40 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Sagar and Yash,
>
> Thanks for your feedback!
>
> > 1) I am assuming the new metrics would be task level metric.
>
> 1.1 Yes, it will be a task level metric, implemented on the
> Worker[Source/Sink]Task.
>
> > Could you specify the way it's done for other sink/source connector?
>
> 1.2. Not sure what do you mean by this. Could you elaborate a bit more?
>
> > 2. I am slightly confused about the e2e latency metric...
>
> 2.1. Yes, I see. I was trying to bring a similar concept as in Streams with
> KIP-613, though the e2e concept may not be translatable.
> We could keep it as `sink-record-latency` to avoid conflating concepts. A
> similar metric naming was proposed in KIP-489 but at the consumer level —
> though it seems dormant for a couple of years.
>
> > However, the put-batch time measures the
> > time to put a batch of records to external sink. So, I would assume the 2
> > can't be added as is to compute the e2e latency. Maybe I am missing
> > something here. Could you plz clarify this.
>
> 2.2. Yes, agree. Not necessarily added, but with the 3 latencies (poll,
> convert, putBatch) will be clearer where the bottleneck may be, and
> represent the internal processing.
>
> > however, as per the KIP it looks like it will be
> > the latency between when the record was written to Kafka and when the
> > record is returned by a sink task's consumer's poll?
>
> 3.1. Agree. 2.1. could help to clarify this.
>
> > One more thing - I was wondering
> > if there's a particular reason for having a min metric for e2e latency
> but
> > not for convert time?
>
> 3.2. Was following KIP-613 for e2e, which seems useful to compare with Max
> and get an idea of the window of results, though current latencies in
> Connector do not include Min, and that's why I haven't added it for convert
> latency. Do you think it makes sense to extend latency metrics with Min?
>
> KIP is updated to clarify some of these changes.
>
> Many thanks,
> Jorge.
>
> On Thu, 1 Sept 2022 at 18:11, Yash Mayya  wrote:
>
> > Hi Jorge,
> >
> > Thanks for the KIP! I have the same confusion with the e2e-latency
> metrics
> > as Sagar above. "e2e" would seem to indicate the latency between when the
> > record was written to Kafka and when the record was written to the sink
> > system by the connector - however, as per the KIP it looks like it will
> be
> > the latency between when the record was written to Kafka and when the
> > record is returned by a sink task's consumer's poll? I think that metric
> > will be a little confusing to interpret. One more thing - I was wondering
> > if there's a particular reason for having a min metric for e2e latency
> but
> > not for convert time?
> >
> > Thanks,
> > Yash
> >
> > On Thu, Sep 1, 2022 at 8:59 PM Sagar  wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the KIP. It looks like a very good addition. I skimmed
> through
> > > once and had a couple of questions =>
> > >
> > > 1) I am assuming the new metrics would be task level metric. Could you
> > > specify the way it's done for other sink/source connector?
> > > 2) I am slightly confused about the e2e latency metric. Let's consider
> > the
> > > sink connector metric. If I look at the way it's supposed to be
> > calculated,
> > > i.e the difference between the record timestamp and the wall clock
> time,
> > it
> &

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-09-02 Thread Sagar
Hello Bruno/Chris,

Since this is the last set of changes (I am assuming, haha), it would be
great if you could review the 2 options from above so that we can close the
voting. Of course I am happy to incorporate any other requisite changes.

Thanks!
Sagar.

On Wed, Aug 31, 2022 at 10:07 PM Sagar  wrote:

> Thanks Bruno for the great points.
>
> I see 2 options here =>
>
> 1) As Chris suggested, drop the support for dropping records in the
> partitioner. That way, an empty list could signify the usage of a default
> partitioner. Also, if the deprecated partition() method returns null
> thereby signifying the default partitioner, the partitions() can return an
> empty list i.e default partitioner.
>
> 2) OR we treat a null return type of partitions() method to signify the
> usage of the default partitioner. In the default implementation of
> partitions() method, if partition() returns null, then even partitions()
> can return null(instead of an empty list). The RecordCollectorImpl code can
> also be modified accordingly. @Chris, to your point, we can even drop the
> support of dropping of records. It came up during KIP discussion, and I
> thought it might be a useful feature. Let me know what you think.
>
> 3) Lastly about the partition number check. I wanted to avoid the throwing
> of exception so I thought adding it might be a useful feature. But as you
> pointed out, if it can break backwards compatibility, it's better to remove
> it.
>
> Thanks!
> Sagar.
>
>
> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton 
> wrote:
>
>> +1 to Bruno's concerns about backward compatibility. Do we actually need
>> support for dropping records in the partitioner? It doesn't seem necessary
>> based on the motivation for the KIP. If we remove that feature, we could
>> handle null and/or empty lists by using the default partitioning,
>> equivalent to how we handle null return values from the existing partition
>> method today.
>>
>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna  wrote:
>>
>> > Hi Sagar,
>> >
>> > Thank you for the updates!
>> >
>> > I do not intend to prolong this vote thread more than needed, but I
>> > still have some points.
>> >
>> > The deprecated partition method can return null if the default
>> > partitioning logic of the producer should be used.
>> > With the new method partitions() it seems that it is not possible to use
>> > the default partitioning logic, anymore.
>> >
>> > Also, in the default implementation of method partitions(), a record
>> > that would use the default partitioning logic in method partition()
>> > would be dropped, which would break backward compatibility since Streams
>> > would always call the new method partitions() even though the users
>> > still implement the deprecated method partition().
>> >
>> > I have a last point that we should probably discuss on the PR and not on
>> > the KIP but since you added the code in the KIP I need to mention it. I
>> > do not think you should check the validity of the partition number since
>> > the ProducerRecord does the same check and throws an exception. If
>> > Streams adds the same check but does not throw, the behavior is not
>> > backward compatible.
>> >
>> > Best,
>> > Bruno
>> >
>> >
>> > On 30.08.22 12:43, Sagar wrote:
>> > > Thanks Bruno/Chris,
>> > >
>> > > Even I agree that might be better to keep it simple like the way Chris
>> > > suggested. I have updated the KIP accordingly. I made couple of minor
>> > > changes to the KIP:
>> > >
>> > > 1) One of them being the change of return type of partitions method
>> from
>> > > List to Set. This is to ensure that in case the implementation of
>> > > StreamPartitoner is buggy and ends up returning duplicate
>> > > partition numbers, we won't have duplicates thereby not trying to
>> send to
>> > > the same partition multiple times due to this.
>> > > 2) I also added a check to send the record only to valid partition
>> > numbers
>> > > and log and drop when the partition number is invalid. This is again
>> to
>> > > prevent errors for cases when the StreamPartitioner implementation has
>> > some
>> > > bugs (since there are no validations as such).
>> > > 3) I also updated the Test Plan section based on the suggestion from
>> > Bruno.
>> > > 4) I updated the default implementation of partitions method b

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-09-10 Thread Sagar
Hi Bruno,

Thanks, I think these changes make sense to me. I have updated the KIP
accordingly.
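
For reference, the three cases from Bruno's mail quoted below can be sketched in Java roughly as follows. The class and method names are hypothetical, and I am using a Set of integers rather than a list, in line with the earlier List-to-Set change; this is an illustration, not the code in the PR.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the three return-value cases; not the PR code.
class OptionalRoutingSketch {
    static String route(Optional<Set<Integer>> partitions) {
        if (!partitions.isPresent()) {
            return "default"; // empty Optional: default partitioning of the producer
        }
        if (partitions.get().isEmpty()) {
            return "drop"; // non-empty Optional, empty set: drop the record
        }
        // Non-empty Optional, non-empty set: send to each listed partition.
        // TreeSet is used only to print the numbers in a stable order.
        return "send to " + new TreeSet<>(partitions.get());
    }

    public static void main(String[] args) {
        Set<Integer> none = Set.of();
        System.out.println(route(Optional.empty()));
        System.out.println(route(Optional.of(none)));
        System.out.println(route(Optional.of(Set.of(2, 0))));
    }
}
```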

Thanks!
Sagar.

On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna  wrote:

> Hi Sagar,
>
> I would not drop the support for dropping records. I would also not
> return null from partitions(). Maybe an Optional can help here. An empty
> Optional would mean to use the default partitioning behavior of the
> producer. So we would have:
>
> - non-empty Optional, non-empty list of integers: partitions to send the
> record to
> - non-empty Optional, empty list of integers: drop the record
> - empty Optional: use default behavior
>
> What do others think?
>
> Best,
> Bruno
>
> On 02.09.22 13:53, Sagar wrote:
> > Hello Bruno/Chris,
> >
> > Since these are the last set of changes(I am assuming haha), it would be
> > great if you could review the 2 options from above so that we can close
> the
> > voting. Of course I am happy to incorporate any other requisite changes.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, Aug 31, 2022 at 10:07 PM Sagar 
> wrote:
> >
> >> Thanks Bruno for the great points.
> >>
> >> I see 2 options here =>
> >>
> >> 1) As Chris suggested, drop the support for dropping records in the
> >> partitioner. That way, an empty list could signify the usage of a
> default
> >> partitioner. Also, if the deprecated partition() method returns null
> >> thereby signifying the default partitioner, the partitions() can return
> an
> >> empty list i.e default partitioner.
> >>
> >> 2) OR we treat a null return type of partitions() method to signify the
> >> usage of the default partitioner. In the default implementation of
> >> partitions() method, if partition() returns null, then even partitions()
> >> can return null(instead of an empty list). The RecordCollectorImpl code
> can
> >> also be modified accordingly. @Chris, to your point, we can even drop
> the
> >> support of dropping of records. It came up during KIP discussion, and I
> >> thought it might be a useful feature. Let me know what you think.
> >>
> >> 3) Lastly about the partition number check. I wanted to avoid the
> throwing
> >> of exception so I thought adding it might be a useful feature. But as
> you
> >> pointed out, if it can break backwards compatibility, it's better to
> remove
> >> it.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >>
> >> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton 
> >> wrote:
> >>
> >>> +1 to Bruno's concerns about backward compatibility. Do we actually
> need
> >>> support for dropping records in the partitioner? It doesn't seem
> necessary
> >>> based on the motivation for the KIP. If we remove that feature, we
> could
> >>> handle null and/or empty lists by using the default partitioning,
> >>> equivalent to how we handle null return values from the existing
> partition
> >>> method today.
> >>>
> >>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna 
> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> Thank you for the updates!
> >>>>
> >>>> I do not intend to prolong this vote thread more than needed, but I
> >>>> still have some points.
> >>>>
> >>>> The deprecated partition method can return null if the default
> >>>> partitioning logic of the producer should be used.
> >>>> With the new method partitions() it seems that it is not possible to
> use
> >>>> the default partitioning logic, anymore.
> >>>>
> >>>> Also, in the default implementation of method partitions(), a record
> >>>> that would use the default partitioning logic in method partition()
> >>>> would be dropped, which would break backward compatibility since
> Streams
> >>>> would always call the new method partitions() even though the users
> >>>> still implement the deprecated method partition().
> >>>>
> >>>> I have a last point that we should probably discuss on the PR and not
> on
> >>>> the KIP but since you added the code in the KIP I need to mention it.
> I
> >>>> do not think you should check the validity of the partition number
> since
> >>>> the ProducerRecord does the same check and throws an exception. If
> >>>> Streams adds the same check but does not

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-09-12 Thread Sagar
Thanks Bruno,

Marking this as accepted.

Thanks everyone for their comments/feedback.

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna  wrote:

> Hi Sagar,
>
> Thanks for the update and the PR!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 10.09.22 18:57, Sagar wrote:
> > Hi Bruno,
> >
> > Thanks, I think these changes make sense to me. I have updated the KIP
> > accordingly.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna  wrote:
> >
> >> Hi Sagar,
> >>
> >> I would not drop the support for dropping records. I would also not
> >> return null from partitions(). Maybe an Optional can help here. An empty
> >> Optional would mean to use the default partitioning behavior of the
> >> producer. So we would have:
> >>
> >> - non-empty Optional, non-empty list of integers: partitions to send the
> >> record to
> >> - non-empty Optional, empty list of integers: drop the record
> >> - empty Optional: use default behavior
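
The three cases listed above can be sketched as follows. This is only an illustration of the proposed contract, not Kafka Streams code: `MulticastDecision`, `Action`, `interpret` and `fromLegacy` are hypothetical names, and the bridge from the deprecated `partition()` result is an assumption about how a `null` return could keep meaning "use the default partitioner".

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Kafka Streams: it only names the three
// outcomes of the proposed Optional-based return value of partitions().
final class MulticastDecision {

    enum Action { SEND_TO_PARTITIONS, DROP_RECORD, USE_DEFAULT_PARTITIONING }

    // Maps the proposed return value of partitions() to an action.
    static Action interpret(Optional<List<Integer>> partitions) {
        if (!partitions.isPresent()) {
            // empty Optional: fall back to the producer's default partitioning
            return Action.USE_DEFAULT_PARTITIONING;
        }
        // non-empty Optional: an empty list drops the record,
        // a non-empty list multicasts it to the listed partitions
        return partitions.get().isEmpty()
                ? Action.DROP_RECORD
                : Action.SEND_TO_PARTITIONS;
    }

    // Assumed bridge from the deprecated partition() result (an Integer that
    // may be null) to the new shape, so that null still means "default".
    static Optional<List<Integer>> fromLegacy(Integer partition) {
        return partition == null
                ? Optional.empty()
                : Optional.of(List.of(partition));
    }

    public static void main(String[] args) {
        System.out.println(interpret(Optional.of(List.of(0, 2))));
        System.out.println(interpret(Optional.of(List.of())));
        System.out.println(interpret(fromLegacy(null)));
    }
}
```

With this shape, a user who still implements only the deprecated method and returns null would not see records dropped, which is the backward-compatibility concern raised earlier in the thread.
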
> >>
> >> What do others think?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 02.09.22 13:53, Sagar wrote:
> >>> Hello Bruno/Chris,
> >>>
> >>> Since these are the last set of changes (I am assuming, haha), it would
> be
> >>> great if you could review the 2 options from above so that we can close
> >> the
> >>> voting. Of course I am happy to incorporate any other requisite
> changes.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar 
> >> wrote:
> >>>
> >>>> Thanks Bruno for the great points.
> >>>>
> >>>> I see 2 options here =>
> >>>>
> >>>> 1) As Chris suggested, drop the support for dropping records in the
> >>>> partitioner. That way, an empty list could signify the usage of a
> >> default
> >>>> partitioner. Also, if the deprecated partition() method returns null
> >>>> thereby signifying the default partitioner, the partitions() can
> return
> >> an
> >>>> empty list i.e default partitioner.
> >>>>
> >>>> 2) OR we treat a null return type of partitions() method to signify
> the
> >>>> usage of the default partitioner. In the default implementation of
> >>>> partitions() method, if partition() returns null, then even
> partitions()
> >>>> can return null(instead of an empty list). The RecordCollectorImpl
> code
> >> can
> >>>> also be modified accordingly. @Chris, to your point, we can even drop
> >> the
> >>>> support of dropping of records. It came up during KIP discussion, and
> I
> >>>> thought it might be a useful feature. Let me know what you think.
> >>>>
> >>>> 3) Lastly about the partition number check. I wanted to avoid the
> >> throwing
> >>>> of exception so I thought adding it might be a useful feature. But as
> >> you
> >>>> pointed out, if it can break backwards compatibility, it's better to
> >> remove
> >>>> it.
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>>
> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton wrote:
> >>>>
> >>>>> +1 to Bruno's concerns about backward compatibility. Do we actually
> >> need
> >>>>> support for dropping records in the partitioner? It doesn't seem
> >> necessary
> >>>>> based on the motivation for the KIP. If we remove that feature, we
> >> could
> >>>>> handle null and/or empty lists by using the default partitioning,
> >>>>> equivalent to how we handle null return values from the existing
> >> partition
> >>>>> method today.
> >>>>>
> >>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna 
> >> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> Thank you for the updates!
> >>>>>>
> >>>>>> I do not intend to prolong this vote thread more than needed, but I
> >>>>>> still have some points.
> >>>>>>
> >>>>>> The deprecated partition method can return null if 

Re: [ANNOUNCE] New committer: Deng Ziming

2022-10-10 Thread Sagar
Congratulations ziming!

On Tue, 11 Oct 2022 at 4:16 AM, Bill Bejeck  wrote:

> Congrats Ziming!
>
> Regards,
> Bill
>
> On Mon, Oct 10, 2022 at 5:32 PM Ismael Juma  wrote:
>
> > Congratulations Ziming!
> >
> > Ismael
> >
> > On Mon, Oct 10, 2022 at 9:30 AM Jason Gustafson wrote:
> >
> > > Hi All
> > >
> > > The PMC for Apache Kafka has invited Deng Ziming to become a committer,
> > > and we are excited to announce that he has accepted!
> > >
> > > Ziming has been contributing to Kafka for about three years. He has
> > > authored
> > > more than 100 patches and helped to review nearly as many. In
> particular,
> > > he made significant contributions to the KRaft project which had a big
> > part
> > > in reaching our production readiness goal in the 3.3 release:
> > > https://blogs.apache.org/kafka/entry/what-rsquo-s-new-in.
> > >
> > > Please join me in congratulating Ziming! Thanks for all of your
> > > contributions!
> > >
> > > -- Jason, on behalf of the Apache Kafka PMC
> > >
> >
>


Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-27 Thread Sagar
Hey Sophie,

This looks like a very nice feature. Going through the comments, I agree
with Bill above that there could be a case for skew on keys given the
earlier partitions would have the data which it already had and get some
more. Do you think that's a concern/side-effect that this feature could
bring in?

Thanks!
Sagar.

On Wed, Oct 26, 2022 at 2:15 AM Walker Carlson
 wrote:

> Hey Sophie,
>
> Thanks for the KIP. I think this could be useful for a lot of cases. I also
> think that this could cause a lot of confusion.
>
> Just to make sure we are doing our best to prevent people from
> misusing this feature, I wanted to clarify a couple of things.
> 1) There will be only an interface and no "default" implementation that a
> user can plug in for the static partitioner. I am considering when it comes
> to testing we want to make sure that we do not make our testing
> implementation available to a user.
> 2)  If a user wanted to use auto scaling for a stateless application it
> should be as easy as implementing the StaticStreamsPartitioner. Their
> implementation could even just wrap the default partitioner if they wanted,
> right?  I can't think of any way we could detect and then warn them about
> the output topic not being partitioned by keys if that were to happen, can
> you?
>
> Overall this looks good to me!
>
> Walker
>
> On Tue, Oct 25, 2022 at 12:27 PM Bill Bejeck  wrote:
>
> > Hi Sophie,
> >
> > Thanks for the KIP! I think this is a worthwhile feature to add.  I have
> > two main questions about how this new feature will work.
> >
> >
> >1. You mention that for stateless applications auto-scaling is a
> stickier
> >situation.  But I was thinking that the auto-scaling would actually
> > benefit
> >stateless applications the most, let me explain my thinking.  Let's
> say
> > you
> >have a stateless Kafka Streams application with one input topic and 2
> >partitions, meaning you're limited to at most 2 stream threads.  In
> > order
> >to increase the throughput, you increase the number of partitions of
> the
> >source topic to 4, so you can have 4 stream threads.  In this case would
> the
> >auto-scaling feature automatically increase the number of tasks from 2
> > to
> >4?  Since the application is stateless, say using a filter then a map
> > for
> >example, the partition for the record doesn't matter, so it seems that
> >stateless applications would stand to gain a great deal.
> >2. For stateful applications I can see the immediate benefit from
> >autoscaling and static partitioning.   But again going with a
> partition
> >expansion for increased throughput example, what would be the
> mitigation
> >strategy for a stateful application that eventually wants to take
> > advantage
> >of the increased number of partitions? Otherwise keeping all keys on
> > their
> >original partition means you could end up with "key skew" due to not
> >allowing keys to distribute out to the new partitions.
> >
> > One last comment, the KIP states "only the key, rather than the key and
> > value, are passed in to the partitioner", but the interface has it
> taking a
> > key and a value as parameters.  Based on your comments earlier in this
> > thread I was thinking that the text needs to be updated.
> >
> > Thanks,
> > Bill
> >
> > On Fri, Oct 21, 2022 at 12:21 PM Lucas Brutschy
> >  wrote:
> >
> > > Hi all,
> > >
> > > thanks, Sophie, this makes sense. I suppose then the way to help the
> user
> > > not apply this in the wrong setting is having good documentation and
> > one
> > > or two examples of good use cases.
> > >
> > > I think Colt's time-based partitioning is a good example of how to use
> > > this. It actually doesn't have to be time, the same will work with any
> > > monotonically increasing identifier. I.e. the new partitions will only
> > get
> > > records for users with a "large" user ID greater than some user ID
> > > threshold hardcoded in the static partitioner. At least in this
> > restricted
> > > use-case, lookups by user ID would still be possible.
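
A minimal sketch of the threshold idea described above, assuming user IDs are monotonically increasing and the threshold is chosen above every ID seen before the expansion. `UserIdThresholdPartitioner` and its fields are hypothetical and do not reproduce the KIP's `StaticStreamsPartitioner` interface; the point is only that a key's partition never depends on the current partition count.

```java
// Hypothetical illustration of a "static" partitioner for one expansion step.
final class UserIdThresholdPartitioner {

    private final int originalPartitions;  // partition count before the expansion
    private final int expandedPartitions;  // partition count after the expansion
    private final long userIdThreshold;    // IDs greater than this go to new partitions

    UserIdThresholdPartitioner(int originalPartitions,
                               int expandedPartitions,
                               long userIdThreshold) {
        if (originalPartitions <= 0 || expandedPartitions <= originalPartitions) {
            throw new IllegalArgumentException("expansion must add partitions");
        }
        this.originalPartitions = originalPartitions;
        this.expandedPartitions = expandedPartitions;
        this.userIdThreshold = userIdThreshold;
    }

    // The result depends only on the user ID and the fixed constructor
    // arguments, so an existing key is never moved by a later scale-out.
    int partition(long userId) {
        if (userId <= userIdThreshold) {
            // "old" users stay on the original partitions
            return (int) Math.floorMod(userId, (long) originalPartitions);
        }
        // "new" users are spread over the partitions added by the expansion
        int added = expandedPartitions - originalPartitions;
        return originalPartitions + (int) Math.floorMod(userId, (long) added);
    }

    public static void main(String[] args) {
        UserIdThresholdPartitioner p = new UserIdThresholdPartitioner(2, 4, 1000L);
        System.out.println(p.partition(5L));
        System.out.println(p.partition(1000L));
        System.out.println(p.partition(1001L));
    }
}
```

Because the mapping is a pure function of the user ID, a lookup by user ID can still compute which partition holds the state. The trade-off is the skew discussed elsewhere in the thread: old partitions keep all of their existing keys.
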
> > >
> > > Cheers,
> > > Lucas
> > >
> > > On Fri, Oct 21, 2022 at 5:37 PM Colt McNealy 
> > wrote:
> > >
> > > > Sophie,
> > > >
> > > > Regarding item "3" (my last paragraph from the previous email),
> > perhaps I
> > > &

Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Sagar
Congrats Bruno!

Sagar.

On Wed, Nov 2, 2022 at 7:51 AM deng ziming  wrote:

> Congrats!
>
> --
> Ziming
>
> > On Nov 2, 2022, at 3:36 AM, Guozhang Wang  wrote:
> >
> > Hi everyone,
> >
> > I'd like to introduce our new Kafka PMC member, Bruno.
> >
> > Bruno has been a committer since April 2021 and has been very active in
> > the community. He's a key contributor to Kafka Streams, and also helped
> > review a lot of horizontal improvements such as Mockito. It is my
> pleasure
> > to announce that Bruno has agreed to join the Kafka PMC.
> >
> > Congratulations, Bruno!
> >
> > -- Guozhang Wang, on behalf of Apache Kafka PMC
>
>


Re: [DISCUSS] KIP-882: Make Kafka Connect REST API request timeouts configurable

2022-11-03 Thread Sagar
Hey Yash,

Thanks for the KIP! This looks like a useful feature.

I think the discussion thread already has some great points by Chris. Just
a couple of points/clarifications=>

Regarding, pt#2 , I guess it might be better to forward to the leader as
suggested by Yash. Having said that, why does the worker forward to the
leader? I am thinking if the worker can perform the validation on its own,
we could let it do the validation instead of forwarding everything to the
leader(even though it might be cheap to forward all requests to the leader).

Pt#3 => I think a bound is certainly needed but IMO it shouldn't go beyond
10 mins considering this is just validation. We shouldn't end up in a
situation where a few faulty connectors end up blocking a lot of request
processing threads, so while increasing the config is certainly helpful, we
shouldn't set too high a value IMO. Of course I am also open to suggestions
here.

Thanks!
Sagar.

On Thu, Nov 3, 2022 at 9:01 PM Chris Egerton 
wrote:

> Hi Yash,
>
> RE 2: That's a great point about validations already being performed by the
> leader. For completeness's sake, I'd like to note that this only holds for
> valid configurations; invalid ones are caught right now before being
> forwarded to the leader. Still, I think it's fine to forward to the leader
> for now and optimize further in the future if necessary. If frequent
> validations are taking place they should be conducted via the `PUT
> /connector-plugins/{pluginName}/config/validate` endpoint, which won't do
> any forwarding at all.
>
> RE 3: Yes, those endpoints LGTM. And yes, bounds on the timeout also seem
> reasonable... maybe a low-importance worker property could work for that?
> Not sure what would make sense for a default; probably somewhere in the
> 10-60 minute range but would be interested in others' thoughts.
>
> Thanks for the clarification on the zombie fencing logic. I think we might
> want to have some more subtle logic around the interaction between calls to
> Admin::fenceProducers and a worker-level timeout property if we go that
> route, but we can cross that particular bridge if we get back to it.
>
> Cheers,
>
> Chris
>
> On Wed, Nov 2, 2022 at 1:48 PM Yash Mayya  wrote:
>
> > Hi Chris,
> >
> > Thanks a lot for the super quick response and the great feedback!
> >
> > 1. I think that makes a lot of sense, and I'd be happy to update the KIP
> to
> > include this change in the scope. The current behavior where the API
> > response indicates a time out but the connector is created/updated
> > eventually anyway can be pretty confusing and is generally not a good
> user
> > experience IMO.
> >
> > 2. Wow, thanks for pointing this out - it's a really good catch and
> > something I hadn't noticed was happening with the current implementation.
> > While I do like the idea of having a query parameter that determines
> > whether validations can be skipped, I'm wondering if it might not be
> easier
> > and cleaner to just do the leader check earlier and avoid doing the
> > unnecessary config validation on the first worker? Since each config
> > validation happens on its own thread, I'm not so sure about the concern
> of
> > overloading the leader even on larger clusters, especially since
> > validations aren't typically long running operations. Furthermore, even
> > with the current implementation, the leader will always be doing a config
> > validation for connector create / update REST API requests on any worker.
> >
> > 3. That's a good point, and this way we can also restrict the APIs whose
> > timeouts are configurable - I'm thinking `PUT
> > /connector-plugins/{pluginName}/config/validate`, `POST /connectors` and
> > `PUT /connectors/{connector}/config` are the ones where such a timeout
> > parameter could be useful. Also, do you think we should enforce some
> > reasonable bounds for the timeout config?
> >
> > On the zombie fencing point, the implication was that the new worker
> > property would not control the timeout used for the call to
> > Admin::fenceProducers. However, if we go with a timeout query parameter
> > approach, even the timeout for the `PUT /connectors/{connector}/fence`
> > endpoint will remain unaffected.
> >
> > Thanks,
> > Yash
> >
> > On Wed, Nov 2, 2022 at 8:13 PM Chris Egerton 
> > wrote:
> >
> > > Hi Yash,
> > >
> > > Thanks for the KIP. It's a nice, focused change. Initially I was
> hesitant
> > > to support cases where connector validation takes this long, but
> > > considering the

Re: [DISCUSS] KIP-882: Make Kafka Connect REST API request timeouts configurable

2022-11-05 Thread Sagar
Hey Yash,

Thanks for the explanation. I think it should be fine to delegate the
validation directly to the leader.

Thanks!
Sagar.

On Sat, Nov 5, 2022 at 10:42 AM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for chiming in!
>
> > Having said that, why does the worker forward to the
> > leader? I am thinking if the worker can perform the validation on its
> own,
> > we could let it do the validation instead of forwarding everything to the
> > leader
>
> Only the leader is allowed to perform writes to the config topic, so any
> request that requires a config topic write ends up being forwarded to the
> leader. The `POST /connectors` and `PUT /connectors/{connector}/config`
> endpoints call `Herder::putConnectorConfig` which internally does a config
> validation first before initiating another herder request to write to the
> config topic (in distributed mode). If we want to allow the first worker to
> do the config validation, and then forward the request to the leader just
> for the write to the config topic, we'd either need something like a skip
> validations query parameter on the endpoint like Chris talks about above or
> else a new internal only endpoint that just does a write to the config
> topic without any prior config validation. However, we agreed that this
> optimization doesn't really seem necessary for now and can be done later if
> deemed useful. I'd be happy to hear differing thoughts if any, however.
>
> > I think a bound is certainly needed but IMO it shouldn't go beyond
> > 10 mins considering this is just validation
>
> Yeah, I agree that this seems like a fair value - I've elected to go with a
> default value of 10 minutes for the proposed worker configuration that sets
> an upper bound for the timeout query parameter.
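
A rough sketch of how an upper bound on a per-request timeout could be enforced. Everything here is an assumption made for illustration: `RequestTimeouts` and `resolve` are hypothetical names, the 90-second fallback stands in for the currently hardcoded request timeout, and rejecting an oversized value (rather than silently clamping it) is just one possible behavior, not something the thread specifies.

```java
import java.time.Duration;

// Hypothetical helper, not Kafka Connect code.
final class RequestTimeouts {

    // Assumed stand-in for the existing hardcoded REST request timeout.
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(90);

    // Resolves the timeout for one request.
    //   requestedMs: value of the timeout query parameter, or null if absent
    //   upperBound:  value of the worker-level upper-bound configuration
    static Duration resolve(Long requestedMs, Duration upperBound) {
        if (requestedMs == null) {
            return DEFAULT_TIMEOUT;
        }
        if (requestedMs <= 0) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        Duration requested = Duration.ofMillis(requestedMs);
        if (requested.compareTo(upperBound) > 0) {
            // Assumption: reject rather than clamp, so the caller notices.
            throw new IllegalArgumentException(
                "timeout " + requested + " exceeds upper bound " + upperBound);
        }
        return requested;
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofMinutes(10);
        System.out.println(resolve(null, bound));
        System.out.println(resolve(120_000L, bound));
    }
}
```

Keeping the bound in worker configuration means an operator can cap how long a slow validation may hold a request-processing thread, which is the concern about faulty connectors raised above.
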
>
> Thanks,
> Yash
>
> On Sat, Nov 5, 2022 at 10:30 AM Yash Mayya  wrote:
>
> > Hi Chris,
> >
> > Thanks again for your feedback. I think a worker configuration for the
> > upper bound makes sense - I initially thought we could hardcode it (just
> > like the current request timeout is), but there's no reason to set
> another
> > artificial bound that isn't user configurable which is exactly what we're
> > trying to change here in the first place. I've updated the KIP based on
> all
> > our discussion above.
> >
> > Thanks,
> > Yash
> >
> > On Thu, Nov 3, 2022 at 11:01 PM Sagar  wrote:
> >
> >> Hey Yash,
> >>
> >> Thanks for the KIP! This looks like a useful feature.
> >>
> >> I think the discussion thread already has some great points by Chris.
> Just
> >> a couple of points/clarifications=>
> >>
> >> Regarding, pt#2 , I guess it might be better to forward to the leader as
> >> suggested by Yash. Having said that, why does the worker forward to the
> >> leader? I am thinking if the worker can perform the validation on its
> >> own,
> >> we could let it do the validation instead of forwarding everything to
> the
> >> leader(even though it might be cheap to forward all requests to the
> >> leader).
> >>
> >> Pt#3 => I think a bound is certainly needed but IMO it shouldn't go
> beyond
> >> 10 mins considering this is just validation. We shouldn't end up in a
> >> situation where a few faulty connectors end up blocking a lot of request
> >> processing threads, so while increasing the config is certainly helpful,
> >> we
> >> shouldn't set too high a value IMO. Of course I am also open to
> >> suggestions
> >> here.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Thu, Nov 3, 2022 at 9:01 PM Chris Egerton 
> >> wrote:
> >>
> >> > Hi Yash,
> >> >
> >> > RE 2: That's a great point about validations already being performed
> by
> >> the
> >> > leader. For completeness's sake, I'd like to note that this only holds
> >> for
> >> > valid configurations; invalid ones are caught right now before being
> >> > forwarded to the leader. Still, I think it's fine to forward to the
> >> leader
> >> > for now and optimize further in the future if necessary. If frequent
> >> > validations are taking place they should be conducted via the `PUT
> >> > /connector-plugins/{pluginName}/config/validate` endpoint, which won't
> >> do
> >> > any forwarding at all.
> >> >
> >> > RE 3: Yes, those endpoints LGTM. And yes, bounds on the timeo

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-15 Thread Sagar
Hey Hector,

Thanks for the KIP. I have a minor suggestion in terms of naming. Since
this is a callback method, would it make sense to call it onDelete()?

Also, the failure scenarios discussed by Greg would need handling. Among
other things, I like the idea of having a timeout for graceful shutdown or
else try a force shutdown. What do you think about that approach?
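
To make the timeout idea concrete, here is a minimal sketch that runs a delete callback on a background thread and gives up after a deadline. `DeleteCallbackRunner` and `Outcome` are hypothetical names and this is not how the Connect herder is implemented; it only uses standard `java.util.concurrent` calls to show the graceful-then-forced pattern.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: bound the time a connector's delete callback may take.
final class DeleteCallbackRunner {

    enum Outcome { COMPLETED, FAILED, TIMED_OUT }

    static Outcome runWithTimeout(Runnable callback, long timeout, TimeUnit unit) {
        // A daemon thread keeps a stuck callback from blocking JVM shutdown.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "connector-delete-callback");
            t.setDaemon(true);
            return t;
        });
        Future<?> future = executor.submit(callback);
        try {
            future.get(timeout, unit);          // graceful path
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            future.cancel(true);                // forced path: interrupt the callback
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            return Outcome.FAILED;              // callback threw; caller is not crashed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return Outcome.FAILED;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> { }, 1, TimeUnit.SECONDS));
        System.out.println(runWithTimeout(() -> {
            throw new IllegalStateException("cleanup failed");
        }, 1, TimeUnit.SECONDS));
    }
}
```

Returning an outcome instead of propagating the exception keeps a failing or stalled callback from taking down the calling thread, which addresses points 1 and 2 of the review quoted below. Interruption is only cooperative, so a callback that ignores interrupts would keep running on its daemon thread.
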

Thanks!
Sagar.

On Sat, Nov 12, 2022 at 1:53 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Thanks Greg for taking your time to review not just the KIP but also the
> PR.
>
> 1. You made very valid points regarding the behavior of the destroy()
> callback for connectors that don't follow the happy path. After thinking
> about it, I decided to tweak the implementation a bit and have the
> destroy() method be called during the worker shutdown: this means it will
> share the same guarantees the connector#stop() method has. An alternative
> implementation can be to have an overloaded connector#stop(boolean deleted)
> method that signals a connector that it is being stopped due to deletion,
> but I think that having a separate destroy() method provides clearer
> semantics.
>
> I'll make sure to amend the KIP with these details.
>
> 3. Without going too deep on the types of operations that can be performed
> by a connector when it's being deleted, I can imagine the
> org.apache.kafka.connect.source.SourceConnector base class having a default
> implementation that deletes the connector's offsets automatically
> (controlled by a property); this is in the context of KIP-875 (first-class
> offsets support in Kafka Connect). Similar behaviors can be introduced for
> the SinkConnector, however I'm not sure if this KIP is the right place to
> discuss all the possibilities, or if we should keep it more
> narrowly focused on providing a callback mechanism for when connectors are
> deleted, and what the expectations are around this newly introduced method.
> What do you think?
>
>
> From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00 To:
> dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
>
> Hi Hector,
>
> Thanks for the KIP!
>
> This is certainly missing functionality from the native Connect framework,
> and we should try to make it possible to inform connectors about this part
> of their lifecycle.
> However, as with most functionality that was left out of the initial
> implementation of the framework, the details are more challenging to work
> out.
>
> 1. What happens when the destroy call throws an error, how does the
> framework respond?
>
> This is unspecified in the KIP, and it appears that your proposed changes
> could cause the herder to fail.
> From the perspective of operators & connector developers, what is a
> reasonable expectation to have for failure of a destroy?
> I could see operators wanting both a graceful-delete to make use of this
> new feature, and a force-delete for when the graceful-delete fails.
> A connector developer could choose to swallow all errors encountered, or
> fail-fast to indicate to the operator that there is an issue with the
> graceful-delete flow.
> If the alternative is crashing the herder, connector developers may choose
> to hide serious errors, which is undesirable.
>
> 2. What happens when the destroy() call takes a long time to complete, or
> is interrupted?
>
> It appears that your implementation serially destroy()s each appropriate
> connector, and may prevent the herder thread from making progress while the
> operation is ongoing.
> We have previously had to patch Connect to perform all connector and task
> operations on a background thread, because some connector method
> implementations can stall indefinitely.
> Connect also has the notion of "cancelling" a connector/task if a graceful
> shutdown timeout operation takes too long. Perhaps some of that design or
> machinery may be useful to protect this method call as well.
>
> More specific to the destroy() call itself, what happens when a connector
> completes part of a destroy operation and then cannot complete the
> remainder, either due to timing out or a worker crashing?
> What is the contract with the connector developer about this method? Is the
> destroy() only started exactly once during the lifetime of the connector,
> or may it be retried?
>
> 3. What should be considered a reasonable custom implementation of the
> destroy() call? What resources should it clean up by default?
>
> I think we can broadly categorize the state a connector mutates among the
> following
> * Framework-managed state (e.g. source offsets, consumer offsets)
> * Implementation detail 

Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-22 Thread Sagar
Hey Snehashis,

Thanks for the KIP. It looks like a very useful feature. Couple of
small-ish points, let me know what you think:

1) Should we update the GET /connectors endpoint to include the version of
the plugin that is running? It could be useful to figure out the version of
the plugin or I am assuming it gets returned by the expand=info call?
2) I am not aware of this and hence asking, can 2 connectors with different
versions have the same name? Does the plugin isolation allow this? This
could have a bearing when using the lifecycle endpoints for connectors like
DELETE etc.

Thanks!
Sagar.


On Tue, Nov 22, 2022 at 2:10 PM Ashwin  wrote:

> Hi Snehashis,
>
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> a versioning scheme for a connector config for the same connector (and not
> different versions of a connector plugin).
>
> Sorry for not being more precise in my wording -  I meant registering
> versions of schema for connector config.
>
> Let's take the example of a fictional connector which uses a fictional AWS
> service.
>
> Fictional Connector Config schema version:2.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#",
>   "type": "object",
>   "properties": {
>     "name": {
>       "type": "string"
>     },
>     "schema_version": {
>       "type": "string"
>     },
>     "aws_access_key": {
>       "type": "string"
>     },
>     "aws_secret_key": {
>       "type": "string"
>     }
>   },
>   "required": [
>     "name",
>     "schema_version",
>     "aws_access_key",
>     "aws_secret_key"
>   ]
> }
>
> Fictional Connector config schema version:3.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#",
>   "type": "object",
>   "properties": {
>     "name": {
>       "type": "string"
>     },
>     "schema_version": {
>       "type": "string"
>     },
>     "iam_role": {
>       "type": "string"
>     }
>   },
>   "required": [
>     "name",
>     "schema_version",
>     "iam_role"
>   ]
> }
>
> The connector which supports Fictional config schema 2.0  will validate the
> access key and secret key.
> Whereas a connector which supports config with schema version 3.0 will only
> validate the IAM role.
>
> This is the alternative which I wanted to suggest. Each plugin will
> register the schema versions of connector config which it supports.
>
> The plugin paths may be optionally different i.e  we don't have to
> mandatorily add a new plugin path to support a new schema version.
>
> Thanks,
> Ashwin
>
> On Tue, Nov 22, 2022 at 12:47 PM Snehashis 
> wrote:
>
> > Thanks for the input Ashwin.
> >
> > > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > > config is versioned and has a schema. Then a single plugin (whose
> > > dependencies have not changed) can handle multiple config versions for
> > the
> > > same connector class.
> >
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> > a versioning scheme for a connector config for the same connector (and
> not
> > different versions of a connector plugin). That is a somewhat tangential
> > problem. While it is definitely a useful feature to have, like a log to
> > check what changes were made over time to the config which might make it
> > easier to do rollbacks, it is not the focus here. Here by version we mean
> > to say what underlying version of the plugin should the given
> configuration
> > of the connector use. Perhaps it is better to change the name of the
> > parameter from connector.version to connector.plugin.version or
> > plugin.version if it was confusing. wdyt?
> >
> > >  2. Any plans to support assisted migration e.g if a user invokes "POST
> > > connector/config?migrate=latest", the latest version __attempts__ to
> > > transform the existing config to the newer version. This would require
> > > adding a method like "boolean migrate(Version fromVersion)" to the
> > > connector interface.
> >
> > This is an enhancement we can think of doing in future. Users can simply
> do
> > a PUT call with the updated config which has the updated version number

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-22 Thread Sagar
Hi Victoria,

Thanks for the KIP. Seems like a very interesting idea!

I have a couple of questions:

1) Did you consider adding a method similar to :
List> get(K key, long from, long to)?

I think this could be useful considering that this
versioning scheme unlocks time travel at a key basis. WDYT?

2) I have a similar question as Matthias, about the timestampTo argument
when doing a get. Is it inclusive or exclusive?

3) validFrom sounds slightly confusing to me. It is essentially the
timestamp at which the record was inserted. validFrom makes it sound like
validTo which can keep changing based on new records while *from* is fixed.
WDYT?

4) I also think the delete API should be supported.

Thanks!
Sagar.

On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax  wrote:

> Thanks for the KIP Victoria. Very well written!
>
>
> Couple of questions (many might just require to add some more details to
> the KIP):
>
>   (1) Why does the new store not extend KeyValueStore, but StateStore?
> In the end, it's a KeyValueStore?
>
>   (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> want to support IQ in this KIP, it might be good to add this interface
> right away to avoid complications for follow up KIPs? Or won't there be
> any complications anyway?
>
>   (3) Why do we not have a `delete(key)` method? I am ok with not
> supporting all methods from existing KV-store, but a `delete(key)` seems
> to be fundamental to have?
>
>   (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just for
> my own clarification (should we add something to the JavaDocs?).
>
>   (4b) Should we throw an exception if a user queries out-of-bound
> instead of returning `null` (in `get(key,ts)`)?
>-> You put it into "rejected alternatives", and I understand your
> argument. Would love to get input from others about this question
> though. -- It seems we also return `null` for windowed stores, so maybe
> the strongest argument is to align to existing behavior? Or do we have
> case for which the current behavior is problematic?
>
>   (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
> discretion when this is the case)" -> Should we make it a stricter
> contract such that the user can reason about it better (there is WIP to
> make retention time a strict bound for windowed stores atm)
>-> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a
> strict bound, too.
>
>   (5a) Do we need to expose `segmentInterval`? For windowed-stores, we
> also use segments but hard-code it to two (it was exposed in earlier
> versions but it seems not useful, even if we would be open to expose it
> again if there is user demand).
>
>   (5b) JavaDocs says: "Performance degrades as more record versions for
> the same key are collected in a single segment. On the other hand,
> out-of-order writes and reads which access older segments may slow down
> if there are too many segments." -- Wondering if JavaDocs should make
> any statements about expected performance? Seems to be an implementation
> detail?
>
>   (6) validTo timestamp is "exclusive", right? Ie, if I query
> `get(key,ts[=validToV1])` I would get `null` or the "next" record v2
> with validFromV2=ts?
>
>   (7) The KIP says, that segments are stored in the same RocksDB -- for
> this case, how are efficient deletes handled? For windowed-store, we can
> just delete a full RocksDB.
>
>   (8) Rejected alternatives: you propose to not return the validTo
> timestamp -- if we find it useful in the future to return it, would
> there be a clean path to change it accordingly?
>
>
> -Matthias
>
>
> On 11/16/22 9:57 PM, Victoria Xia wrote:
> > Hi everyone,
> >
> > I have a proposal for introducing versioned state stores in Kafka
> Streams.
> > Versioned state stores are similar to key-value stores except they can
> > store multiple record versions for a single key. This KIP focuses on
> > interfaces only in order to limit the scope of the KIP.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >
> > Thanks,
> > Victoria
> >
>


Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-23 Thread Sagar
Hi Vicky,

Thanks for your response!

I would just use numbers to refer to your comments.

1) Thanks for your response. I am also not totally sure whether these
should be supported via IQv2 or via the store interface. That said, I
definitely wouldn't qualify this as blocking the KIP, so we can live
without it :)

2) Yeah if the 2 APIs for get have different semantics for timestampTo,
then it could be confusing. I went through the link for temporal tables
(TFS!) and I now get why the AS OF semantics would have it inclusive. I
think part of the problem is that the name `get` on its own is not as
expressive as SQL. Can we name according to the semantics that you want to
support like `getAsOf` or something like that? I am not sure if we do that
in our codebase though. Maybe the experts can chime in.

3) Hmm, I would have named it `validUpto`, but again I am not very picky about it.
After going through the link and your KIP, it's a lot clearer to me.

4) I think delete(key) should be sufficient. With delete, we would
still keep the older versions of the key, right?
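
On point 2, a minimal sketch of what an inclusive `getAsOf(key, timestamp)` could mean. The class `AsOfLookupSketch` is purely hypothetical and is not the KIP-889 interface; a plain `TreeMap` stands in for the store, and each version is assumed to be keyed by its validFrom timestamp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the KIP-889 interface.
class AsOfLookupSketch {
    // key -> (validFrom timestamp -> value)
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    void put(String key, String value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Returns the latest version whose validFrom is <= asOfTimestamp
    // (inclusive bound), or null if no version existed yet at that time.
    String getAsOf(String key, long asOfTimestamp) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        AsOfLookupSketch store = new AsOfLookupSketch();
        store.put("k", "v1", 10L);
        store.put("k", "v2", 20L);
        System.out.println(store.getAsOf("k", 19L)); // v1
        System.out.println(store.getAsOf("k", 20L)); // v2, because the bound is inclusive
        System.out.println(store.getAsOf("k", 5L));  // null
    }
}
```

With an exclusive bound, the lookup at timestamp 20 would return v1 instead, which is the ambiguity a more explicit method name is meant to avoid.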

Thanks!
Sagar.

On Wed, Nov 23, 2022 at 12:17 AM Victoria Xia
 wrote:

> Thanks, Matthias and Sagar, for your comments! I've responded here for now,
> and will update the KIP afterwards with the outcome of our discussions as
> they resolve.
>
> --- Matthias's comments ---
>
> > (1) Why does the new store not extend KeyValueStore, but StateStore?
> In the end, it's a KeyValueStore?
>
> A `VersionedKeyValueStore` is not a `KeyValueStore` because
> many of the KeyValueStore methods would not make sense for a versioned
> store. For example, `put(K key, V value)` is not meaningful for a versioned
> store because the record needs a timestamp associated with it.
>
> A `VersionedKeyValueStore` is more similar to a `KeyValueStore<K, ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore`), but some
> of the TimestampedKeyValueStore methods are still problematic. For example,
> what does it mean for `delete(K key)` to have return type
> `ValueAndTimestamp`? Does this mean that `delete(K key)` only deletes
> (and returns) the latest record version for the key? Probably we want a
> versioned store to have `delete(K key)` delete all record versions for the
> given key, in which case the return type is better suited as an
> iterator/collection of KeyValueTimestamp. `putIfAbsent(K key,
> ValueAndTimestamp value)` also has ambiguous semantics for versioned stores
> (i.e., what does it mean for the key/record to be "absent").
>
> I agree that conceptually a versioned key-value store is just a key-value
> store, though. In the future if we redesign the store interfaces, it'd be
> great to unify them by having a more generic KeyValueStore interface that
> allows for extra flexibility to support different types of key-value
> stores, including versioned stores. (Or, if you can think of a way to
> achieve this with the existing interfaces today, I'm all ears!)
>
> > (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> want to support IQ in this KIP, it might be good to add this interface
> right away to avoid complications for follow up KIPs? Or won't there be
> any complications anyway?
>
> I don't think there will be complications for refactoring to add this
> interface in the future. Refactoring out ReadOnlyVersionedKeyValueStore
> from VersionedKeyValueStore would leave VersionedKeyValueStore unchanged
> from the outside.
>
> Also, is it true that the ReadOnlyKeyValueStore interface is only used for
> IQv1 and not IQv2? I think it's an open question as to whether we should
> support IQv1 for versioned stores or only IQv2. If the latter, then maybe
> we won't need the extra interface at all.
>
> > (3) Why do we not have a `delete(key)` method? I am ok with not
> supporting all methods from existing KV-store, but a `delete(key)` seems
> to be fundamental to have?
>
> What do you think the semantics of `delete(key)` should be for versioned
> stores? Should `delete(key)` delete (and return) all record versions for
> the key? Or should we have `delete(key, timestamp)` which is equivalent to
> `put(key, null, timestamp)` except with a return type to return
> ValueAndTimestamp representing the record it replaced?
>
> If we have ready alignment on what the interface and semantics for
> `delete(key)` should be, then adding it in this KIP sounds good. I just
> didn't want the rest of the KIP to be hung up over additional interfaces,
> given that we can always add extra interfaces in the future.
>
> > (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> MAX_VALUE)`? Maybe it is good to have as syntactic sugar though? Just for
> my own clarification (s

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-12-01 Thread Sagar
Thanks Victoria,

I guess an advantage of exposing a method like delete(key, timestamp) could
be that, from a user's standpoint, it is a single operation rather than two. The
equivalent of this method, i.e. put followed by get, is not atomic, so exposing
it certainly sounds like a good idea.
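
A minimal sketch of the single-call idea, under stated assumptions: `VersionedDeleteSketch` is a hypothetical in-memory stand-in (not the KIP-889 interface), a tombstone is modelled as a null value, `synchronized` stands in for whatever atomicity the real store would provide, and the method returns only the value rather than a `ValueAndTimestamp`. The sketch reads the old version before writing the tombstone, since a read issued afterwards would only see the tombstone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in; not the KIP-889 interface.
class VersionedDeleteSketch {
    // key -> (timestamp -> value); a null value models a tombstone
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    synchronized void put(String key, String value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest version at or before asOfTimestamp, or null if none (or a tombstone).
    synchronized String get(String key, long asOfTimestamp) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // One call under one lock: look up the version visible at `timestamp`,
    // write a tombstone at that timestamp, and return what was replaced.
    synchronized String delete(String key, long timestamp) {
        String replaced = get(key, timestamp);
        put(key, null, timestamp);
        return replaced;
    }

    public static void main(String[] args) {
        VersionedDeleteSketch store = new VersionedDeleteSketch();
        store.put("k", "v1", 10L);
        System.out.println(store.delete("k", 15L)); // v1
        System.out.println(store.get("k", 12L));    // v1, older versions are kept
        System.out.println(store.get("k", 20L));    // null, the key is deleted as of 15
    }
}
```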

Thanks!
Sagar.

On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia
 wrote:

> Thanks, Sagar and Bruno, for your insights and comments!
>
> > Sagar: Can we name according to the semantics that you want to
> support like `getAsOf` or something like that? I am not sure if we do that
> in our codebase though. Maybe the experts can chime in.
>
> Because it is a new method that will be added, we should be able to name it
> whatever we like. I agree `getAsOf` is more clear, albeit wordier.
> Introducing `getAsOf(key, timestamp)` means we could leave open `get(key,
> timeFrom, timeTo)` to have an exclusive `timeTo` without introducing a
> collision. (We could introduce `getBetween(key, timeFrom, timeTo)` instead
> to delineate even more clearly, though this is better left for a future
> KIP.)
>
> I don't think there's any existing precedent in the codebase to follow here but
> I'll leave that to the experts. Curious to hear what others prefer as well.
>
> > Sagar: With delete, we would still keep the older versions of the key
> right?
>
> We could certainly choose this for the semantics of delete(...) -- and it
> sounds like we should too, based on Bruno's confirmation below that this
> feels more natural to him as well -- but as Bruno noted in his message
> below I think we'll want the method signature to be `delete(key,
> timestamp)` then, so that there is an explicit timestamp to associate with
> the deletion. In other words, `delete(key, timestamp)` has the same effect
> as `put(key, null, timestamp)`. The only difference is that the `put(...)`
> method has a `void` return type, while `delete(key, timestamp)` can have
> `ValueAndTimestamp` as return type in order to return the record which is
> replaced (if any). In other words, `delete(key, timestamp)` is equivalent
> to `put(key, null, timestamp)` followed by `get(key, timestamp)`.
>
> > Bruno: I would also not change the semantics so that it deletes all
> versions of
> a key. I would rather add a new method purge(key) or
> deleteAllVersions(key) or similar if we want to have such a method in
> this first KIP.
>
> Makes sense; I'm convinced. Let's defer
> `purge(key)`/`deleteAllVersions(key)` to a future KIP. If there's agreement
> that `delete(key, timestamp)` (as described above) is valuable, we can keep
> it in this first KIP even though it is syntactic sugar. If this turns into
> a larger discussion, we can defer this to a future KIP as well.
>
> > Bruno: I would treat the history retention as a strict limit. [...] You
> could also add historyRetentionMs() to the VersionedKeyValueStore
> interface to make the concept of the history retention part of the
> interface.
>
> OK. That's the second vote for rewording the javadoc for
> `VersionedKeyValueStore#get(key, timestampTo)` to remove the parenthetical
> and clarify that history retention should be used to dictate this case, so
> I'll go ahead and do that. I'll leave out adding `historyRetentionMs()` to
> the interface for now, though, for the sake of consistency with other
> stores (e.g., window stores) which don't expose similar types of
> configurations from their interfaces.
>
> > Bruno: exclusive vs inclusive regarding validTo timestamp in get().
> Doesn't this decision depend on the semantics of the join for which this
> state store should be used?
>
> Yes, you are correct. As a user I would expect that a stream-side record
> with the same timestamp as a table-side record _would_ produce a join
> result, which is consistent with the proposal for timestampTo to be
> inclusive. (FWIW I tried this out with a Flink temporal join just now and
> observed this result as well. Not sure where to look for other standards to
> validate this expectation.)
>
> > Bruno: If Streams does not update min.compaction.lag.ms during
> rebalances,
> users have to do it each time they change history retention in the code,
> right? That seems odd to me. What is the actual reason for not updating
> the config? How does Streams handle updates to windowed stores?
>
> Yes, users will have to update min.compaction.lag.ms for the changelog
> topic themselves if they update history retention in their code. This is
> consistent with what happens for window stores today: e.g., if a user
> updates grace period for a windowed aggregation, then they are responsible
> for updating retention.ms on their windowed changelog topic as well.
>
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-06 Thread Sagar
Hi All,

I made a couple of edits to the KIP which came up during the code review.
Changes at a high level are:

1) KeyQueryMetadata enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.

Updated KIP:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 6:43 PM Sagar  wrote:

> Thanks Bruno,
>
> Marking this as accepted.
>
> Thanks everyone for their comments/feedback.
>
> Thanks!
> Sagar.
>
> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the update and the PR!
>>
>> +1 (binding)
>>
>> Best,
>> Bruno
>>
>> On 10.09.22 18:57, Sagar wrote:
>> > Hi Bruno,
>> >
>> > Thanks, I think these changes make sense to me. I have updated the KIP
>> > accordingly.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna 
>> wrote:
>> >
>> >> Hi Sagar,
>> >>
>> >> I would not drop the support for dropping records. I would also not
>> >> return null from partitions(). Maybe an Optional can help here. An
>> empty
>> >> Optional would mean to use the default partitioning behavior of the
>> >> producer. So we would have:
>> >>
>> >> - non-empty Optional, non-empty list of integers: partitions to send
>> the
>> >> record to
>> >> - non-empty Optional, empty list of integers: drop the record
>> >> - empty Optional: use default behavior
>> >>
>> >> What do others think?
>> >>
>> >> Best,
>> >> Bruno
>> >>
>> >> On 02.09.22 13:53, Sagar wrote:
>> >>> Hello Bruno/Chris,
>> >>>
>> >>> Since these are the last set of changes(I am assuming haha), it would
>> be
>> >>> great if you could review the 2 options from above so that we can
>> close
>> >> the
>> >>> voting. Of course I am happy to incorporate any other requisite
>> changes.
>> >>>
>> >>> Thanks!
>> >>> Sagar.
>> >>>
>> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar 
>> >> wrote:
>> >>>
>> >>>> Thanks Bruno for the great points.
>> >>>>
>> >>>> I see 2 options here =>
>> >>>>
>> >>>> 1) As Chris suggested, drop the support for dropping records in the
>> >>>> partitioner. That way, an empty list could signify the usage of a
>> >> default
>> >>>> partitioner. Also, if the deprecated partition() method returns null
>> >>>> thereby signifying the default partitioner, the partitions() can
>> return
>> >> an
>> >>>> empty list i.e default partitioner.
>> >>>>
>> >>>> 2) OR we treat a null return type of partitions() method to signify
>> the
>> >>>> usage of the default partitioner. In the default implementation of
>> >>>> partitions() method, if partition() returns null, then even
>> partitions()
>> >>>> can return null(instead of an empty list). The RecordCollectorImpl
>> code
>> >> can
>> >>>> also be modified accordingly. @Chris, to your point, we can even drop
>> >> the
>> >>>> support of dropping of records. It came up during KIP discussion,
>> and I
>> >>>> thought it might be a useful feature. Let me know what you think.
>> >>>>
>> >>>> 3) Lastly about the partition number check. I wanted to avoid the
>> >> throwing
>> >>>> of exception so I thought adding it might be a useful feature. But as
>> >> you
>> >>>> pointed out, if it can break backwards compatibility, it's better to
>> >> remove
>> >>>> it.
>> >>>>
>> >>>> Thanks!
>> >>>> Sagar.
>> >>>>
>> >>>>
>> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>> 
>> >>>> wrote:
>> >>>>
>> >>>>> +1 to Bruno's concerns about backward compatibility. Do we actually
>> >> need
>> >>>>> support for dropping records in the partitioner? It doesn't seem
>> >> necessary

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-07 Thread Sagar
Hi Matthias,

I did save it. The changes are added under Public Interfaces (Pt#2 about
enhancing KeyQueryMetadata with partitions method) and
throwing IllegalArgumentException when StreamPartitioner#partitions method
returns multiple partitions for just FK-join instead of the earlier decided
FK-Join and IQ.

The background is that for IQ, if users have multicast records to
multiple partitions during ingestion but the fetch returns only a single
partition, then it would be wrong. That's why the restriction was lifted
for IQ and that's the reason KeyQueryMetadata now has another partitions()
method to signify the same.

FK-Join also has a similar case, but while reviewing it was felt that
FK-Join on its own is fairly complicated and we don't need this feature
right away so the restriction still exists.
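
As a rough sketch of the contract discussed in this thread: an empty Optional from a partitions()-style call means the default partitioner is used, a present but empty set means the record is dropped, and a non-empty set lists the target partitions. The class and method names below (`MulticastContractSketch`, `interpret`, `rejectMulticast`) are hypothetical and are not the actual Streams code; `rejectMulticast` only illustrates the kind of guard that throws IllegalArgumentException on a path that must stay single-partition, such as FK-join.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration of the contract; not the Kafka Streams implementation.
class MulticastContractSketch {
    enum Action { USE_DEFAULT_PARTITIONER, DROP_RECORD, SEND_TO_PARTITIONS }

    // Maps the three cases of the Optional return value to an action.
    static Action interpret(Optional<Set<Integer>> partitions) {
        if (!partitions.isPresent()) {
            return Action.USE_DEFAULT_PARTITIONER;
        }
        if (partitions.get().isEmpty()) {
            return Action.DROP_RECORD;
        }
        return Action.SEND_TO_PARTITIONS;
    }

    // Guard for a code path where multicasting is not allowed.
    static void rejectMulticast(Set<Integer> partitions) {
        if (partitions.size() > 1) {
            throw new IllegalArgumentException(
                "Multicasting to " + partitions.size() + " partitions is not supported here");
        }
    }

    public static void main(String[] args) {
        Optional<Set<Integer>> useDefault = Optional.empty();
        Optional<Set<Integer>> drop = Optional.of(Collections.<Integer>emptySet());
        Set<Integer> two = new HashSet<>(Arrays.asList(0, 2));
        Optional<Set<Integer>> multicast = Optional.of(two);

        System.out.println(interpret(useDefault)); // USE_DEFAULT_PARTITIONER
        System.out.println(interpret(drop));       // DROP_RECORD
        System.out.println(interpret(multicast));  // SEND_TO_PARTITIONS

        rejectMulticast(Collections.singleton(1)); // a single partition passes
        try {
            rejectMulticast(two);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```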

Thanks!
Sagar.


On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax  wrote:

> I don't see any update on the wiki about it. Did you forget to hit "save"?
>
> Can you also provide some background? I am not sure right now if I
> understand the proposed changes?
>
>
> -Matthias
>
> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> > Thanks Sagar, this makes sense to me -- we clearly need additional
> changes
> > to
> > avoid breaking IQ when using this feature, but I agree with continuing to
> > restrict
> > FKJ since they wouldn't stop working without it, and would become much
> > harder
> > to reason about (than they already are) if we did enable them to use it.
> >
> > And of course, they can still multicast the final results of a FKJ, they
> > just can't
> > mess with the internal workings of it in this way.
> >
> > On Tue, Dec 6, 2022 at 9:48 AM Sagar  wrote:
> >
> >> Hi All,
> >>
> >> I made a couple of edits to the KIP which came up during the code
> review.
> >> Changes at a high level are:
> >>
> >> 1) KeyQueryMetadata enhanced to have a new method called partitions().
> >> 2) Lifting the restriction of a single partition for IQ. Now the
> >> restriction holds only for FK Join.
> >>
> >> Updated KIP:
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Mon, Sep 12, 2022 at 6:43 PM Sagar 
> wrote:
> >>
> >>> Thanks Bruno,
> >>>
> >>> Marking this as accepted.
> >>>
> >>> Thanks everyone for their comments/feedback.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna 
> >> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> Thanks for the update and the PR!
> >>>>
> >>>> +1 (binding)
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 10.09.22 18:57, Sagar wrote:
> >>>>> Hi Bruno,
> >>>>>
> >>>>> Thanks, I think these changes make sense to me. I have updated the
> KIP
> >>>>> accordingly.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna 
> >>>> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> I would not drop the support for dropping records. I would also not
> >>>>>> return null from partitions(). Maybe an Optional can help here. An
> >>>> empty
> >>>>>> Optional would mean to use the default partitioning behavior of the
> >>>>>> producer. So we would have:
> >>>>>>
> >>>>>> - non-empty Optional, non-empty list of integers: partitions to send
> >>>> the
> >>>>>> record to
> >>>>>> - non-empty Optional, empty list of integers: drop the record
> >>>>>> - empty Optional: use default behavior
> >>>>>>
> >>>>>> What do others think?
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 02.09.22 13:53, Sagar wrote:
> >>>>>>> Hello Bruno/Chris,
> >>>>>>>
> >>>>>>> Since these are the last set of changes(I am assuming haha), it
> >> would
> >>>> be
> >>>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-08 Thread Sagar
Thanks Matthias,

Well, as things stand, we did have internal discussions on this, and it
seemed ok to open it up for IQ and, more importantly, not ok to open it
up for FK-Join. Also, the PR for this is already merged, and some of
these things came up during its review. Here's the PR link:
https://github.com/apache/kafka/pull/12803.

Thanks!
Sagar.


On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax  wrote:

> Ah. Missed it as it does not have a nice "code block" similar to
> `StreamPartitioner` changes.
>
> I understand the motivation, but I am wondering if we might head into a
> tricky direction? State stores (at least the built-in ones) and IQ are
> kinda built with the idea to have sharded data and that a multi-cast of
> keys is an anti-pattern?
>
> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> sure that generalizing the concepts does not cause issues in the future?
>
> Ie, should we claim that the multi-cast feature should be used for
> KStreams only, but not for KTables?
>
> Just want to double check that we are not doing something we regret later.
>
>
> -Matthias
>
>
> On 12/7/22 6:45 PM, Sagar wrote:
> > Hi Matthias,
> >
> > I did save it. The changes are added under Public Interfaces (Pt#2 about
> > enhancing KeyQueryMetadata with partitions method) and
> > throwing IllegalArgumentException when StreamPartitioner#partitions
> method
> > returns multiple partitions for just FK-join instead of the earlier
> decided
> > FK-Join and IQ.
> >
> > The background is that for IQ, if the users have multi casted records to
> > multiple partitions during ingestion but the fetch returns only a single
> > partition, then it would be wrong. That's why the restriction was lifted
> > for IQ and that's the reason KeyQueryMetadata now has another
> partitions()
> > method to signify the same.
> >
> > FK-Join also has a similar case, but while reviewing it was felt that
> > FK-Join on its own is fairly complicated and we don't need this feature
> > right away so the restriction still exists.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax  wrote:
> >
> >> I don't see any update on the wiki about it. Did you forget to hit
> "save"?
> >>
> >> Can you also provide some background? I am not sure right now if I
> >> understand the proposed changes?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>> Thanks Sagar, this makes sense to me -- we clearly need additional
> >> changes
> >>> to
> >>> avoid breaking IQ when using this feature, but I agree with continuing
> to
> >>> restrict
> >>> FKJ since they wouldn't stop working without it, and would become much
> >>> harder
> >>> to reason about (than they already are) if we did enable them to use
> it.
> >>>
> >>> And of course, they can still multicast the final results of a FKJ,
> they
> >>> just can't
> >>> mess with the internal workings of it in this way.
> >>>
> >>> On Tue, Dec 6, 2022 at 9:48 AM Sagar 
> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I made a couple of edits to the KIP which came up during the code
> >> review.
> >>>> Changes at a high level are:
> >>>>
> >>>> 1) KeyQueryMetadata enhanced to have a new method called partitions().
> >>>> 2) Lifting the restriction of a single partition for IQ. Now the
> >>>> restriction holds only for FK Join.
> >>>>
> >>>> Updated KIP:
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar 
> >> wrote:
> >>>>
> >>>>> Thanks Bruno,
> >>>>>
> >>>>> Marking this as accepted.
> >>>>>
> >>>>> Thanks everyone for their comments/feedback.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna 
> >>>> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> Th

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-09 Thread Sagar
Hey Matthias,

Actually I had shared the PR link for any potential issues that might have
gone missing. I guess it didn't come out that way in my response. Apologies
for that!

I am more than happy to incorporate any feedback/changes or address any
concerns that are still present around this at this point as well.

And I would keep in mind the feedback to provide more time in such a
scenario.

Thanks!
Sagar.

On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax  wrote:

> It is what it is.
>
> > we did have internal discussions on this
>
> We sometimes have the case that a KIP needs adjustment as stuff is
> discovered during coding. And having a discussion on the PR about it is
> fine. -- However, before the PR gets merged, the KIP change should be
> announced to verify that nobody has objections to the change, before we
> carry forward.
>
> It's up to the committer who reviews/merges the PR to ensure that this
> process is followed IMHO. I hope we can do better next time.
>
> (And yes, there was the 3.4 release KIP deadline that might explain it,
> but it seems important that we give enough time to make "tricky" changes
> and not rush into stuff IMHO.)
>
>
> -Matthias
>
>
> On 12/8/22 7:04 PM, Sagar wrote:
> > Thanks Matthias,
> >
> > Well, as things stand, we did have internal discussions on this and it
> > seemed ok to open it up for IQ and more importantly not ok to have it
> > opened up for FK-Join. And more importantly, the PR for this is already
> > merged and some of these things came up during that. Here's the PR link:
> > https://github.com/apache/kafka/pull/12803.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax  wrote:
> >
> >> Ah. Missed it as it does not have a nice "code block" similar to
> >> `StreamPartitioner` changes.
> >>
> >> I understand the motivation, but I am wondering if we might head into a
> >> tricky direction? State stores (at least the built-in ones) and IQ are
> >> kinda built with the idea to have sharded data and that a multi-cast of
> >> keys is an anti-pattern?
> >>
> >> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> >> sure that generalizing the concepts does not cause issues in the future?
> >>
> >> Ie, should we claim that the multi-cast feature should be used for
> >> KStreams only, but not for KTables?
> >>
> >> Just want to double check that we are not doing something we regret
> later.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/7/22 6:45 PM, Sagar wrote:
> >>> Hi Matthias,
> >>>
> >>> I did save it. The changes are added under Public Interfaces (Pt#2
> about
> >>> enhancing KeyQueryMetadata with partitions method) and
> >>> throwing IllegalArgumentException when StreamPartitioner#partitions
> >> method
> >>> returns multiple partitions for just FK-join instead of the earlier
> >> decided
> >>> FK-Join and IQ.
> >>>
> >>> The background is that for IQ, if the users have multi casted records
> to
> >>> multiple partitions during ingestion but the fetch returns only a
> single
> >>> partition, then it would be wrong. That's why the restriction was
> lifted
> >>> for IQ and that's the reason KeyQueryMetadata now has another
> >> partitions()
> >>> method to signify the same.
> >>>
> >>> FK-Join also has a similar case, but while reviewing it was felt that
> >>> FK-Join on its own is fairly complicated and we don't need this
> feature
> >>> right away so the restriction still exists.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>>
> >>> On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax 
> wrote:
> >>>
> >>>> I don't see any update on the wiki about it. Did you forget to hit
> >> "save"?
> >>>>
> >>>> Can you also provide some background? I am not sure right now if I
> >>>> understand the proposed changes?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>>>> Thanks Sagar, this makes sense to me -- we clearly need additional
> >>>> changes
> >>>>> to
> >>>>> avoid breaking IQ when using this feat

Re: [ANNOUNCE] New Kafka PMC Member: Luke Chen

2022-12-17 Thread Sagar
Congratulations Luke! Very well deserved!

Sagar.

On Sun, 18 Dec 2022 at 6:41 AM, Sam Barker  wrote:

> Congratulations Luke!
>
> On Sat, 17 Dec 2022 at 08:41, Jun Rao  wrote:
>
> > Hi, Everyone,
> >
> > Luke Chen has been a Kafka committer since Feb. 9, 2022. He has been very
> > instrumental to the community since becoming a committer. It's my
> pleasure
> > to announce that Luke is now a member of the Kafka PMC.
> >
> > Congratulations Luke!
> >
> > Jun
> > on behalf of Apache Kafka PMC
> >
>


Re: [VOTE] KIP-889 Versioned State Stores

2022-12-20 Thread Sagar
Hi Victoria,

+1 (non-binding).

Thanks!
Sagar.

On Tue, Dec 20, 2022 at 1:39 PM Bruno Cadonna  wrote:

> Hi Victoria,
>
> Thanks for the KIP!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 19.12.22 20:03, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 12/15/22 1:27 PM, John Roesler wrote:
> >> Thanks for the thorough KIP, Victoria!
> >>
> >> I'm +1 (binding)
> >>
> >> -John
> >>
> >> On 2022/12/15 19:56:21 Victoria Xia wrote:
> >>> Hi all,
> >>>
> >>> I'd like to start a vote on KIP-889 for introducing versioned key-value
> >>> state stores to Kafka Streams:
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >>>
> >>> The discussion thread has been open for a few weeks now and has
> >>> converged
> >>> among the current participants.
> >>>
> >>> Thanks,
> >>> Victoria
> >>>
>


Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-21 Thread Sagar
Hi All,

Just as an update, the changes described here:

```
Changes at a high level are:

1) KeyQueryMetadata enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.
```

have been reverted. As things stand, KeyQueryMetadata exposes only the
partition() method, and the single-partition restriction is back in place
for IQ. This has been done based on the points raised by Matthias above.

The KIP has been updated accordingly.

Thanks!
Sagar.

On Sat, Dec 10, 2022 at 12:09 AM Sagar  wrote:

> Hey Matthias,
>
> Actually I had shared the PR link for any potential issues that might have
> gone missing. I guess it didn't come out that way in my response. Apologies
> for that!
>
> I am more than happy to incorporate any feedback/changes or address any
> concerns that are still present around this at this point as well.
>
> And I would keep in mind the feedback to provide more time in such a
> scenario.
>
> Thanks!
> Sagar.
>
> On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax  wrote:
>
>> It is what it is.
>>
>> > we did have internal discussions on this
>>
>> We sometimes have the case that a KIP needs adjustment as stuff is
>> discovered during coding. And having a discussion on the PR about it is
>> fine. -- However, before the PR gets merged, the KIP change should be
>> announced to verify that nobody has objections to the change, before we
>> carry forward.
>>
>> It's up to the committer who reviews/merges the PR to ensure that this
>> process is followed IMHO. I hope we can do better next time.
>>
>> (And yes, there was the 3.4 release KIP deadline that might explain it,
>> but it seems important that we give enough time to make "tricky" changes
>> and not rush into stuff IMHO.)
>>
>>
>> -Matthias
>>
>>
>> On 12/8/22 7:04 PM, Sagar wrote:
>> > Thanks Matthias,
>> >
>> > Well, as things stand, we did have internal discussions on this and it
>> > seemed ok to open it up for IQ and more importantly not ok to have it
>> > opened up for FK-Join. And more importantly, the PR for this is already
>> > merged and some of these things came up during that. Here's the PR link:
>> > https://github.com/apache/kafka/pull/12803.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> >
>> > On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax 
>> wrote:
>> >
>> >> Ah. Missed it as it does not have a nice "code block" similar to
>> >> `StreamPartitioner` changes.
>> >>
>> >> I understand the motivation, but I am wondering if we might head into a
>> >> tricky direction? State stores (at least the built-in ones) and IQ are
>> >> kinda built with the idea to have sharded data and that a multi-cast of
>> >> keys is an anti-pattern?
>> >>
>> >> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
>> >> sure that generalizing the concepts does not cause issues in the
>> future?
>> >>
>> >> Ie, should we claim that the multi-cast feature should be used for
>> >> KStreams only, but not for KTables?
>> >>
>> >> Just want to double check that we are not doing something we regret
>> later.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 12/7/22 6:45 PM, Sagar wrote:
>> >>> Hi Matthias,
>> >>>
>> >>> I did save it. The changes are added under Public Interfaces (Pt#2
>> about
>> >>> enhancing KeyQueryMetadata with partitions method) and
>> >>> throwing IllegalArgumentException when StreamPartitioner#partitions
>> >> method
>> >>> returns multiple partitions for just FK-join instead of the earlier
>> >> decided
>> >>> FK-Join and IQ.
>> >>>
>> >>> The background is that for IQ, if the users have multi casted records
>> to
>> >>> multiple partitions during ingestion but the fetch returns only a
>> single
>> >>> partition, then it would be wrong. That's why the restriction was
>> lifted
>> >>> for IQ and that's the reason KeyQueryMetadata now has another
>> >> partitions()
>> >>> method to signify the same.
>> >>>
>> >>> FK-Join also has a similar case, but while reviewing it was felt that
>> >>> FK-Join on its own is fairly comp

Re: [ANNOUNCE] New committer: Walker Carlson

2023-01-17 Thread Sagar
Congratulations Walker!

Thanks!
Sagar.

On Wed, Jan 18, 2023 at 9:32 AM Tom Bentley  wrote:

> Congratulations!
>
> On Wed, 18 Jan 2023 at 01:26, John Roesler  wrote:
>
> > Congratulations, Walker!
> > -John
> >
> > On Tue, Jan 17, 2023, at 18:50, Guozhang Wang wrote:
> > > Congrats, Walker!
> > >
> > > On Tue, Jan 17, 2023 at 2:20 PM Chris Egerton
> > > wrote:
> > >
> > >> Congrats, Walker!
> > >>
> > >> On Tue, Jan 17, 2023, 17:07 Bill Bejeck 
> > wrote:
> > >>
> > >> > Congratulations, Walker!
> > >> >
> > >> > -Bill
> > >> >
> > >> > On Tue, Jan 17, 2023 at 4:57 PM Matthias J. Sax 
> > >> wrote:
> > >> >
> > >> > > Dear community,
> > >> > >
> > >> > > I am pleased to announce Walker Carlson as a new Kafka committer.
> > >> > >
> > >> > > Walker has been contributing to Apache Kafka since November 2019.
> He
> > >> > > made various contributions including the following KIPs.
> > >> > >
> > >> > > KIP-671: Introduce Kafka Streams Specific Uncaught Exception
> Handler
> > >> > > KIP-696: Update Streams FSM to clarify ERROR state meaning
> > >> > > KIP-715: Expose Committed offset in streams
> > >> > >
> > >> > >
> > >> > > Congratulations Walker and welcome on board!
> > >> > >
> > >> > >
> > >> > > Thanks,
> > >> > >-Matthias (on behalf of the Apache Kafka PMC)
> > >> > >
> > >> >
> > >>
> >
> >
>


Permission to create KIP

2019-12-27 Thread Sagar
Hi,

I have done some work on adding prefix scan for state store and would like
to create a KIP for the same.

Thanks!
Sagar.


Re: Permission to create KIP

2019-12-30 Thread Sagar
Hi,

It's sagarmeansocean

On Sat, Dec 28, 2019 at 12:29 AM Matthias J. Sax wrote:

> What is your wiki account id?
>
>
> -Matthias
>
> On 12/27/19 7:12 AM, Sagar wrote:
> > Hi,
> >
> > I have done some work on adding prefix scan for state store and would like
> > to create a KIP for the same.
> >
> > Thanks!
> > Sagar.
> >
>
>


Re: [VOTE] KIP-811: Add config min.repartition.purge.interval.ms to Kafka Streams

2022-01-16 Thread Sagar
Hey Nick,

Thanks for the KIP. I may be late to the game here, but it might be
beneficial to mention a couple of example scenarios showing how the values of
commit.interval.ms and this new config could affect the overall behavior,
for instance what happens if the two values are very similar versus if one of
them is much higher than the other. Let me know what you think.

Other than that, I'm +1 (non-binding).

Thanks!
Sagar.

On Sun, Jan 16, 2022 at 6:17 PM Luke Chen  wrote:

> Hi Nick,
>
> Thanks for the KIP!
> +1 (non-binding)
>
> Luke
>
> On Sat, Jan 15, 2022 at 4:55 AM Matthias J. Sax  wrote:
>
> > +1 (binding)
> >
> > On 1/14/22 06:32, John Roesler wrote:
> > > Thanks for the KIP, Nick!
> > >
> > > +1 (binding)
> > >
> > > -John
> > >
> > > On Fri, Jan 14, 2022, at 07:40, Bruno Cadonna wrote:
> > >> Hi Nick,
> > >>
> > >> Since the title of the KIP changed slightly after the vote was opened,
> > >> the link to the KIP also changed as a result. This should be a
> > >> working link:
> > >>
> > >> https://cwiki.apache.org/confluence/x/JY-kCw
> > >>
> > >> Anyways, Thanks for the KIP!
> > >>
> > >> I am +1 (binding)
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >>
> > >> On 12.01.22 16:34, Nick Telford wrote:
> > >>> Hi everyone,
> > >>>
> > >>> I'd like to call a vote to adopt KIP-811: Add config
> > >>> min.repartition.purge.interval.ms to Kafka Streams
> > >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+min.repartition.purge.interval.ms+to+Kafka+Streams>.
> > >>>
> > >>> Regards
> > >>>
> > >>> Nick Telford
> > >>>
> >
>


Re: [VOTE] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-01-22 Thread Sagar
Hi All,

There is a small update to the KIP: the newly introduced metric
*total-bytes* has been renamed to *input-buffer-bytes-total*.

Thanks!
Sagar.

On Wed, Sep 29, 2021 at 9:57 AM Sagar  wrote:

> We have 3 binding votes: Sophie/Guozhang/Matthias
> and 2 non-binding votes: Josep/Luke and no -1 votes.
>
> Marking this KIP as accepted.
>
> Thanks everyone!
>
> Thanks!
> Sagar.
>
>
>
> On Wed, Sep 29, 2021 at 7:18 AM Matthias J. Sax  wrote:
>
>> +1 (binding)
>>
>> On 9/28/21 10:40 AM, Sagar wrote:
>> > Hi All,
>> >
>> > Bumping this vote thread again!
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Wed, Sep 8, 2021 at 1:19 PM Luke Chen  wrote:
>> >
>> >> Thanks for the KIP.
>> >>
>> >> + 1 (non-binding)
>> >>
>> >> Thanks.
>> >> Luke
>> >>
>> >> On Wed, Sep 8, 2021 at 2:48 PM Josep Prat wrote:
>> >>
>> >>> +1 (non binding).
>> >>>
>> >>> Thanks for the KIP Sagar!
>> >>> ———
>> >>> Josep Prat
>> >>>
>> >>> Aiven Deutschland GmbH
>> >>>
>> >>> Immanuelkirchstraße 26, 10405 Berlin
>> >>>
>> >>> Amtsgericht Charlottenburg, HRB 209739 B
>> >>>
>> >>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> >>>
>> >>> m: +491715557497
>> >>>
>> >>> w: aiven.io
>> >>>
>> >>> e: josep.p...@aiven.io
>> >>>
>> >>> On Wed, Sep 8, 2021, 03:29 Sophie Blee-Goldman wrote:
>> >>>
>> >>>> +1 (binding)
>> >>>>
>> >>>> Thanks for the KIP!
>> >>>>
>> >>>> -Sophie
>> >>>>
>> >>>> On Tue, Sep 7, 2021 at 1:59 PM Guozhang Wang wrote:
>> >>>>
>> >>>>> Thanks Sagar, +1 from me.
>> >>>>>
>> >>>>>
>> >>>>> Guozhang
>> >>>>>
>> >>>>> On Sat, Sep 4, 2021 at 10:29 AM Sagar wrote:
>> >>>>>
>> >>>>>> Hi All,
>> >>>>>>
>> >>>>>> I would like to start a vote on the following KIP:
>> >>>>>>
>> >>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
>> >>>>>>
>> >>>>>> Thanks!
>> >>>>>> Sagar.
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>> --
>> >>>>> -- Guozhang
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>

