Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Randall Hauch
Congratulations, Bruno!

On Tue, Nov 1, 2022 at 11:20 PM Sagar  wrote:

> Congrats Bruno!
>
> Sagar.
>
> On Wed, Nov 2, 2022 at 7:51 AM deng ziming 
> wrote:
>
> > Congrats!
> >
> > --
> > Ziming
> >
> > > On Nov 2, 2022, at 3:36 AM, Guozhang Wang  wrote:
> > >
> > > Hi everyone,
> > >
> > > I'd like to introduce our new Kafka PMC member, Bruno.
> > >
> > > Bruno has been a committer since April 2021 and has been very active in
> > > the community. He's a key contributor to Kafka Streams, and has also
> > > helped review a number of cross-cutting improvements such as the
> > > migration to Mockito. It is my pleasure to announce that Bruno has agreed
> > > to join the Kafka PMC.
> > >
> > > Congratulations, Bruno!
> > >
> > > -- Guozhang Wang, on behalf of Apache Kafka PMC
> >
> >
>


Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Sagar
Congrats Bruno!

Sagar.

On Wed, Nov 2, 2022 at 7:51 AM deng ziming  wrote:

> Congrats!
>
> --
> Ziming
>
> > On Nov 2, 2022, at 3:36 AM, Guozhang Wang  wrote:
> >
> > Hi everyone,
> >
> > I'd like to introduce our new Kafka PMC member, Bruno.
> >
> > Bruno has been a committer since April 2021 and has been very active in
> > the community. He's a key contributor to Kafka Streams, and has also helped
> > review a number of cross-cutting improvements such as the migration to
> > Mockito. It is my pleasure to announce that Bruno has agreed to join the
> > Kafka PMC.
> >
> > Congratulations, Bruno!
> >
> > -- Guozhang Wang, on behalf of Apache Kafka PMC
>
>


Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread deng ziming
Congrats!

--
Ziming

> On Nov 2, 2022, at 3:36 AM, Guozhang Wang  wrote:
> 
> Hi everyone,
> 
> I'd like to introduce our new Kafka PMC member, Bruno.
> 
> Bruno has been a committer since April 2021 and has been very active in
> the community. He's a key contributor to Kafka Streams, and has also helped
> review a number of cross-cutting improvements such as the migration to
> Mockito. It is my pleasure to announce that Bruno has agreed to join the
> Kafka PMC.
> 
> Congratulations, Bruno!
> 
> -- Guozhang Wang, on behalf of Apache Kafka PMC



Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1333

2022-11-01 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-14347) deleted records may be kept unexpectedly when leader changes while adding a new replica

2022-11-01 Thread Vincent Jiang (Jira)
Vincent Jiang created KAFKA-14347:
-

 Summary: deleted records may be kept unexpectedly when leader 
changes while adding a new replica
 Key: KAFKA-14347
 URL: https://issues.apache.org/jira/browse/KAFKA-14347
 Project: Kafka
  Issue Type: Improvement
Reporter: Vincent Jiang


Consider that in a compacted topic, a regular record k1=v1 is deleted by a
later tombstone record k1=null. And imagine that log compaction is making
different progress on the three replicas r1, r2 and r3:
- on replica r1, log compaction has not yet cleaned k1=v1 or k1=null.
- on replica r2, log compaction has cleaned and removed both k1=v1 and
k1=null.

In this case, the following sequence can cause record k1=v1 to be kept
unexpectedly:
1. Replica r3 is re-assigned to a different node and starts to replicate
data from the leader.
2. At the beginning, r1 is the leader, so r3 replicates record k1=v1 from
r1.
3. Before k1=null is replicated from r1, leadership changes to r2.
4. r3 replicates data from r2. Because the k1=null record has already been
cleaned on r2, it is not replicated.

As a result, r3 has record k1=v1 but not k1=null.
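
For context, the tombstone here is simply a record with a null value for the
same key. A minimal producer sketch of the setup described above (topic name
and bootstrap address are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Sketch only: write k1=v1 followed by the k1=null tombstone that log
    // compaction is expected to clean, matching the scenario above.
    public class TombstoneSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("compacted-topic", "k1", "v1"));
                producer.send(new ProducerRecord<>("compacted-topic", "k1", null)); // tombstone
            }
        }
    }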



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Lucas Brutschy
Wow, congratulations!

On Tue, Nov 1, 2022 at 8:55 PM Chris Egerton  wrote:
>
> Congrats!
>
> On Tue, Nov 1, 2022, 15:44 Bill Bejeck  wrote:
>
> > Congrats Bruno! Well deserved.
> >
> > -Bill
> >
> > On Tue, Nov 1, 2022 at 3:36 PM Guozhang Wang  wrote:
> >
> > > Hi everyone,
> > >
> > > I'd like to introduce our new Kafka PMC member, Bruno.
> > >
> > > Bruno has been a committer since April 2021 and has been very active in
> > > the community. He's a key contributor to Kafka Streams, and has also
> > > helped review a number of cross-cutting improvements such as the
> > > migration to Mockito. It is my pleasure to announce that Bruno has agreed
> > > to join the Kafka PMC.
> > >
> > > Congratulations, Bruno!
> > >
> > > -- Guozhang Wang, on behalf of Apache Kafka PMC
> > >
> >


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1332

2022-11-01 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-11-01 Thread Akhilesh Chaganti
Hi David,


Thanks for the KIP. I have some questions/suggestions.


1) I see two new metrics:
kafka.controller:type=KafkaController,name=MetadataType and
kafka.controller:type=KafkaController,name=MigrationState. Won't the second
metric already cover the cases of the first metric? Also, instead of
MigrationFinalized, we could directly say the state is KRaftMode. So we can
use the same value for default KRaft clusters.


2) ZkMigrationReady in ApiVersionsResponse from the KRaft controller. By
default, we plan to start the controller quorum with the
kafka.metadata.migration.enable config set to true. Do we then need this
additional information again to make sure the controllers are ready for
migration? What would happen if the controller assumes it is ready for
migration from 3.4 by default when it doesn't see both MigrationMetadata
records?


3) I see that we do not impose order on rolling the brokers with migration
flags and provisioning the controller quorum. Along with the KRaft
controller emitting migration state metrics, it may be better to emit the
broker count for the brokers not ready for migration yet. This will give us
more insight into any roll issues.


4) Once the KRaft controller is in migration mode, we should also
prevent/handle ZkBrokerRegistrations that don't enable migration mode.
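
For (1) and (3), a minimal sketch of how an operator could poll the proposed
state metric over JMX. This assumes the gauge is exposed like other
KafkaController metrics with a single "Value" attribute, and the host/port are
placeholders; none of this is spelled out in the KIP:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    // Hedged sketch: read the proposed MigrationState gauge from a controller.
    public class MigrationStateProbe {
        public static void main(String[] args) throws Exception {
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://controller-host:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                ObjectName migrationState = new ObjectName(
                        "kafka.controller:type=KafkaController,name=MigrationState");
                // Assumes a Yammer-style gauge exposing a "Value" attribute.
                System.out.println("MigrationState = "
                        + conn.getAttribute(migrationState, "Value"));
            }
        }
    }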


Thanks
Akhilesh


On Tue, Nov 1, 2022 at 10:49 AM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the reply.
>
> 20/21. Regarding the new ZkMigrationReady field in ApiVersionsResponse, it
> seems that this is a bit intrusive since it exposes unneeded info to the
> clients. Another option is to add that field as part of the Fetch request.
> We can choose to only set that field in the very first Fetch request from a
> Quorum follower.
>
> 40. For kafka.controller:type=KafkaController,name=MigrationState, what is
> the value for a brand new KRaft cluster?
>
> Jun
>
> On Mon, Oct 31, 2022 at 2:35 PM David Arthur
>  wrote:
>
> > 30. I think we can keep the single ControllerId field in those requests
> > since they are only used for fencing (as far as I know). Internally, the
> > broker components that handle those requests will compare the
> ControllerId
> > with that of MetadataCache (which is updated via UMR).
> >
> > The reason we need the separate KRaftControllerId in the UpdateMetadata
> > code path is so that we can have different connection behavior for a KRaft
> > controller vs. a ZK controller.
> >
> > 31. It seems reasonable to keep the MigrationRecord in the snapshot. I
> was
> > thinking the same thing in terms of understanding the loss for a
> > migration-after-finalization. However, once a snapshot has been taken
> that
> > includes the final MigrationRecord, we can't easily see which records
> came
> > after it.
> >
> > 32. You're correct, we can just use the modify time from the Stat. The
> > other two fields are primarily informational and are there for operators
> > who want to inspect the state of the migration. They aren't required for
> > correctness
> >
> > 33. Yes that's right. I detail that in "Controller Leadership" section
> >
> > 34. Right, I'll fix that.
> >
> > Thanks,
> > David
> >
> > On Mon, Oct 31, 2022 at 2:55 PM Jun Rao 
> wrote:
> >
> > > Hi, David,
> > >
> > > Thanks for the updated KIP. A few more comments.
> > >
> > > 30. LeaderAndIsrRequest/StopReplicaRequest both have a controllerId
> > field.
> > > Should we add a KRaftControllerId field like UpdateMetadata?
> > >
> > > 31. "If a migration has been finalized, but the KRaft quroum comes up
> > with
> > > kafka.metadata.migration.enable, we must not re-enter the migration
> mode.
> > > In this case, while replaying the log, the controller can see the
> second
> > > MigrationRecord and know that the migration is finalized and should not
> > be
> > > resumed. " Hmm, do we want to keep the MigrationRecord in the snapshot
> > and
> > > the metadata log forever after migration is finalized? If not, we can't
> > > know for sure whether a migration has happened or not. Also, it might
> be
> > > useful to support switching back to ZK mode after the migration is
> > > finalized, with the understanding of potential metadata loss. In that
> > case,
> > > we could just trim all metadata log and recopy the ZK metadata back.
> > >
> > > 32. The /migration node in ZK: Do we need last_update_time_ms since ZK
> > > Stats already has an MTime? Also, how do we plan to use the
> > > kraft_controller_id and kraft_controller_epoch fields?
> > >
> > > 33. Controller migration: We will force a write to the "/controller"
> and
> > > "/controller_epoch" ZNodes before copying ZK data, right?
> > >
> > > 34. "Operator can remove the persistent "/controller" and
> > > "/controller_epoch" nodes allowing for ZK controller election to take
> > > place". I guess the operator only needs to remove the /controller path?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Oct 31, 2022 at 7:17 AM David Arthur
> > >  wrote:
> > >
> > > > Happy Monday, everyone! I've updated 

Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Chris Egerton
Congrats!

On Tue, Nov 1, 2022, 15:44 Bill Bejeck  wrote:

> Congrats Bruno! Well deserved.
>
> -Bill
>
> On Tue, Nov 1, 2022 at 3:36 PM Guozhang Wang  wrote:
>
> > Hi everyone,
> >
> > I'd like to introduce our new Kafka PMC member, Bruno.
> >
> > Bruno has been a committer since April 2021 and has been very active in
> > the community. He's a key contributor to Kafka Streams, and has also helped
> > review a number of cross-cutting improvements such as the migration to
> > Mockito. It is my pleasure to announce that Bruno has agreed to join the
> > Kafka PMC.
> >
> > Congratulations, Bruno!
> >
> > -- Guozhang Wang, on behalf of Apache Kafka PMC
> >
>


Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Bill Bejeck
Congrats Bruno! Well deserved.

-Bill

On Tue, Nov 1, 2022 at 3:36 PM Guozhang Wang  wrote:

> Hi everyone,
>
> I'd like to introduce our new Kafka PMC member, Bruno.
>
> Bruno has been a committer since April 2021 and has been very active in
> the community. He's a key contributor to Kafka Streams, and has also helped
> review a number of cross-cutting improvements such as the migration to
> Mockito. It is my pleasure to announce that Bruno has agreed to join the
> Kafka PMC.
>
> Congratulations, Bruno!
>
> -- Guozhang Wang, on behalf of Apache Kafka PMC
>


[ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Guozhang Wang
Hi everyone,

I'd like to introduce our new Kafka PMC member, Bruno.

Bruno has been a committer since April 2021 and has been very active in
the community. He's a key contributor to Kafka Streams, and has also helped
review a number of cross-cutting improvements such as the migration to
Mockito. It is my pleasure to announce that Bruno has agreed to join the
Kafka PMC.

Congratulations, Bruno!

-- Guozhang Wang, on behalf of Apache Kafka PMC


Re: [DISCUSS] KIP-858: Handle JBOD broker disk failure in KRaft

2022-11-01 Thread Tom Bentley
Hi Igor,

Thanks for the KIP, I've finally managed to take an initial look.

0. You mention the command line tools (which one?) in the public interfaces
section, but don't spell out how it changes -- what options are added.
Reading the proposed changes, it seems that there are no changes to the
supported options and that this is done automatically during initial
formatting based on the broker config. But what about the case where we're
upgrading an existing non-JBOD KRaft cluster where the meta.properties
already exists? Do we just run `./bin/kafka-storage.sh format -c
/tmp/server.properties` again? How would an operator remove an existing log
dir?

1. In the example for the storage format command I think it's worth
explaining it in a little more detail, i.e. that `directory.ids` has
three items: two for the configured log.dirs and one for the configured
metadata.log.dir (see the sketch after these points for an illustration).

2. In 'Broker lifecycle management' you presumably want to check that the
directory ids for each log dir are actually unique.

3. I don't understand the motivation for having the controller decide the
log dir for new replicas. I think there are two cases we want to support:
a) Where the user specifies the log dir (likely based on some external
information). This is out of scope for this KIP.
b) If the user didn't specify, isn't the broker in a better position to
decide (for example, based on free storage), and the communicate back to
the controller the log dir that was chosen using the
ASSIGN_REPLICAS_TO_DIRECTORIES RPC?

4. Broker registration. I don't understand the intent behind the
optimization for the single log dir case (last bullet). "Brokers whose
registration indicate that multiple log directories are configured remain
FENCED until all log directory assignments for that broker are learnt by
the active controller and persisted into metadata." is something you've
committed to anyway.

5. AFAICS there's no way for the user to determine via the Kafka protocol
which directory id corresponds to which log dir path. I.e. you've not
changed any of the Admin APIs. Perhaps adding a Future Work section to
spell out the pieces we know are missing would be a good idea?
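
To make (1) concrete, here is a hedged sketch of what a formatted
meta.properties could look like after this change. The directory.ids name
follows this discussion, while the overall layout and the UUID values are
placeholders rather than the final format:

    # /tmp/kraft-logs-1/meta.properties (illustrative sketch only)
    version=1
    cluster.id=WZEKwK-bS62rQiZKO_nKbQ
    node.id=1
    # One id per configured log dir (two entries in log.dirs here) plus one for
    # the configured metadata.log.dir, i.e. three items in total.
    directory.ids=Yfrh3zYLRjSLCmz8GNHQqw,0DOLeSi2SaeXqXpi23EdPQ,ILZ4YmdzRga2LHwB46KRcw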

I would second Jason's idea for piggybacking on- and off-line state changes
on the BrokerHeartbeat RPC. I suspect the complexity of making this
incremental isn't so great, given that both broker and controller need to
keep track of the on- and off-line directories anyway. i.e. We could add
LogDirsOfflined and LogDirsOnlined fields to both request and response and
have the broker keep including a log dir in requests until acknowledged in
the response, but otherwise they'd be empty.

Thanks again,

Tom

On Tue, 25 Oct 2022 at 19:59, Igor Soarez  wrote:

> Hello,
>
> There’s now a proposal to address ZK to KRaft migration — KIP-866 — but
> currently it doesn't address JBOD so I've decided to update this proposal
> to address that migration scenario.
>
> So given that:
>
> - When migrating from a ZK cluster running JBOD to KRaft, brokers
> registering in KRaft mode will need to be able to register all configured
> log directories.
> - As part of the migration, the mapping of partition to log directory will
> have to be learnt by the active controller and persisted into the cluster
> metadata.
> - It isn’t safe to allow for leadership from replicas without this
> mapping, as if the hosting log directory fails there will be no failover
> mechanism.
>
> I have updated the proposal to reflect that:
>
> - Multiple log directories may be indicated in the first broker
> registration referencing log directory UUIDs. We no longer require a single
> log directory to start with.
> - The controller must never assign leadership to a replica in a broker
> registered with multiple log directories, unless the partition to log
> directory assignment is already in the cluster metadata.
> - The broker should not be unfenced until all of its partition to log
> directory mapping is persisted into cluster metadata
>
> I've also added details as to how the ZK to KRaft migration can work in a
> cluster that is already operating with JBOD.
>
> Please have a look and share your thoughts.
>
> --
> Igor
>
>
>


Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-11-01 Thread Jun Rao
Hi, David,

Thanks for the reply.

20/21. Regarding the new ZkMigrationReady field in ApiVersionsResponse, it
seems that this is a bit intrusive since it exposes unneeded info to the
clients. Another option is to add that field as part of the Fetch request.
We can choose to only set that field in the very first Fetch request from a
Quorum follower.

40. For kafka.controller:type=KafkaController,name=MigrationState, what is
the value for a brand new KRaft cluster?

Jun

On Mon, Oct 31, 2022 at 2:35 PM David Arthur
 wrote:

> 30. I think we can keep the single ControllerId field in those requests
> since they are only used for fencing (as far as I know). Internally, the
> broker components that handle those requests will compare the ControllerId
> with that of MetadataCache (which is updated via UMR).
>
> The reason we need the separate KRaftControllerId in the UpdateMetadata
> code path is so that we can have different connection behavior for a KRaft
> controller vs. a ZK controller.
>
> 31. It seems reasonable to keep the MigrationRecord in the snapshot. I was
> thinking the same thing in terms of understanding the loss for a
> migration-after-finalization. However, once a snapshot has been taken that
> includes the final MigrationRecord, we can't easily see which records came
> after it.
>
> 32. You're correct, we can just use the modify time from the Stat. The
> other two fields are primarily informational and are there for operators
> who want to inspect the state of the migration. They aren't required for
> correctness
>
> 33. Yes that's right. I detail that in "Controller Leadership" section
>
> 34. Right, I'll fix that.
>
> Thanks,
> David
>
> On Mon, Oct 31, 2022 at 2:55 PM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Thanks for the updated KIP. A few more comments.
> >
> > 30. LeaderAndIsrRequest/StopReplicaRequest both have a controllerId
> field.
> > Should we add a KRaftControllerId field like UpdateMetadata?
> >
> > 31. "If a migration has been finalized, but the KRaft quroum comes up
> with
> > kafka.metadata.migration.enable, we must not re-enter the migration mode.
> > In this case, while replaying the log, the controller can see the second
> > MigrationRecord and know that the migration is finalized and should not
> be
> > resumed. " Hmm, do we want to keep the MigrationRecord in the snapshot
> and
> > the metadata log forever after migration is finalized? If not, we can't
> > know for sure whether a migration has happened or not. Also, it might be
> > useful to support switching back to ZK mode after the migration is
> > finalized, with the understanding of potential metadata loss. In that
> case,
> > we could just trim all metadata log and recopy the ZK metadata back.
> >
> > 32. The /migration node in ZK: Do we need last_update_time_ms since ZK
> > Stats already has an MTime? Also, how do we plan to use the
> > kraft_controller_id and kraft_controller_epoch fields?
> >
> > 33. Controller migration: We will force a write to the "/controller" and
> > "/controller_epoch" ZNodes before copying ZK data, right?
> >
> > 34. "Operator can remove the persistent "/controller" and
> > "/controller_epoch" nodes allowing for ZK controller election to take
> > place". I guess the operator only needs to remove the /controller path?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Oct 31, 2022 at 7:17 AM David Arthur
> >  wrote:
> >
> > > Happy Monday, everyone! I've updated the KIP with the following
> changes:
> > >
> > > * Clarified MetadataType metric usages (broker vs controller)
> > > * Added ZkMigrationReady tagged field to ApiVersionsResponse (for use
> by
> > > KRaft controller quorum)
> > > * Added MigrationRecord with two states: Started and Finished
> > > * Documented ZK configs for KRaft controller
> > > * Simplified state machine description (internally, more states will
> > exist,
> > > but only the four documented are interesting to operators)
> > > * Clarified some things in Controller Migration section
> > > * Removed KRaft -> ZK parts of Broker Registration
> > > * Added Misconfigurations section to Failure Modes
> > >
> > > Let me know if I've missed anything from the past two weeks of
> > discussion.
> > >
> > > Thanks again to everyone who has reviewed this KIP so far!
> > >
> > > -David
> > >
> > > On Fri, Oct 28, 2022 at 2:26 PM Jun Rao 
> > wrote:
> > >
> > > > Hi, David,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 20/21. Sounds good.
> > > >
> > > > Could you update the doc with all the changes being discussed?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Oct 28, 2022 at 10:11 AM David Arthur
> > > >  wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > 20/21. I was also wondering about a "migration" record. In addition
> > to
> > > > the
> > > > > scenario you mentioned, we also need a way to prevent the cluster
> > from
> > > > > re-entering the dual write mode after the migration has been
> > > finalized. I
> > > > > could see this happening inadvertently via 

[jira] [Created] (KAFKA-14346) Remove static methods from internal Connect APIs for easier testing

2022-11-01 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14346:
-

 Summary: Remove static methods from internal Connect APIs for 
easier testing
 Key: KAFKA-14346
 URL: https://issues.apache.org/jira/browse/KAFKA-14346
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton


Our use of static methods for internal APIs such as the RestClient class
(https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java)
and Plugins::compareAndSwapLoaders
(https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java#L123-L129)
makes testing difficult, especially with the in-progress migration from
EasyMock/PowerMock to Mockito.

We should remove these static methods and replace them with non-static methods 
that can be more easily mocked.
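
As a rough illustration of the direction (the names below are hypothetical and
not the actual refactoring), an injected, non-static dependency is
straightforward to stub with Mockito:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    // Hypothetical types for illustration only -- not the real Connect classes.
    interface RestClientLike {
        int httpGet(String url);
    }

    class RequestForwarder {
        private final RestClientLike restClient;

        RequestForwarder(RestClientLike restClient) {
            this.restClient = restClient; // injected instead of invoked statically
        }

        int forward(String url) {
            return restClient.httpGet(url);
        }
    }

    class RequestForwarderTest {
        void forwardsToInjectedClient() {
            RestClientLike client = mock(RestClientLike.class);
            when(client.httpGet("http://worker:8083/connectors")).thenReturn(200);
            int status = new RequestForwarder(client).forward("http://worker:8083/connectors");
            assert status == 200; // no static state to swap or reset between tests
        }
    }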



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-793: Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

2022-11-01 Thread Chris Egerton
Hi all,

I have more comments I'd like to make on this KIP when I have time (sorry
for the delay, Yash, and thanks for your patience!), but I did want to
chime in and say that I'm also not sure about overloading SinkTask::put. I
share the concerns about creating an intuitive, simple API that Yash has
raised. In addition, this approach doesn't seem very sustainable--what do
we do if we encounter another case in the future that would warrant a
similar solution? We probably don't want to create three, four, etc.
overloaded variants of the method, each of which would have to be
implemented by connector developers who want to both leverage the latest
and greatest connector APIs and maintain compatibility with Connect
clusters running older versions.
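
For reference, here is a minimal sketch of the overload being debated, written
as a subclass purely for illustration. The signature follows the quoted
discussion below, and the default delegation to the existing put is an
assumption about how backward compatibility could be preserved, not the final
KIP design:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    // Hedged sketch only, not the actual proposed change to SinkTask itself.
    public abstract class SketchSinkTask extends SinkTask {

        // The map carries the pre-transformation (original) topic partition for
        // each sink record, as discussed in this thread.
        public void put(Collection<SinkRecord> records,
                        Map<SinkRecord, TopicPartition> originalTopicPartitions) {
            put(records); // default: behave exactly like the existing method
        }
    }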

I haven't been able to flesh this out into a design worth publishing in its
own KIP yet, but one alternative I've pitched to a few people with
generally positive interest has been to develop an official compatibility
library for Connect developers. This library would be released as its own
Maven artifact (separate from connect-api, connect-runtime, etc.) and would
provide a simple, clean interface for developers to determine which
features are supported by the version of the Connect runtime that their
plugin has been deployed onto. Under the hood, this library might use
reflection to determine whether classes, methods, etc. are available, but
the developer wouldn't have to do anything more than check (for example)
`Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point
in the lifetime of their connector/task whether that feature is provided by
the runtime.
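
To sketch how such a library could work under the hood (the Features name and
constant above are hypothetical, and only the reflective lookup is shown):

    // Hedged sketch of a reflection-based capability check. The probed method,
    // SinkTaskContext::errantRecordReporter, only exists on newer runtimes.
    public enum Features {
        SINK_TASK_ERRANT_RECORD_REPORTER(
                "org.apache.kafka.connect.sink.SinkTaskContext", "errantRecordReporter");

        private final String className;
        private final String methodName;

        Features(String className, String methodName) {
            this.className = className;
            this.methodName = methodName;
        }

        public boolean enabled() {
            try {
                Class.forName(className).getMethod(methodName);
                return true;
            } catch (ClassNotFoundException | NoSuchMethodException e) {
                return false;
            }
        }
    }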

One other high-level comment: this doesn't address every case, but we might
consider adding an API to "ack" sink records. This could use the
SubmittedRecords class [1] (with some slight tweaks) under the hood to
track the latest-acked offset for each topic partition. This way, connector
developers won't be responsible for tracking offsets at all in their sink
tasks (eliminating issues with the accuracy of post-transformation T/P/O
sink record information), and they'll only have to notify the Connect
framework when a record has been successfully dispatched to the external
system. This provides a cleaner, friendlier API, and also enables more
fine-grained metrics like the ones proposed in KIP-767 [2].

[1] -
https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
[2] -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics

Cheers,

Chris

On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya  wrote:

> Hi Randall,
>
> It's been a while for this one but the more I think about it, the more I
> feel like the current approach with a new overloaded `SinkTask::put` method
> might not be optimal. We're trying to fix a pretty corner case bug here
> (usage of topic mutating SMTs with sink connectors that do their own offset
> tracking) and I'm not sure that warrants a change to such a central
> interface method. The new `SinkTask::put` method just seems somewhat odd
> and it may not be very understandable for a new reader - I don't think this
> should be the case for a public interface method. Furthermore, even with
> elaborate documentation in place, I'm not sure if it'll be very obvious to
> most people what the purpose of having these two `put` methods is and how
> they should be used by sink task implementations. What do you think?
>
> Thanks,
> Yash
>
> On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya  wrote:
>
> > Hi Randall,
> >
> > Thanks a lot for your valuable feedback so far! I've updated the KIP
> based
> > on our discussion above. Could you please take another look?
> >
> > Thanks,
> > Yash
> >
> > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch  wrote:
> >
> >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya 
> wrote:
> >>
> >> > Hi Randall,
> >> >
> >> > Thanks for elaborating. I think these are all very good points and I
> see
> >> > why the overloaded `SinkTask::put` method is a cleaner solution
> overall.
> >> >
> >> > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
> >> >
> >> > I think this should be
> >> >
> >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
> >> >
> >> > instead because the sink records themselves have the updated topic
> >> > partitions (i.e. after all transformations have been applied) and the
> >> KIP
> >> > is proposing a way for the tasks to be able to access the original
> topic
> >> > partition (i.e. before transformations have been applied).
> >> >
> >>
> >> Sounds good.
> >>
> >>
> >> >
> >> > > Of course, if the developer does not need separate methods, they can
> >> > easily have the older `put` method simply delegate to the newer
> method.
> >> >
> >> > If the developer does not need separate methods (i.e. they don't need
> to
> >> > use this new addition), they 

Re: [DISCUSS] KIP-793: Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

2022-11-01 Thread Yash Mayya
Hi Randall,

It's been a while for this one but the more I think about it, the more I
feel like the current approach with a new overloaded `SinkTask::put` method
might not be optimal. We're trying to fix a pretty corner case bug here
(usage of topic mutating SMTs with sink connectors that do their own offset
tracking) and I'm not sure that warrants a change to such a central
interface method. The new `SinkTask::put` method just seems somewhat odd
and it may not be very understandable for a new reader - I don't think this
should be the case for a public interface method. Furthermore, even with
elaborate documentation in place, I'm not sure if it'll be very obvious to
most people what the purpose of having these two `put` methods is and how
they should be used by sink task implementations. What do you think?

Thanks,
Yash

On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya  wrote:

> Hi Randall,
>
> Thanks a lot for your valuable feedback so far! I've updated the KIP based
> on our discussion above. Could you please take another look?
>
> Thanks,
> Yash
>
> On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch  wrote:
>
>> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya  wrote:
>>
>> > Hi Randall,
>> >
>> > Thanks for elaborating. I think these are all very good points and I see
>> > why the overloaded `SinkTask::put` method is a cleaner solution overall.
>> >
>> > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
>> >
>> > I think this should be
>> >
>> > `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
>> >
>> > instead because the sink records themselves have the updated topic
>> > partitions (i.e. after all transformations have been applied) and the
>> KIP
>> > is proposing a way for the tasks to be able to access the original topic
>> > partition (i.e. before transformations have been applied).
>> >
>>
>> Sounds good.
>>
>>
>> >
>> > > Of course, if the developer does not need separate methods, they can
>> > easily have the older `put` method simply delegate to the newer method.
>> >
>> > If the developer does not need separate methods (i.e. they don't need to
>> > use this new addition), they can simply continue implementing just the
>> > older `put` method right?
>> >
>>
>> Correct. We should update the JavaDoc of both methods to make this clear,
>> and in general how the two methods should are used and should be
>> implemented. That can be part of the PR, and the KIP doesn't need this
>> wording.
>>
>> >
>> > > Finally, this gives us a roadmap for *eventually* deprecating the
>> older
>> > method, once the Connect runtime versions without this change are old
>> > enough.
>> >
>> > I'm not sure we'd ever want to deprecate the older method. Most common
>> sink
>> > connector implementations do not do their own offset tracking with
>> > asynchronous processing and will probably never have a need for the
>> > additional parameter `Map
>> > originalTopicPartitions` in the proposed new `put` method. These
>> connectors
>> > can continue implementing only the existing `SinkTask::put` method which
>> > will be called by the default implementation of the newer overloaded
>> `put`
>> > method.
>> >
>>
>> +1
>>
>>
>> >
>> > > the pre-commit methods use the same `Map<TopicPartition, OffsetAndMetadata>
>> > > currentOffsets` data structure I'm suggesting be used.
>> >
>> > The data structure you're suggesting be used is a `Map<SinkRecord,
>> > TopicPartition>` which will map `SinkRecord` objects to the original topic
>> > partition of the corresponding `ConsumerRecord` right? To clarify, this is
>> > a new data structure that will need to be managed in the `WorkerSinkTask`.
>> >
>>
>> Ah, you're right. Thanks for the correction.
>>
>> Best regards,
>> Randall
>>
>>
>> > Thanks,
>> > Yash
>>
>>
>> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch  wrote:
>> >
>> > > Hi, Yash.
>> > >
>> > > I'm not sure I quite understand why it would be "easier" for connector
>> > > > developers to account for implementing two different overloaded
>> `put`
>> > > > methods (assuming that they want to use this new feature) versus
>> using
>> > a
>> > > > try-catch block around `SinkRecord` access methods?
>> > >
>> > >
>> > > Using a try-catch to try around an API method that *might* be there
>> is a
>> > > very unusual thing for most developers. Unfortunately, we've had to
>> > resort
>> > > to this atypical approach with Connect in places when there was no
>> good
>> > > alternative. We seem to be relying upon this pattern because it's easier
>> > > for us,
>> > > not because it offers a better experience for Connector developers.
>> IMO,
>> > if
>> > > there's a practical alternative that uses normal development practices
>> > and
>> > > techniques, then we should use that alternative. IIUC, there is at
>> least
>> > > one practical alternative for this KIP that would not require
>> developers
>> > to
>> > > use the unusual try-catch to handle the case where methods are not
>> found.
>> > >
>> > > I also think 

Re: [VOTE] KIP-869: Improve Streams State Restoration Visibility

2022-11-01 Thread Lucas Brutschy
We need this!

+ 1 non binding

Cheers,
Lucas

On Tue, Nov 1, 2022 at 10:01 AM Bruno Cadonna  wrote:
>
> Guozhang,
>
> Thanks for the KIP!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 25.10.22 22:07, Walker Carlson wrote:
> > +1 non binding
> >
> > Thanks for the kip!
> >
> > On Thu, Oct 20, 2022 at 10:25 PM John Roesler  wrote:
> >
> >> Thanks for the KIP, Guozhang!
> >>
> >> I'm +1 (binding)
> >>
> >> -John
> >>
> >> On Wed, Oct 12, 2022, at 16:36, Nick Telford wrote:
> >>> Can't wait!
> >>> +1 (non-binding)
> >>>
> >>> On Wed, 12 Oct 2022, 18:02 Guozhang Wang, 
> >>> wrote:
> >>>
>  Hello all,
> 
>  I'd like to start a vote for the following KIP, aiming to improve Kafka
>  Stream's restoration visibility via new metrics and callback methods:
> 
> 
> 
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility
> 
> 
>  Thanks!
>  -- Guozhang
> 
> >>
> >


[VOTE] KIP-852: Optimize calculation of size for log in remote tier

2022-11-01 Thread Divij Vaidya
Hey folks

The discuss thread for this KIP has been open for a few months with no
concerns being surfaced. I would like to start a vote for the
implementation of this KIP.

The KIP is available at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier


Regards
Divij Vaidya


Re: [VOTE] KIP-869: Improve Streams State Restoration Visibility

2022-11-01 Thread Bruno Cadonna

Guozhang,

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 25.10.22 22:07, Walker Carlson wrote:

+1 non binding

Thanks for the kip!

On Thu, Oct 20, 2022 at 10:25 PM John Roesler  wrote:


Thanks for the KIP, Guozhang!

I'm +1 (binding)

-John

On Wed, Oct 12, 2022, at 16:36, Nick Telford wrote:

Can't wait!
+1 (non-binding)

On Wed, 12 Oct 2022, 18:02 Guozhang Wang, 
wrote:


Hello all,

I'd like to start a vote for the following KIP, aiming to improve Kafka
Stream's restoration visibility via new metrics and callback methods:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



Thanks!
-- Guozhang







Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-11-01 Thread Luke Chen
Hi Sophie,

Thanks for the KIP. A very useful proposal!
Some questions:

1. The staticPartition method in the interface is commented out in the KIP.
Is that intentional? (A hedged sketch of one possible implementation is
included after these questions.)

2. For error handling, as you can imagine, there could be errors happening
during partition expansion. That means the operation could (1) take a long
time to complete, or (2) get stuck somewhere with a fatal error. I'd like to
know how we handle these 2 situations. For (1), I'm thinking we should
expose some metrics for monitoring, e.g. state, topics to be autoscaled,
etc. For (2), I'm not sure whether some partitions getting expanded and some
not will cause any weird issues. If not, maybe just expose a metric for the
autoscaling state, and have a state that says "failed", something like that.

3. Could this operation be aborted? I don't think so. Maybe there should
be a note about that in the KIP.
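
Regarding (1), and to make the "partition decided upstream" idea from the
earlier discussion concrete, here is a hedged sketch of what an implementation
could look like. The interface shape is assumed to mirror the existing
StreamPartitioner; it is not the KIP's actual StaticStreamPartitioner
definition:

    import org.apache.kafka.streams.processor.StreamPartitioner;

    // Hedged sketch: the target partition is decided upstream and embedded in
    // the key, so the partitioner never recomputes placement itself.
    public class UpstreamDecidedPartitioner
            implements StreamPartitioner<PartitionedKey, String> {

        @Override
        public Integer partition(String topic, PartitionedKey key, String value,
                                 int numPartitions) {
            // numPartitions is deliberately ignored so existing keys stay in
            // place when the topic is expanded.
            return key.assignedPartition();
        }
    }

    // Hypothetical key type carrying the upstream partitioning decision.
    record PartitionedKey(String userId, int assignedPartition) { }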

Thank you.
Luke


On Tue, Nov 1, 2022 at 2:15 AM Bruno Cadonna  wrote:

> Hi Sophie,
>
> Thank you for the KIP!
>
> 1.
> I do not understand how autoscaling should work with a Streams topology
> with a stateful sub-topology that reads from the input topics. The
> simplest example is a topology that consists of only one stateful
> sub-topology. As far as I understand the upstream producer would route
> existing keys to different partitions after the partition expansion than
> before the expansion. That means Streams would -- in general -- not read
> the same keys with the same stream thread after the expansion. I think
> you proposed the solution to this in your last e-mail with the following:
>
> 
> Essentially whoever is responsible for calculating how many partitions
> are needed should also be responsible for directing whichever new keys
> are supposed to go into those new partitions, then pass it along to the
> upstream producer to encode in the record itself.
> 
>
> But I am not 100% sure if you really meant what I understand. If I
> understand it correctly, you propose that the user is responsible to
> produce the records with existing keys to the same partitions as before
> the expansion upstream. I think that is an important information that
> should be pointed out in the KIP.
>
>
> 2.
> I would log an error and shutdown the Streams application if a custom
> partitioner is used anywhere in the topology. I think that would make
> the limitations clearer and would reduce perceived unexpected behavior
> by the users. Are there any specific reasons you propose to ignore it
> and log a warning?
>
> Best,
> Bruno
>
> On 28.10.22 04:51, Sophie Blee-Goldman wrote:
> > Thanks all! I'll try to address everything but don't hesitate to call me
> > out if anything is missed
> >
> > Colt/Lucas:
> >
> > Thanks for clarifying, I think I understand your example now. Something I
> > didn't think to mention
> > earlier but hopefully clears up how this would be used in practice is
> that
> > the partitioning decision/
> > logic doesn't need to -- and perhaps explicitly should not be -- internal
> > to the StaticStreamPartitioner
> > interface alone. I would imagine a realistic scenario would have the
> > partition essentially determined
> > upstream of the actual application, specifically integrated with whatever
> > system (or person) is
> > making the decision to add new partition(s) in the first place. Then the
> > partitioner is just reading out
> > some field in the record key/value, possibly doing some translation to
> > derive the final partition number
> > from something like a userId if it's not encoded directly, and not
> actually
> > computing anything itself.
> > Does that make sense? Essentially whoever is responsible for calculating
> > how many partitions are
> > needed should also be responsible for directing whichever new keys are
> > supposed to go into those
> > new partitions, then pass it along to the upstream producer to encode in
> > the record itself.
> >
> > In sum, I second what Lucas said about your scenario actually being a
> good
> > example of one way
> > to approach implementing static partitioning, ie based on time. It's just
> > that the semantics/logic to
> > interpret the target partition based on time would be external to the
> > application and not isolated in
> > the actual StaticStreamPartitioner class. Imo this makes perfect sense,
> as
> > something like IQ is
> > also going to be situated outside of the Streams application itself, so
> > presumably it can talk to
> > the system that is responsible for the partitioning logic for any
> partition
> > information it needs.
> >
> > Bill/Sagar:
> >
> > I've been going back and forth a lot on whether to open this feature up
> to
> > stateless applications or
> > even stateful ones as well, but feel like I've settled on having it
> > targeted towards both (but only) the
> > stateless and statically partitioned cases. Bill, my only concern about
> the
> > stateless apps was the
> > possibility for trouble when repartitioning a stateless application that
> > feeds into a stateful application
> > downstream. But now that I think about