Re: [VOTE] KIP-884: Add config to configure KafkaClientSupplier in Kafka Streams

2022-11-22 Thread Bruno Cadonna

Hi Hao,

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 22.11.22 10:08, Sophie Blee-Goldman wrote:

Hey Hao, thanks for the KIP -- I'm +1 (binding)

On Mon, Nov 21, 2022 at 12:57 PM Matthias J. Sax  wrote:


+1 (binding)

On 11/21/22 7:39 AM, John Roesler wrote:

I'm +1 (binding)

Thanks for the KIP!
-John

On 2022/11/17 21:06:29 Hao Li wrote:

Hi all,

I would like to start a vote on KIP-884:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams



Thanks,
Hao







Re: [DISCUSS] KIP-890 Server Side Defense

2022-11-22 Thread Jeff Kim
Hi Justine,

Thanks for the KIP! I have two questions:

1) For new clients, we can once again return an error UNKNOWN_PRODUCER_ID
for sequences
that are non-zero when there is no producer state present on the server.
This will indicate we missed the 0 sequence and we don't yet want to write
to the log.

I would like to understand the current behavior to handle older clients,
and if there are any changes we are making. Maybe I'm missing something,
but we would want to identify whether we missed the 0 sequence for older
clients, no?

2) Upon returning from the transaction coordinator, we can set the
transaction
as ongoing on the leader by populating currentTxnFirstOffset
through the typical produce request handling.

does the typical produce request path append records to local log along
with the currentTxnFirstOffset information? I would like to understand
when the field is written to disk.

Thanks,
Jeff


On Tue, Nov 22, 2022 at 4:44 PM Artem Livshits
 wrote:

> Hi Justine,
>
> Thank you for the KIP.  I have one question.
>
> 5) For new clients, we can once again return an error UNKNOWN_PRODUCER_ID
>
> I believe we had problems in the past with returning UNKNOWN_PRODUCER_ID
> because it was considered fatal and required client restart.  It would be
> good to spell out the new client behavior when it receives the error.
>
> -Artem
>
> On Tue, Nov 22, 2022 at 10:00 AM Justine Olshan
>  wrote:
>
> > Thanks for taking a look Matthias. I've tried to answer your questions
> > below:
> >
> > 10)
> >
> > Right — so the hanging transaction only occurs when we have a late
> message
> > come in and the partition is never added to a transaction again. If we
> > never add the partition to a transaction, we will never write a marker
> and
> > never advance the LSO.
> >
> > If we do end up adding the partition to the transaction (I suppose this
> can
> > happen before or after the late message comes in) then we will include
> the
> > late message in the next (incorrect) transaction.
> >
> > So perhaps it is clearer to make the distinction between messages that
> > eventually get added to the transaction (but the wrong one) or messages
> > that never get added and become hanging.
> >
> >
> > 20)
> >
> > The client side change for 2 is removing the addPartitions to transaction
> > call. We don't need to make this from the producer to the txn
> coordinator,
> > only server side.
> >
> >
> > In my opinion, the issue with the addPartitionsToTxn call for older
> clients
> > is that we don't have the epoch bump, so we don't know if the message
> > belongs to the previous transaction or this one. We need to check if the
> > partition has been added to this transaction. Of course, this means we
> > won't completely cover the case where we have a really late message and
> we
> > have added the partition to the new transaction, but that's unfortunately
> > something we will need the new clients to cover.
> >
> >
> > 30)
> >
> > Transaction is ongoing = partition was added to transaction via
> > addPartitionsToTxn. We check this with the DescribeTransactions call. Let
> > me know if this wasn't sufficiently explained here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)
> >
> >
> > 40)
> >
> > The idea here is that if any messages somehow come in before we get the
> new
> > epoch to the producer, they will be fenced. However, if we don't think
> this
> > is necessary, it can be discussed
> >
> >
> > 50)
> >
> > It should be synchronous because if we have an event (ie, an error) that
> > causes us to need to abort the transaction, we need to know which
> > partitions to send transaction markers to. We know the partitions because
> > we added them to the coordinator via the addPartitionsToTxn call.
> > We have had asynchronous calls in the past (ie, writing the
> > commit markers when the transaction is completed) but often this just
> > causes confusion as we need to wait for some operations to complete. In
> the
> > writing commit markers case, clients often see CONCURRENT_TRANSACTIONs
> > error messages and that can be confusing. For that reason, it may be
> > simpler to just have synchronous calls — especially if we need to block
> on
> > some operation's completion anyway before we can start the next
> > transaction. And yes, I meant coordinator. I will fix that.
> >
> >
> > 60)
> >
> > When we are checking if the transaction is ongoing, we need to make a
> round
> > trip from the leader partition to the transaction coordinator. In the
> time
> > we are waiting for this message to come back, in theory we could have
> sent
> > a commit/abort call that would make the original result of the check out
> of
> > date. That is why we can check the leader state before we write to the
> log.
> >
> >
> > I'm happy to update the KIP if some of these things were not clear.
> > Thanks,
> > Justine

[jira] [Created] (KAFKA-14417) Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest

2022-11-22 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14417:
--

 Summary: Producer doesn't handle REQUEST_TIMED_OUT for 
InitProducerIdRequest
 Key: KAFKA-14417
 URL: https://issues.apache.org/jira/browse/KAFKA-14417
 Project: Kafka
  Issue Type: Task
Affects Versions: 3.3.0, 3.2.0, 3.0.0, 3.1.0
Reporter: Justine Olshan


In TransactionManager we have a handler for InitProducerIdRequests 
[https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#LL1276C14-L1276C14]

However, we have the potential to return a REQUEST_TIMED_OUT error in 
ProducerIdManager when the BrokerToControllerChannel manager times out: 
[https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L236]
 

or when the poll returns null: 
[https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L170]

Since REQUEST_TIMED_OUT is not handled by the producer, we treat it as a fatal 
error. With idempotent producers enabled by default, this can cause more issues.

It seems the commit that introduced the changes was this one: 
[https://github.com/apache/kafka/commit/72d108274c98dca44514007254552481c731c958]
so we are vulnerable when the server code is at IBP 3.0 and beyond.
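
For illustration, one possible shape of a fix would be to route REQUEST_TIMED_OUT to 
the retry path rather than the fatal-error path. This is a simplified, hypothetical 
sketch (scheduleRetry/failFatally are made-up helpers, not the actual 
TransactionManager code):

import org.apache.kafka.common.protocol.Errors;

// Hypothetical sketch: treat REQUEST_TIMED_OUT on InitProducerId as retriable.
public class InitProducerIdErrorHandling {

    void handle(Errors error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
            case CONCURRENT_TRANSACTIONS:
            case REQUEST_TIMED_OUT:      // proposed addition: retry instead of failing
                scheduleRetry(error);
                break;
            default:
                failFatally(error);      // today REQUEST_TIMED_OUT falls through to here
        }
    }

    void scheduleRetry(Errors error) {
        // re-enqueue the InitProducerId request with backoff (details elided)
    }

    void failFatally(Errors error) {
        throw error.exception();
    }
}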
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-890 Server Side Defense

2022-11-22 Thread Artem Livshits
Hi Justine,

Thank you for the KIP.  I have one question.

5) For new clients, we can once again return an error UNKNOWN_PRODUCER_ID

I believe we had problems in the past with returning UNKNOWN_PRODUCER_ID
because it was considered fatal and required client restart.  It would be
good to spell out the new client behavior when it receives the error.

-Artem

On Tue, Nov 22, 2022 at 10:00 AM Justine Olshan
 wrote:

> Thanks for taking a look Matthias. I've tried to answer your questions
> below:
>
> 10)
>
> Right — so the hanging transaction only occurs when we have a late message
> come in and the partition is never added to a transaction again. If we
> never add the partition to a transaction, we will never write a marker and
> never advance the LSO.
>
> If we do end up adding the partition to the transaction (I suppose this can
> happen before or after the late message comes in) then we will include the
> late message in the next (incorrect) transaction.
>
> So perhaps it is clearer to make the distinction between messages that
> eventually get added to the transaction (but the wrong one) or messages
> that never get added and become hanging.
>
>
> 20)
>
> The client side change for 2 is removing the addPartitions to transaction
> call. We don't need to make this from the producer to the txn coordinator,
> only server side.
>
>
> In my opinion, the issue with the addPartitionsToTxn call for older clients
> is that we don't have the epoch bump, so we don't know if the message
> belongs to the previous transaction or this one. We need to check if the
> partition has been added to this transaction. Of course, this means we
> won't completely cover the case where we have a really late message and we
> have added the partition to the new transaction, but that's unfortunately
> something we will need the new clients to cover.
>
>
> 30)
>
> Transaction is ongoing = partition was added to transaction via
> addPartitionsToTxn. We check this with the DescribeTransactions call. Let
> me know if this wasn't sufficiently explained here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)
>
>
> 40)
>
> The idea here is that if any messages somehow come in before we get the new
> epoch to the producer, they will be fenced. However, if we don't think this
> is necessary, it can be discussed
>
>
> 50)
>
> It should be synchronous because if we have an event (ie, an error) that
> causes us to need to abort the transaction, we need to know which
> partitions to send transaction markers to. We know the partitions because
> we added them to the coordinator via the addPartitionsToTxn call.
> We have had asynchronous calls in the past (ie, writing the
> commit markers when the transaction is completed) but often this just
> causes confusion as we need to wait for some operations to complete. In the
> writing commit markers case, clients often see CONCURRENT_TRANSACTIONs
> error messages and that can be confusing. For that reason, it may be
> simpler to just have synchronous calls — especially if we need to block on
> some operation's completion anyway before we can start the next
> transaction. And yes, I meant coordinator. I will fix that.
>
>
> 60)
>
> When we are checking if the transaction is ongoing, we need to make a round
> trip from the leader partition to the transaction coordinator. In the time
> we are waiting for this message to come back, in theory we could have sent
> a commit/abort call that would make the original result of the check out of
> date. That is why we can check the leader state before we write to the log.
>
>
> I'm happy to update the KIP if some of these things were not clear.
> Thanks,
> Justine
>
> On Mon, Nov 21, 2022 at 7:11 PM Matthias J. Sax  wrote:
>
> > Thanks for the KIP.
> >
> > Couple of clarification questions (I am not a broker expert, so maybe
> > some questions are obvious for others, but not for me with my lack of
> > broker knowledge).
> >
> >
> >
> > (10)
> >
> > > The delayed message case can also violate EOS if the delayed message
> > comes in after the next addPartitionsToTxn request comes in. Effectively
> we
> > may see a message from a previous (aborted) transaction become part of
> the
> > next transaction.
> >
> > What happens if the message comes in before the next addPartitionsToTxn
> > request? It seems the broker hosting the data partitions won't know
> > anything about it and append it to the partition, too? What is the
> > difference between both cases?
> >
> > Also, it seems a TX would only hang, if there is no following TX that is
> > either committed or aborted? Thus, for the case above, the TX might
> > actually not hang (of course, we might get an EOS violation if the first
> > TX was aborted and the second committed, or the other way around).
> >
> >
> > (20)
> >
> > > Of course, 1 and 2 require client-side changes, so for older 

Re: [VOTE] KIP-866 ZooKeeper to KRaft Migration

2022-11-22 Thread Jason Gustafson
Thanks, +1 from me. I suspect we might be in for at least one surprise with
the re-implemented controller RPCs, but I agree the alternative has risks
as well.

Best,
Jason

On Mon, Nov 14, 2022 at 12:00 PM Colin McCabe  wrote:

> On Fri, Nov 11, 2022, at 08:59, David Arthur wrote:
> > Thanks, Colin.
> >
> >> never start an upgrade without first verifying the quorum configuration
> on the ZK-based brokers
> >
> > I agree that this is a pretty big benefit. I could imagine debugging
> > and fixing connection problems mid-migration would be a big pain.
> > Especially if you had some brokers correctly configured, and others
> > not.
> >
> > Adding a heartbeat raises some questions about what to do if a broker
> > goes into a bad state, or stops heartbeating, during a migration.
> > However, I think the same is true for a registration based approach,
> > so maybe it's not an increase in net complexity.
> >
>
> Hi David,
>
> Yeah. I think the goal should be for the set of heartbeaters to match the
> set of broker registrations under /brokers
>
> Obviously, people could add or remove brokers after the upgrade has begun,
> but that's unavoidable, I think. We can at least ensure that at the time we
> enter upgrade, all the brokers are ready.
>
> > I've replaced the ZK registration section with a new RPC and brief
> > description. Please take a look.
> >
>
> Thanks, David. With these changes it LGTM to me.
>
> +1 (binding)
>
> Colin
>
> > Thanks!
> > David
> >
> > On Wed, Nov 9, 2022 at 5:46 PM Colin McCabe  wrote:
> >>
> >> Hi David,
> >>
> >> Thanks for the response. Replies inline.
> >>
> >> On Wed, Nov 9, 2022, at 08:17, David Arthur wrote:
> >> > Colin
> >> >
> >> >>  Maybe zk.metadata.migration.enable ?
> >> >
> >> > Done. I went with "zookeeper.metadata.migration.enable" since our
> >> > other ZK configs start with "zookeeper.*"
> >> >
> >> >> SImilarly, for MigrationRecord: can we rename this to
> ZkMigrationStateRecord? Then change MigrationState -> ZkMigrationState.
> >> >
> >> > Sure
> >> >
> >> >> With ZkMigrationStateRecord, one thing to keep in mind here is that
> we will eventually compact all the metadata logs into a snapshot. That
> snapshot will then have to keep alive the memory of the old migration. So
> it is not really a matter of replaying the old metadata logs (probably) but
> a matter of checking to see what the ZkMigrationState is, which I suppose
> could be Optional<ZkMigrationState>. If it's not Optional.empty, we already
> migrated / are migrating.
> >> >
> >> > Yea, makes sense.
> >> >
> >> >> For the /migration ZNode, is "last_update_time_ms" necessary? I
> thought ZK already tracked this information in the mzxid of the znode?
> >> >
> >> > Yes, Jun pointed this out previously, I missed this update in the KIP.
> >> > Fixed now.
> >> >
> >> >> It is true that technically it is only needed in UMR, but I would
> still suggest including KRaftControllerId in LeaderAndIsrRequest because it
> will make debugging much easier.
> >> >>
> >> >> I would suggest not implementing the topic deletion state machine,
> but just deleting topics eagerly when in migration mode. We can implement
> this behavior change by keying off of whether KRaftControllerId is present
> in LeaderAndIsrRequest. On broker startup, we'll be sent a full
> LeaderAndIsrRequest and can delete stray partitions whose IDs are not as
> expected (again, this behavior change would only be for migration mode)
> >> >
> >> > Sounds good to me. Since this is somewhat of an implementation detail,
> >> > do you think we need this included in the KIP?
> >>
> >> Yeah, maybe we don't need to go into the delete behavior here. But I
> think the KIP should specify that we have KRaftControllerId in both
> LeaderAndIsrRequest. That will allow us to implement this behavior
> conditionally on zk-based brokers when in dual write mode.
> >>
> >> >
> >> >> For existing KRaft controllers, will
> kafka.controller:type=KafkaController,name=MigrationState show up as 4
> (MigrationFinalized)? I assume this is true, but it would be good to spell
> it out. Sorry if this is answered somewhere else.
> >> >
> >> > We discussed using 0 (None) as the value to report for original,
> >> > un-migrated KRaft clusters. 4 (MigrationFinalized) would be for
> >> > clusters which underwent a migration. I have some description of this
> >> > in the table under "Migration Overview"
> >> >
> >>
> >> I don't feel that strongly about this, but wouldn't it be a good idea
> for MigrationState to have a different value for ZK-based clusters and
> KRaft-based clusters? If you have a bunch of clusters and you take an
> aggregate of this metric, it would be good to get a report of three numbers:
> >> 1. unupgraded ZK
> >> 2. in progress upgrades
> >> 3. kraft
> >>
> >> I guess we could get that from examining some other metrics too,
> though. Not sure, what do you think?
> >>
> >> >> As you point out, the ZK brokers being upgraded will need to contact
> the KRaft quorum in order to forward requests to 

Re: [DISCUSS] KIP-881: Rack-aware Partition Assignment for Kafka Consumers

2022-11-22 Thread Artem Livshits
Hi Rajini,

Thank you for the KIP, the KIP looks good to match RackAwareReplicaSelector
behavior that is available out-of-box.  Which should probably be good
enough in practice.

From the design perspective, though, RackAwareReplicaSelector is just one
possible plugin, in theory the broker could use a plugin that leverages
networking information to get client locality or some other way, so it
seems like we're making an assumption about broker replica selection in the
default assignment implementation.  So I wonder if we should use a separate
plugin that would be set when RackAwareReplicaSelector is set, rather than
assume broker behavior in the client implementation.
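
For context, the baked-in assumption boils down to a rack-matching check along these 
lines (hypothetical helper, not an existing API); it mirrors the criterion 
RackAwareReplicaSelector applies broker-side:

import java.util.Set;

// Hypothetical helper: a partition is "local" to a consumer only if some replica
// of that partition lives in the consumer's configured client.rack.
public class RackMatching {

    public static boolean hasLocalReplica(String consumerRack, Set<String> replicaRacks) {
        return consumerRack != null && replicaRacks.contains(consumerRack);
    }

    public static void main(String[] args) {
        Set<String> replicaRacks = Set.of("rack-a", "rack-b");
        System.out.println(hasLocalReplica("rack-a", replicaRacks)); // true: prefer assigning here
        System.out.println(hasLocalReplica("rack-c", replicaRacks)); // false: fall back to plain balancing
    }
}

A different replica selector plugin could use an entirely different notion of the 
"closest" replica, which is the reason for suggesting a matching client-side plugin 
above.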

-Artem

On Wed, Nov 16, 2022 at 8:08 AM Jun Rao  wrote:

> Hi, David and Rajini,
>
> Thanks for the explanation. It makes sense to me now.
>
> Jun
>
> On Wed, Nov 16, 2022 at 1:44 AM Rajini Sivaram 
> wrote:
>
> > Thanks David, that was my understanding as well.
> >
> > Regards,
> >
> > Rajini
> >
> > On Wed, Nov 16, 2022 at 8:08 AM David Jacot  >
> > wrote:
> >
> > > Hi Jun,
> > >
> > > We don't need to bump any RPC requests. The subscription is serialized
> > > (including its version) and included as bytes in the RPCs.
> > >
> > > Best,
> > > David
> > >
> > > On Tue, Nov 15, 2022 at 11:42 PM Jun Rao 
> > wrote:
> > > >
> > > > Hi, Rajini,
> > > >
> > > > Thanks for the updated KIP. Just another minor comment. It would be
> > > useful
> > > > to list all RPC requests whose version needs to be bumped because of
> > the
> > > > changes in ConsumerProtocolSubscription.
> > > >
> > > > Jun
> > > >
> > > > On Tue, Nov 15, 2022 at 3:45 AM Rajini Sivaram <
> > rajinisiva...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi David,
> > > > >
> > > > > Sorry, I was out of office and hence the delay in responding.
> Thanks
> > > for
> > > > > reviewing the KIP and answering Viktor's question (thanks for the
> > > review,
> > > > > Viktor).
> > > > >
> > > > > Responses below:
> > > > > 01)  I was in two minds about adding new assignors, because as you
> > > said,
> > > > > user experience is better if assignors used racks when available.
> But
> > > I was
> > > > > a bit concerned about changing the algorithm in existing
> applications
> > > which
> > > > > were already configuring `client.rack`. It felt less risky to add
> new
> > > > > assignor implementations instead. But we can retain existing logic
> if
> > > a)
> > > > > rack information is not available and b) racks have all partitions.
> > So
> > > the
> > > > > only case where logic will be different is when rack information is
> > > > > available because consumers chose to use `client.rack` to benefit
> > from
> > > > > improved locality, but racks only have a subset of partitions. It
> > seems
> > > > > reasonable to make existing assignors rack-aware in this case to
> > > improve
> > > > > locality. I have updated the KIP. Will wait and see if there are
> any
> > > > > objections to this change.
> > > > >
> > > > > 02) Updated 1), so existing assignor classes will be used.
> > > > >
> > > > > 03) Updated the KIP to use version 3, thanks.
> > > > >
> > > > > If there are no concerns or further comments, I will start voting
> > later
> > > > > today.
> > > > >
> > > > > Thank you,
> > > > >
> > > > > Rajini
> > > > >
> > > > >
> > > > > On Fri, Nov 4, 2022 at 9:58 AM David Jacot
> >  > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Viktor,
> > > > > >
> > > > > > I can actually answer your question. KIP-848 already includes
> rack
> > > > > > awareness in the protocol. It is actually the other way around,
> > this
> > > > > > KIP takes the idea from KIP-848 to implement it in the current
> > > > > > protocol in order to realize the benefits sooner. The new
> protocol
> > > > > > will take a while to be implemented.
> > > > > >
> > > > > > Best,
> > > > > > David
> > > > > >
> > > > > > On Fri, Nov 4, 2022 at 10:55 AM David Jacot  >
> > > wrote:
> > > > > > >
> > > > > > > Hi Rajini,
> > > > > > >
> > > > > > > Thanks for the KIP. I have a few questions/comments:
> > > > > > >
> > > > > > > 01. If I understood correctly, the plan is to add new assignors
> > > which
> > > > > > > are rack aware. Is this right? I wonder if it is a judicious
> > choice
> > > > > > > here. The main drawback is that clients must be configured
> > > correctly
> > > > > > > in order to get the benefits. From a user experience
> perspective,
> > > it
> > > > > > > would be much better if we would only require our users to set
> > > > > > > client.rack. However, I understand the argument of keeping the
> > > > > > > existing assignors as-is in order to limit the risk but it also
> > > means
> > > > > > > that we will have to maintain multiple assignors with a
> somewhat
> > > > > > > similar core logic (within a rack). What do you think?
> > > > > > >
> > > > > > > 02. If we proceed with new rack-aware assignors, we should
> > mention
> > > > > > > their fully qualified names in the KIP as they will become part
> > of
> 

[jira] [Created] (KAFKA-14416) org.apache.kafka.common.config.internals classes should be relocated

2022-11-22 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14416:
---

 Summary: org.apache.kafka.common.config.internals classes should 
be relocated
 Key: KAFKA-14416
 URL: https://issues.apache.org/jira/browse/KAFKA-14416
 Project: Kafka
  Issue Type: Task
Reporter: Greg Harris


The classes in `org.apache.kafka.common.config.internals`, currently 
BrokerSecurityConfigs and QuotaConfigs, both contain configuration properties 
that are used in the Kafka Server.

This clashes with the more general purpose of the 
`org.apache.kafka.common.config` package, which is to define the mechanisms for 
defining and parsing configurations. The `internals` subpackage so far appears 
to refer to implementation details of the containing package. The `internals` 
package in this case is just a consumer of the `config` package's API, and 
should be semantically co-located closer to the application that requires 
these configurations to be defined.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14176) Reduce visibility of WorkerConfig.lookupKafkaClusterId

2022-11-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-14176.
-
Resolution: Fixed

> Reduce visibility of WorkerConfig.lookupKafkaClusterId
> --
>
> Key: KAFKA-14176
> URL: https://issues.apache.org/jira/browse/KAFKA-14176
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Greg Harris
>Priority: Trivial
>
> Following [KAFKA-14160|https://github.com/apache/kafka/pull/12536#top] we can 
> reduce the visibility of WorkerConfig.lookupKafkaClusterId. Once all callers 
> have been updated to use Mockito we can replace them by calls to 
> WorkerConfig.kafkaClusterId.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-22 Thread Snehashis
Thanks for the points Sagar.

> 1) Should we update the GET /connectors endpoint to include the version of
> the plugin that is running? It could be useful to figure out the version
of
> the plugin or I am assuming it gets returned by the expand=info call?

I think this is good to have and a possible future enhancement. The version
info will be present in the config of the connector if the user has
specified the version. Otherwise it is the latest version, which the user
can find out from the connector-plugin endpoint. The information could be
added to the response of the GET /connectors endpoint itself; however,
the most ideal way of doing this would be to get the currently running
instance of the connector and read the version directly from there. This is
slightly tricky as the connector could be running on a different node.
One way to do this would be to persist the version information in the
status backing store during instantiation of the connector. It requires
some more thought, and since the version is part of the configs when provided
(and evident otherwise), I have not included it in this KIP.
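
For example, a connector could be pinned to a plugin version with something like the 
following (the property name "connector.version" is purely illustrative here; the KIP 
defines the actual config key):

import java.util.Map;

// Hypothetical connector config: pin the plugin version explicitly.
public class VersionedConnectorConfigExample {
    public static void main(String[] args) {
        Map<String, String> config = Map.of(
            "name", "my-sink",
            "connector.class", "com.example.ExampleSinkConnector",
            "connector.version", "2.1.0", // if omitted, the latest installed version is used
            "topics", "orders");
        config.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}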

> 2) I am not aware of this and hence asking, can 2 connectors with
different
> versions have the same name? Does the plugin isolation allow this? This
> could have a bearing when using the lifecycle endpoints for connectors
like
> DELETE etc.

All connectors in a cluster need to have unique connector names,
regardless of what version of the plugin the connector is running
underneath. This is something enforced by the Connect runtime itself. All
Connect CRUD operations are keyed on the connector name, so there will not
be an issue.

Regards
Snehashis

On Tue, Nov 22, 2022 at 3:16 PM Sagar  wrote:

> Hey Snehashsih,
>
> Thanks for the KIP. It looks like a very useful feature. Couple of
> small-ish points, let me know what you think:
>
> 1) Should we update the GET /connectors endpoint to include the version of
> the plugin that is running? It could be useful to figure out the version of
> the plugin or I am assuming it gets returned by the expand=info call?
> 2) I am not aware of this and hence asking, can 2 connectors with different
> versions have the same name? Does the plugin isolation allow this? This
> could have a bearing when using the lifecycle endpoints for connectors like
> DELETE etc.
>
> Thanks!
> Sagar.
>
>
> On Tue, Nov 22, 2022 at 2:10 PM Ashwin 
> wrote:
>
> > Hi Snehasis,
> >
> > > IIUC (please correct me if I am wrong here), what you highlighted
> above,
> > is
> > a versioning scheme for a connector config for the same connector (and
> not
> > different versions of a connector plugin).
> >
> > Sorry for not being more precise in my wording -  I meant registering
> > versions of schema for connector config.
> >
> > Let's take the example of a fictional connector which uses a fictional
> AWS
> > service.
> >
> > Fictional Connector Config schema version:2.0
> > ---
> > {
> >   "$schema": "http://json-schema.org/draft-04/schema#;,
> >   "type": "object",
> >   "properties": {
> > "name": {
> >   "type": "string"
> > },
> > "schema_version": {
> >   "type": "string"
> > },
> > "aws_access_key": {
> >   "type": "string"
> > },
> > "aws_secret_key": {
> >   "type": "string"
> > }
> >   },
> >   "required": [
> > "name",
> > "schema_version",
> > "aws_access_key",
> > "aws_secret_key"
> >   ]
> > }
> >
> > Fictional Connector config schema version:3.0
> > ---
> > {
> >   "$schema": "http://json-schema.org/draft-04/schema#;,
> >   "type": "object",
> >   "properties": {
> > "name": {
> >   "type": "string"
> > },
> > "schema_version": {
> >   "type": "string"
> > },
> > "iam_role": {
> >   "type": "string"
> > }
> >   },
> >   "required": [
> > "name",
> > "schema_version",
> > "iam_role"
> >   ]
> > }
> >
> > The connector which supports Fictional config schema 2.0  will validate
> the
> > access key and secret key.
> > Whereas a connector which supports config with schema version 3.0 will
> only
> > validate the IAM role.
> >
> > This is the alternative which I wanted to suggest. Each plugin will
> > register the schema versions of connector config which it supports.
> >
> > The plugin paths may be optionally different i.e  we don't have to
> > mandatorily add a new plugin path to support a new schema version.
> >
> > Thanks,
> > Ashwin
> >
> > On Tue, Nov 22, 2022 at 12:47 PM Snehashis 
> > wrote:
> >
> > > Thanks for the input Ashwin.
> > >
> > > > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > > > config is versioned and has a schema. Then a single plugin (whose
> > > > dependencies have not changed) can handle multiple config versions
> for
> > > the
> > > > same connector class.
> > >
> > > IIUC (please correct me if I am wrong here), what you highlighted
> above,
> > is
> > > a versioning scheme for a connector config for the same connector (and
> > not
> > 

Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-22 Thread Snehashis
Thanks for the explanation Ashwin.

This is an interesting notion. This is something which many connectors
implicitly do anyway. There are several connectors which have different
methods of interpreting the configurations provided. Often the user has
some control over how provided configuration should be used, through
omission of configs, boolean flags that activate/deactivate certain
configs, etc. One could argue that this increases the verbosity of the
configurations and makes it monolithic, however the alternative proposal of
having multiple registered schemas only really seems worthwhile if that the
runtime has the ability to alter the functionality of a connector. There
needs to be some way of registering multiple functionalities, one for each
configuration type. Otherwise, if the runtime is simply passing on the
configuration to the connector, regardless of the which schema version it
belongs to, and delegating the responsibility of picking the functionality
to the connector itself, there is very little the runtime is adding by
registering schemas. Multiple connector versions implicitly define
different configs and functionalities and hence, the ability to run
different versions of the connector itself seems like a more elegant
solution to address this problem.

I also don't think multiple configurations are the only use case for
running different versions of a connector. There could be internal changes
to a connector that do not involve any config changes. A change that
targets a particular enhancement may be incompatible with the older
behaviour. Right now in order to make the changes backwards compatible we
would have to gate the changes behind a connector config (or a different
schema and functionality registration). Otherwise the user is forced to
keep using the older connector until they can upgrade. The problem is that if there
are multiple such enhancements (and only one is a breaking change), then
they are missing out on all the other enhancements. It is simpler for the user
to have the ability to run both versions of the connector.

On Tue, Nov 22, 2022 at 2:11 PM Ashwin  wrote:

> Hi Snehasis,
>
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> a versioning scheme for a connector config for the same connector (and not
> different versions of a connector plugin).
>
> Sorry for not being more precise in my wording -  I meant registering
> versions of schema for connector config.
>
> Let's take the example of a fictional connector which uses a fictional AWS
> service.
>
> Fictional Connector Config schema version:2.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#;,
>   "type": "object",
>   "properties": {
> "name": {
>   "type": "string"
> },
> "schema_version": {
>   "type": "string"
> },
> "aws_access_key": {
>   "type": "string"
> },
> "aws_secret_key": {
>   "type": "string"
> }
>   },
>   "required": [
> "name",
> "schema_version",
> "aws_access_key",
> "aws_secret_key"
>   ]
> }
>
> Fictional Connector config schema version:3.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#;,
>   "type": "object",
>   "properties": {
> "name": {
>   "type": "string"
> },
> "schema_version": {
>   "type": "string"
> },
> "iam_role": {
>   "type": "string"
> }
>   },
>   "required": [
> "name",
> "schema_version",
> "iam_role"
>   ]
> }
>
> The connector which supports Fictional config schema 2.0  will validate the
> access key and secret key.
> Whereas a connector which supports config with schema version 3.0 will only
> validate the IAM role.
>
> This is the alternative which I wanted to suggest. Each plugin will
> register the schema versions of connector config which it supports.
>
> The plugin paths may be optionally different i.e  we don't have to
> mandatorily add a new plugin path to support a new schema version.
>
> Thanks,
> Ashwin
>
> On Tue, Nov 22, 2022 at 12:47 PM Snehashis 
> wrote:
>
> > Thanks for the input Ashwin.
> >
> > > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > > config is versioned and has a schema. Then a single plugin (whose
> > > dependencies have not changed) can handle multiple config versions for
> > the
> > > same connector class.
> >
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> > a versioning scheme for a connector config for the same connector (and
> not
> > different versions of a connector plugin). That is a somewhat tangential
> > problem. While it is definitely a useful feature to have, like a log to
> > check what changes were made over time to the config which might make it
> > easier to do rollbacks, it is not the focus here. Here by version we mean
> > to say what underlying version of the plugin should the given
> configuration
> > of the connector use. Perhaps it is better to change the name of the
> > parameter from 

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-22 Thread Victoria Xia
Thanks, Matthias and Sagar, for your comments! I've responded here for now,
and will update the KIP afterwards with the outcome of our discussions as
they resolve.

--- Matthias's comments ---

> (1) Why does the new store not extend KeyValueStore, but StateStore?
In the end, it's a KeyValueStore?

A `VersionedKeyValueStore` is not a `KeyValueStore` because
many of the KeyValueStore methods would not make sense for a versioned
store. For example, `put(K key, V value)` is not meaningful for a versioned
store because the record needs a timestamp associated with it.

A `VersionedKeyValueStore<K, V>` is more similar to a
`KeyValueStore<K, ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore<K, V>`),
but some of the TimestampedKeyValueStore methods are still problematic. For example,
what does it mean for `delete(K key)` to have return type
`ValueAndTimestamp<V>`? Does this mean that `delete(K key)` only deletes
(and returns) the latest record version for the key? Probably we want a
versioned store to have `delete(K key)` delete all record versions for the
given key, in which case the return type is better suited as an
iterator/collection of KeyValueTimestamp. `putIfAbsent(K key,
ValueAndTimestamp<V> value)` also has ambiguous semantics for versioned stores
(i.e., what does it mean for the key/record to be "absent").

I agree that conceptually a versioned key-value store is just a key-value
store, though. In the future if we redesign the store interfaces, it'd be
great to unify them by having a more generic KeyValueStore interface that
allows for extra flexibility to support different types of key-value
stores, including versioned stores. (Or, if you can think of a way to
achieve this with the existing interfaces today, I'm all ears!)
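
To make the shape concrete, here is a rough sketch of the kind of interface being
discussed (method names and return types are illustrative only; the KIP defines the
actual interface):

import org.apache.kafka.streams.state.ValueAndTimestamp;

// Illustrative sketch only: a versioned store keys every write by timestamp and
// supports timestamped point lookups.
public interface VersionedKeyValueStoreSketch<K, V> {

    // Every record version carries a timestamp, so put() must take one.
    void put(K key, V value, long timestamp);

    // Latest record version for the key; sugar for get(key, Long.MAX_VALUE).
    ValueAndTimestamp<V> get(K key);

    // Latest record version with timestamp <= asOfTimestamp, or null if none.
    ValueAndTimestamp<V> get(K key, long asOfTimestamp);
}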

> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
want to support IQ in this KIP, it might be good to add this interface
right away to avoid complications for follow up KIPs? Or won't there by
any complications anyway?

I don't think there will be complications for refactoring to add this
interface in the future. Refactoring out ReadOnlyVersionedKeyValueStore
from VersionedKeyValueStore would leave VersionedKeyValueStore unchanged
from the outside.

Also, is it true that the ReadOnlyKeyValueStore interface is only used for
IQv1 and not IQv2? I think it's an open question as to whether we should
support IQv1 for versioned stores or only IQv2. If the latter, then maybe
we won't need the extra interface at all.

> (3) Why do we not have a `delete(key)` method? I am ok with not
supporting all methods from existing KV-store, but a `delete(key)` seems
to be fundamentally to have?

What do you think the semantics of `delete(key)` should be for versioned
stores? Should `delete(key)` delete (and return) all record versions for
the key? Or should we have `delete(key, timestamp)` which is equivalent to
`put(key, null, timestamp)` except with a return type to return
ValueAndTimestamp representing the record it replaced?

If we have ready alignment on what the interface and semantics for
`delete(key)` should be, then adding it in this KIP sounds good. I just
didn't want the rest of the KIP to be hung up over additional interfaces,
given that we can always add extra interfaces in the future.

> (4a) Do we need `get(key)`? It seems to be the same as `get(key,
MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just for
my own clarification (should we add something to the JavaDocs?).

Correct, it is just syntactic sugar. I will add a clarification into the
Javadocs as you've suggested.

> (4b) Should we throw an exception if a user queries out-of-bound
instead of returning `null` (in `get(key,ts)`)?
   -> You put it into "rejected alternatives", and I understand your
argument. Would love to get input from others about this question
though. -- It seems we also return `null` for windowed stores, so maybe
the strongest argument is to align to existing behavior? Or do we have
case for which the current behavior is problematic?

Sure; curious to hear what others think as well.

> (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
discretion when this is the case)" -> Should we make it a stricter
contract such that the user can reason about it better (there is WIP to
make retention time a strict bound for windowed stores atm)
   -> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a
strict bound, too.

Ah, great question. I think the question boils down to: do we want to
require that all versioned stores (including custom user implementations)
use "history retention" to determine when to expire old record versions?

Because the `persistentVersionedKeyValueStore(...)` method returns
instances of the provided RocksDB-based versioned store implementation,
which does use history retention for this purpose, that's why we can very
clearly say that for this store, `get(key, ts)` will return null if the
provided timestamp bound has fallen out of history retention. The 

[jira] [Resolved] (KAFKA-14307) KRaft controller time based snapshots

2022-11-22 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-14307.

Resolution: Fixed

> KRaft controller time based snapshots
> -
>
> Key: KAFKA-14307
> URL: https://issues.apache.org/jira/browse/KAFKA-14307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14394) BrokerToControllerChannelManager has 2 separate timeouts

2022-11-22 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-14394.
---
Resolution: Not A Problem

> BrokerToControllerChannelManager has 2 separate timeouts
> 
>
> Key: KAFKA-14394
> URL: https://issues.apache.org/jira/browse/KAFKA-14394
> Project: Kafka
>  Issue Type: Task
>Reporter: Ron Dagostino
>Priority: Major
>
> BrokerToControllerChannelManager uses `config.controllerSocketTimeoutMs` as 
> its default `networkClientRetryTimeoutMs` in general, but it does accept a 
> second `retryTimeoutMs` value -- and there is exactly one place where the 
> second timeout is used: within BrokerToControllerRequestThread.  Is this 
> second, separate timeout actually necessary, or is it a bug (in which case 
> the two timeouts should be the same)?  Closely related to this is the case of 
> AlterPartitionManager, which sends Long.MAX_VALUE as the retryTimeoutMs value 
> when it instantiates its instance of BrokerToControllerChannelManager.  Is 
> this Long.MAX_VALUE correct, when in fact `config.controllerSocketTimeoutMs` 
> is being used as the other timeout?
> This is related to 
> https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14392 and the 
> associated PR, https://github.com/apache/kafka/pull/12856



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-890 Server Side Defense

2022-11-22 Thread Justine Olshan
Thanks for taking a look Matthias. I've tried to answer your questions
below:

10)

Right — so the hanging transaction only occurs when we have a late message
come in and the partition is never added to a transaction again. If we
never add the partition to a transaction, we will never write a marker and
never advance the LSO.

If we do end up adding the partition to the transaction (I suppose this can
happen before or after the late message comes in) then we will include the
late message in the next (incorrect) transaction.

So perhaps it is clearer to make the distinction between messages that
eventually get added to the transaction (but the wrong one) or messages
that never get added and become hanging.


20)

The client side change for 2 is removing the addPartitions to transaction
call. We don't need to make this from the producer to the txn coordinator,
only server side.


In my opinion, the issue with the addPartitionsToTxn call for older clients
is that we don't have the epoch bump, so we don't know if the message
belongs to the previous transaction or this one. We need to check if the
partition has been added to this transaction. Of course, this means we
won't completely cover the case where we have a really late message and we
have added the partition to the new transaction, but that's unfortunately
something we will need the new clients to cover.


30)

Transaction is ongoing = partition was added to transaction via
addPartitionsToTxn. We check this with the DescribeTransactions call. Let
me know if this wasn't sufficiently explained here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)


40)

The idea here is that if any messages somehow come in before we get the new
epoch to the producer, they will be fenced. However, if we don't think this
is necessary, it can be discussed


50)

It should be synchronous because if we have an event (ie, an error) that
causes us to need to abort the transaction, we need to know which
partitions to send transaction markers to. We know the partitions because
we added them to the coordinator via the addPartitionsToTxn call.
We have had asynchronous calls in the past (ie, writing the
commit markers when the transaction is completed) but often this just
causes confusion as we need to wait for some operations to complete. In the
writing commit markers case, clients often see CONCURRENT_TRANSACTIONs
error messages and that can be confusing. For that reason, it may be
simpler to just have synchronous calls — especially if we need to block on
some operation's completion anyway before we can start the next
transaction. And yes, I meant coordinator. I will fix that.


60)

When we are checking if the transaction is ongoing, we need to make a round
trip from the leader partition to the transaction coordinator. In the time
we are waiting for this message to come back, in theory we could have sent
a commit/abort call that would make the original result of the check out of
date. That is why we can check the leader state before we write to the log.
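
For what it's worth, the verify-then-recheck flow in 60) could be pictured roughly
like this (hypothetical pseudocode with made-up names, not actual broker code):

import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: the coordinator round trip can race with a commit/abort,
// so the partition leader re-validates its own state before appending.
public class OngoingTxnCheckSketch {

    interface CoordinatorClient {
        boolean partitionInTransaction(long producerId);
    }

    interface LeaderState {
        boolean hasOngoingTxn(long producerId); // e.g. currentTxnFirstOffset is set
        void append(byte[] records);
    }

    private final ReentrantLock leaderLock = new ReentrantLock();

    void appendIfOngoing(CoordinatorClient coordinator, LeaderState leader,
                         long producerId, byte[] records) {
        // 1. Round trip to the transaction coordinator; may be stale once it returns.
        boolean verified = coordinator.partitionInTransaction(producerId);

        // 2. Re-check leader-local state under the lock before writing to the log.
        leaderLock.lock();
        try {
            if (verified && leader.hasOngoingTxn(producerId)) {
                leader.append(records);
            } else {
                throw new IllegalStateException("Transaction no longer ongoing; rejecting write");
            }
        } finally {
            leaderLock.unlock();
        }
    }
}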


I'm happy to update the KIP if some of these things were not clear.
Thanks,
Justine

On Mon, Nov 21, 2022 at 7:11 PM Matthias J. Sax  wrote:

> Thanks for the KIP.
>
> Couple of clarification questions (I am not a broker expert, so maybe
> some questions are obvious for others, but not for me with my lack of
> broker knowledge).
>
>
>
> (10)
>
> > The delayed message case can also violate EOS if the delayed message
> comes in after the next addPartitionsToTxn request comes in. Effectively we
> may see a message from a previous (aborted) transaction become part of the
> next transaction.
>
> What happens if the message comes in before the next addPartitionsToTxn
> request? It seems the broker hosting the data partitions won't know
> anything about it and append it to the partition, too? What is the
> difference between both cases?
>
> Also, it seems a TX would only hang, if there is no following TX that is
> either committed or aborted? Thus, for the case above, the TX might
> actually not hang (of course, we might get an EOS violation if the first
> TX was aborted and the second committed, or the other way around).
>
>
> (20)
>
> > Of course, 1 and 2 require client-side changes, so for older clients,
> those approaches won’t apply.
>
> For (1) I understand why a client change is necessary, but not sure why
> we need a client change for (2). Can you elaborate? -- Later you explain
> that we should send a DescribeTransactionRequest, but I am not sure why?
> Can't we just do an implicit AddPartitionToTxn, too? If the old
> producer correctly registered the partition already, the TX-coordinator
> can just ignore it as it's an idempotent operation?
>
>
> (30)
>
> > To cover older clients, we will ensure a transaction is ongoing before
> we write to a transaction
>
> Not sure what you mean by this? 

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.0 #213

2022-11-22 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 284177 lines...]
[2022-11-22T11:53:36.990Z] 
[2022-11-22T11:53:36.990Z] > Task :clients:srcJar
[2022-11-22T11:53:36.990Z] Execution optimizations have been disabled for task 
':clients:srcJar' to ensure correctness due to the following reasons:
[2022-11-22T11:53:36.990Z]   - Gradle detected a problem with the following 
location: 
'/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.0@2/clients/src/generated/java'.
 Reason: Task ':clients:srcJar' uses this output of task 
':clients:processMessages' without declaring an explicit or implicit 
dependency. This can lead to incorrect results being produced, depending on 
what order the tasks are executed. Please refer to 
https://docs.gradle.org/7.1.1/userguide/validation_problems.html#implicit_dependency
 for more details about this problem.
[2022-11-22T11:53:38.746Z] 
[2022-11-22T11:53:38.746Z] > Task :clients:testJar
[2022-11-22T11:53:39.835Z] > Task :clients:testSrcJar
[2022-11-22T11:53:39.835Z] > Task 
:clients:publishMavenJavaPublicationToMavenLocal
[2022-11-22T11:53:39.835Z] > Task :clients:publishToMavenLocal
[2022-11-22T11:53:39.835Z] 
[2022-11-22T11:53:39.835Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 8.0.
[2022-11-22T11:53:39.835Z] 
[2022-11-22T11:53:39.835Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2022-11-22T11:53:39.835Z] 
[2022-11-22T11:53:39.835Z] See 
https://docs.gradle.org/7.1.1/userguide/command_line_interface.html#sec:command_line_warnings
[2022-11-22T11:53:39.835Z] 
[2022-11-22T11:53:39.835Z] Execution optimizations have been disabled for 3 
invalid unit(s) of work during this build to ensure correctness.
[2022-11-22T11:53:39.835Z] Please consult deprecation warnings for more details.
[2022-11-22T11:53:39.835Z] 
[2022-11-22T11:53:39.835Z] BUILD SUCCESSFUL in 31s
[2022-11-22T11:53:39.835Z] 77 actionable tasks: 34 executed, 43 up-to-date
[Pipeline] sh
[2022-11-22T11:53:42.679Z] + grep ^version= gradle.properties
[2022-11-22T11:53:42.679Z] + cut -d= -f 2
[Pipeline] dir
[2022-11-22T11:53:43.370Z] Running in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.0@2/streams/quickstart
[Pipeline] {
[Pipeline] sh
[2022-11-22T11:53:45.525Z] + mvn clean install -Dgpg.skip
[2022-11-22T11:53:46.728Z] [INFO] Scanning for projects...
[2022-11-22T11:53:46.728Z] [INFO] 

[2022-11-22T11:53:46.728Z] [INFO] Reactor Build Order:
[2022-11-22T11:53:46.728Z] [INFO] 
[2022-11-22T11:53:46.728Z] [INFO] Kafka Streams :: Quickstart   
 [pom]
[2022-11-22T11:53:46.728Z] [INFO] streams-quickstart-java   
 [maven-archetype]
[2022-11-22T11:53:46.728Z] [INFO] 
[2022-11-22T11:53:46.728Z] [INFO] < 
org.apache.kafka:streams-quickstart >-
[2022-11-22T11:53:46.728Z] [INFO] Building Kafka Streams :: Quickstart 
3.0.3-SNAPSHOT[1/2]
[2022-11-22T11:53:46.728Z] [INFO] [ pom 
]-
[2022-11-22T11:53:47.823Z] [INFO] 
[2022-11-22T11:53:47.823Z] [INFO] --- maven-clean-plugin:3.0.0:clean 
(default-clean) @ streams-quickstart ---
[2022-11-22T11:53:47.823Z] [INFO] 
[2022-11-22T11:53:47.823Z] [INFO] --- maven-remote-resources-plugin:1.5:process 
(process-resource-bundles) @ streams-quickstart ---
[2022-11-22T11:53:47.823Z] [INFO] 
[2022-11-22T11:53:47.823Z] [INFO] --- maven-site-plugin:3.5.1:attach-descriptor 
(attach-descriptor) @ streams-quickstart ---
[2022-11-22T11:53:49.931Z] [INFO] 
[2022-11-22T11:53:49.931Z] [INFO] --- maven-gpg-plugin:1.6:sign 
(sign-artifacts) @ streams-quickstart ---
[2022-11-22T11:53:49.931Z] [INFO] 
[2022-11-22T11:53:49.931Z] [INFO] --- maven-install-plugin:2.5.2:install 
(default-install) @ streams-quickstart ---
[2022-11-22T11:53:49.931Z] [INFO] Installing 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.0@2/streams/quickstart/pom.xml
 to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart/3.0.3-SNAPSHOT/streams-quickstart-3.0.3-SNAPSHOT.pom
[2022-11-22T11:53:49.931Z] [INFO] 
[2022-11-22T11:53:49.931Z] [INFO] --< 
org.apache.kafka:streams-quickstart-java >--
[2022-11-22T11:53:49.931Z] [INFO] Building streams-quickstart-java 
3.0.3-SNAPSHOT[2/2]
[2022-11-22T11:53:49.931Z] [INFO] --[ maven-archetype 
]---
[2022-11-22T11:53:49.931Z] [INFO] 
[2022-11-22T11:53:49.931Z] [INFO] --- maven-clean-plugin:3.0.0:clean 
(default-clean) @ streams-quickstart-java ---
[2022-11-22T11:53:49.931Z] [INFO] 
[2022-11-22T11:53:49.931Z] [INFO] --- maven-remote-resources-plugin:1.5:process 
(process-resource-bundles) @ streams-quickstart-java 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-22 Thread Nick Telford
Hi John,

Thanks for the review and feedback!

1. Custom Stores: I've been mulling over this problem myself. As it stands,
custom stores would essentially lose checkpointing with no indication that
they're expected to make changes, besides a line in the release notes. I
agree that the best solution would be to provide a default that checkpoints
to a file. The one thing I would change is that the checkpointing is to a
store-local file, instead of a per-Task file. This way the StateStore still
technically owns its own checkpointing (via a default implementation), and
the StateManager/Task execution engine doesn't need to know anything about
checkpointing, which greatly simplifies some of the logic.
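
To sketch what that default might look like (assuming a commit method that receives
the changelog offsets; the exact signature is whatever the KIP ends up defining):

import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;

// Rough sketch, assuming a commit(Map<TopicPartition, Long>) style method: a default
// implementation that keeps today's behaviour for non-transactional custom stores by
// checkpointing changelog offsets to a store-local file.
public interface CheckpointingStateStoreSketch {

    Path storeDirectory();

    default void commit(Map<TopicPartition, Long> changelogOffsets) {
        Path checkpointFile = storeDirectory().resolve(".checkpoint");
        String contents = changelogOffsets.entrySet().stream()
                .map(e -> e.getKey().topic() + " " + e.getKey().partition()
                        + " " + e.getValue())
                .collect(Collectors.joining("\n"));
        try {
            // Not atomic; a real implementation would write a temp file and rename it.
            Files.writeString(checkpointFile, contents);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}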

2. OOME errors: The main reasons why I didn't explore a solution to this is
a) to keep this KIP as simple as possible, and b) because I'm not exactly sure
how to signal that a Task should commit prematurely. I'm confident it's
possible, and I think it's worth adding a section on handling this. Besides
my proposal to force an early commit once memory usage reaches a threshold,
is there any other approach that you might suggest for tackling this
problem?
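
To make the threshold idea a bit more concrete, something along these lines (names
are illustrative only):

// Hypothetical sketch: track bytes buffered in uncommitted transactions and ask the
// Task to commit early once a configurable bound is exceeded, instead of waiting for
// the commit interval.
public class UncommittedBytesGuard {

    private final long maxUncommittedBytes;
    private long uncommittedBytes = 0;

    public UncommittedBytesGuard(long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    // Returns true if the Task should commit now to bound memory usage.
    public boolean record(long bytesWritten) {
        uncommittedBytes += bytesWritten;
        return uncommittedBytes >= maxUncommittedBytes;
    }

    public void onCommit() {
        uncommittedBytes = 0;
    }
}

The open question remains how the store (or StateManager) would actually signal the
Task to commit early.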

3. ALOS: I can add in an explicit paragraph, but my assumption is that
since transactional behaviour comes at little/no cost, that it should be
available by default on all stores, irrespective of the processing mode.
While ALOS doesn't use transactions, the Task itself still "commits", so
the behaviour should be correct under ALOS too. I'm not convinced that it's
worth having both transactional/non-transactional stores available, as it
would considerably increase the complexity of the codebase, for very little
benefit.

4. Method deprecation: Are you referring to StateStore#getPosition()? As I
understand it, Position contains the position of the *source* topics,
whereas the commit offsets would be the *changelog* offsets. So it's still
necessary to retain the Position data, as well as the changelog offsets.
What I meant in the KIP is that Position offsets are currently stored in a
file, and since we can atomically store metadata along with the record
batch we commit to RocksDB, we can move our Position offsets in to this
metadata too, and gain the same transactional guarantees that we will for
changelog offsets, ensuring that the Position offsets are consistent with
the records that are read from the database.
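
Concretely, the atomicity argument is that offsets and data land in the same write
batch. A small sketch (the key layout and names here are assumptions for
illustration, not the KIP's actual design):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.nio.charset.StandardCharsets;

// Illustrative sketch: data records plus changelog-offset and Position metadata go
// into one RocksDB WriteBatch, so they commit atomically and stay consistent.
public class AtomicOffsetsSketch {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/atomic-offsets-sketch");
             WriteBatch batch = new WriteBatch();
             WriteOptions writeOptions = new WriteOptions()) {

            // Buffered data writes for this commit.
            batch.put(bytes("some-key"), bytes("some-value"));

            // Metadata written in the same batch: changelog offset and Position offset.
            batch.put(bytes("__changelog_offset__my-changelog-0"), bytes("1234"));
            batch.put(bytes("__position__source-topic-0"), bytes("5678"));

            // Either everything in the batch is visible after this call, or nothing is.
            db.write(writeOptions, batch);
        }
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}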

Regards,
Nick

On Tue, 22 Nov 2022 at 16:25, John Roesler  wrote:

> Thanks for publishing this alternative, Nick!
>
> The benchmark you mentioned in the KIP-844 discussion seems like a
> compelling reason to revisit the built-in transactionality mechanism. I
> also appreciate you analysis, showing that for most use cases, the write
> batch approach should be just fine.
>
> There are a couple of points that would hold me back from approving this
> KIP right now:
>
> 1. Loss of coverage for custom stores.
> The fact that you can plug in a (relatively) simple implementation of the
> XStateStore interfaces and automagically get a distributed database out of
> it is a significant benefit of Kafka Streams. I'd hate to lose it, so it
> would be better to spend some time and come up with a way to preserve that
> property. For example, can we provide a default implementation of
> `commit(..)` that re-implements the existing checkpoint-file approach? Or
> perhaps add an `isTransactional()` flag to the state store interface so
> that the runtime can decide whether to continue to manage checkpoint files
> vs delegating transactionality to the stores?
>
> 2. Guarding against OOME
> I appreciate your analysis, but I don't think it's sufficient to say that
> we will solve the memory problem later if it becomes necessary. The
> experience leading to that situation would be quite bad: Imagine, you
> upgrade to AK 3.next, your tests pass, so you deploy to production. That
> night, you get paged because your app is now crashing with OOMEs. As with
> all OOMEs, you'll have a really hard time finding the root cause, and once
> you do, you won't have a clear path to resolve the issue. You could only
> tune down the commit interval and cache buffer size until you stop getting
> crashes.
>
> FYI, I know of multiple cases where people run EOS with much larger commit
> intervals to get better batching than the default, so I don't think this
> pathological case would be as rare as you suspect.
>
> Given that we already have the rudiments of an idea of what we could do to
> prevent this downside, we should take the time to design a solution. We owe
> it to our users to ensure that awesome new features don't come with bitter
> pills unless we can't avoid it.
>
> 3. ALOS mode.
> On the other hand, I didn't see an indication of how stores will be
> handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality
> of the store and the processing mode are orthogonal. A transactional store
> would serve ALOS just as well as a non-transactional one (if not better).
> Under ALOS, 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-22 Thread John Roesler
Thanks for publishing this alternative, Nick!

The benchmark you mentioned in the KIP-844 discussion seems like a compelling 
reason to revisit the built-in transactionality mechanism. I also appreciate 
your analysis, showing that for most use cases, the write batch approach should 
be just fine.

There are a couple of points that would hold me back from approving this KIP 
right now:

1. Loss of coverage for custom stores.
The fact that you can plug in a (relatively) simple implementation of the 
XStateStore interfaces and automagically get a distributed database out of it 
is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be 
better to spend some time and come up with a way to preserve that property. For 
example, can we provide a default implementation of `commit(..)` that 
re-implements the existing checkpoint-file approach? Or perhaps add an 
`isTransactional()` flag to the state store interface so that the runtime can 
decide whether to continue to manage checkpoint files vs delegating 
transactionality to the stores?
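
To make the idea concrete, here is a rough sketch of what such a default could
look like. The interface and method names below are purely illustrative (they
are neither the KIP-892 proposal nor the existing StateStore API), and the
"commit" default simply falls back to today's flush-then-checkpoint behaviour:

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only.
public interface TransactionalStoreSketch {

    // existing, non-atomic flush, as on today's stores
    void flush();

    // A flag the runtime could consult to decide whether to keep managing
    // checkpoint files itself or to delegate atomicity to the store.
    default boolean isTransactional() {
        return false;
    }

    // Default "commit" that preserves the existing behaviour: flush the store
    // and let the runtime write the checkpoint file afterwards, as it does
    // today for non-transactional stores.
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }
}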

2. Guarding against OOME
I appreciate your analysis, but I don't think it's sufficient to say that we 
will solve the memory problem later if it becomes necessary. The experience 
leading to that situation would be quite bad: Imagine, you upgrade to AK 
3.next, your tests pass, so you deploy to production. That night, you get paged 
because your app is now crashing with OOMEs. As with all OOMEs, you'll have a 
really hard time finding the root cause, and once you do, you won't have a 
clear path to resolve the issue. You could only tune down the commit interval 
and cache buffer size until you stop getting crashes.

FYI, I know of multiple cases where people run EOS with much larger commit 
intervals to get better batching than the default, so I don't think this 
pathological case would be as rare as you suspect.
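
For reference, that tuning is done through the standard Streams configs; a
minimal, illustrative example (the values are arbitrary, not recommendations):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class EosBatchingConfigExample {
    public static Properties props() {
        final Properties props = new Properties();
        // EOS with a commit interval much larger than the EOS default (100 ms)
        // and a larger cache, to get better batching.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L);
        return props;
    }
}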

Given that we already have the rudiments of an idea of what we could do to 
prevent this downside, we should take the time to design a solution. We owe it 
to our users to ensure that awesome new features don't come with bitter pills 
unless we can't avoid it.

3. ALOS mode.
On the other hand, I didn't see an indication of how stores will be handled 
under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store 
and the processing mode are orthogonal. A transactional store would serve ALOS 
just as well as a non-transactional one (if not better). Under ALOS, though, 
the default commit interval is five minutes, so the memory issue is far more 
pressing.

As I see it, we have several options to resolve this point. We could 
demonstrate that transactional stores work just fine for ALOS and we can 
therefore just swap over unconditionally. We could also disable the 
transactional mechanism under ALOS so that stores operate just the same as they 
do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and 
make transactional stores opt-in (it'd be better to avoid the extra opt-in 
mechanism, but it's a good get-out-of-jail-free card).

4. (minor point) Deprecation of methods

You mentioned that the new `commit` method replaces flush, 
updateChangelogOffsets, and checkpoint. It seems to me that the point about 
atomicity and Position also suggests that it replaces the Position callbacks. 
However, the proposal only deprecates `flush`. Should we be deprecating other 
methods as well?

Thanks again for the KIP! It's really nice that you and Alex will get the 
chance to collaborate on both directions so that we can get the best outcome 
for Streams and its users.

-John


On 2022/11/21 15:02:15 Nick Telford wrote:
> Hi everyone,
> 
> As I mentioned in the discussion thread for KIP-844, I've been working on
> an alternative approach to achieving better transactional semantics for
> Kafka Streams StateStores.
> 
> I've published this separately as KIP-892: Transactional Semantics for
> StateStores, so that it can be discussed/reviewed separately from KIP-844.
> 
> Alex: I'm especially interested in what you think!
> 
> I have a nearly complete implementation of the changes outlined in this
> KIP, please let me know if you'd like me to push them for review in advance
> of a vote.
> 
> Regards,
> 
> Nick
> 


[jira] [Created] (KAFKA-14415) `ThreadCache` is getting slower with every additional state store

2022-11-22 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-14415:
--

 Summary: `ThreadCache` is getting slower with every additional 
state store
 Key: KAFKA-14415
 URL: https://issues.apache.org/jira/browse/KAFKA-14415
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Brutschy


There are a few lines in `ThreadCache` that I think should be optimized. 
`sizeBytes` is called at least once, and potentially many times, in every `put` 
and is linear in the number of caches (= number of state stores, so typically 
proportional to the number of tasks). That means that with every additional 
task, every put gets a little slower. The throughput is 30% higher if we 
replace it with a constant-time update…
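
For context, a constant-time alternative is to maintain a running total that
is adjusted on every put/delete/evict, instead of summing over all named
caches on each call. A sketch of the idea (illustrative only, not the actual
ThreadCache code):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Keep a per-cache size and a running grand total; sizeBytes() becomes O(1)
// regardless of how many state stores (caches) the thread owns.
public final class ConstantTimeSizeTracker {

    private final Map<String, Long> sizePerCache = new ConcurrentHashMap<>();
    private final AtomicLong totalBytes = new AtomicLong();

    // called whenever a cache's contribution changes (put, delete, evict)
    public void update(final String cacheName, final long newSizeBytes) {
        final Long previous = sizePerCache.put(cacheName, newSizeBytes);
        totalBytes.addAndGet(newSizeBytes - (previous == null ? 0L : previous));
    }

    // constant time, independent of the number of caches
    public long sizeBytes() {
        return totalBytes.get();
    }
}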

Compare the throughput of TIME_ROCKS on trunk (green graph):

[http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/]

This is the throughput of TIME_ROCKS when a constant time `sizeBytes` 
implementation is used:

[http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/]

So the throughput is ~20% higher. 

The same seems to apply to the MEM backend (initial throughput >8000 instead 
of 6000); however, I cannot run the same benchmark here because the memory 
fills up too quickly.

[http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.3 #124

2022-11-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-22 Thread Sagar
Hi Victoria,

Thanks for the KIP. Seems like a very interesting idea!

I have a couple of questions:

1) Did you consider adding a method similar to:
List> get(K key, long from, long to)?

I think this could be useful considering that this versioning scheme unlocks
time travel on a per-key basis (see the sketch after this list). WDYT?

2) I have a similar question as Matthias, about the timestampTo argument
when doing a get. Is it inclusive or exclusive?

3) validFrom sounds slightly confusing to me. It is essentially the
timestamp at which the record was inserted. validFrom makes it sound like
validTo which can keep changing based on new records while *from* is fixed.
WDYT?

4) I also think a delete API should be supported.
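
Purely for illustration, a per-key time-range read could have a shape like the
one below. The element type, parameter names and ordering guarantee are
assumptions made for the sake of the example, not something defined in KIP-889:

import java.util.List;

// Hypothetical interface fragment; all names are illustrative.
public interface VersionedRangeReadSketch<K, V> {

    // A value together with the timestamp from which it was valid.
    interface VersionedRecord<V2> {
        V2 value();
        long validFrom();
    }

    // Returns every record version of `key` whose validity overlaps the range
    // [fromTimestamp, toTimestamp], e.g. ordered by validFrom.
    List<VersionedRecord<V>> get(K key, long fromTimestamp, long toTimestamp);
}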

Thanks!
Sagar.

On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax  wrote:

> Thanks for the KIP Victoria. Very well written!
>
>
> Couple of questions (many might just require to add some more details to
> the KIP):
>
>   (1) Why does the new store not extend KeyValueStore, but StateStore?
> In the end, it's a KeyValueStore?
>
>   (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> want to support IQ in this KIP, it might be good to add this interface
> right away to avoid complications for follow up KIPs? Or won't there by
> any complications anyway?
>
>   (3) Why do we not have a `delete(key)` method? I am ok with not
> supporting all methods from the existing KV-store, but a `delete(key)` seems
> fundamental to have?
>
>   (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> MAX_VALUE)`? Maybe it is good to have as syntactic sugar though? Just for
> my own clarification (should we add something to the JavaDocs?).
>
>   (4b) Should we throw an exception if a user queries out-of-bound
> instead of returning `null` (in `get(key,ts)`)?
>-> You put it into "rejected alternatives", and I understand your
> argument. Would love to get input from others about this question
> though. -- It seems we also return `null` for windowed stores, so maybe
> the strongest argument is to align to existing behavior? Or do we have
> case for which the current behavior is problematic?
>
>   (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
> discretion when this is the case)" -> Should we make it a stricter
> contract such that the user can reason about it better (there is WIP to
> make retention time a strict bound for windowed stores atm)
>-> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a
> strict bound, too.
>
>   (5a) Do we need to expose `segmentInterval`? For windowed-stores, we
> also use segments but hard-code it to two (it was exposed in earlier
> versions but it seems not useful, even if we would be open to expose it
> again if there is user demand).
>
>   (5b) JavaDocs says: "Performance degrades as more record versions for
> the same key are collected in a single segment. On the other hand,
> out-of-order writes and reads which access older segments may slow down
> if there are too many segments." -- Wondering if JavaDocs should make
> any statements about expected performance? Seems to be an implementation
> detail?
>
>   (6) validTo timestamp is "exclusive", right? Ie, if I query
> `get(key,ts[=validToV1])` I would get `null` or the "next" record v2
> with validFromV2=ts?
>
>   (7) The KIP says that segments are stored in the same RocksDB -- for
> this case, how are efficient deletes handled? For windowed stores, we can
> just delete a full RocksDB.
>
>   (8) Rejected alternatives: you propose to not return the validTo
> timestamp -- if we find it useful in the future to return it, would
> there be a clean path to change it accordingly?
>
>
> -Matthias
>
>
> On 11/16/22 9:57 PM, Victoria Xia wrote:
> > Hi everyone,
> >
> > I have a proposal for introducing versioned state stores in Kafka
> Streams.
> > Versioned state stores are similar to key-value stores except they can
> > store multiple record versions for a single key. This KIP focuses on
> > interfaces only in order to limit the scope of the KIP.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >
> > Thanks,
> > Victoria
> >
>


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-22 Thread Sagar
Hey Snehashis,

Thanks for the KIP. It looks like a very useful feature. Couple of
small-ish points, let me know what you think:

1) Should we update the GET /connectors endpoint to include the version of
the plugin that is running? It could be useful to figure out the version of
the plugin, or am I right in assuming it gets returned by the expand=info call?
2) I am not aware of this and hence asking: can two connectors with different
versions have the same name? Does the plugin isolation allow this? This
could have a bearing when using the lifecycle endpoints for connectors, like
DELETE etc.

Thanks!
Sagar.


On Tue, Nov 22, 2022 at 2:10 PM Ashwin  wrote:

> Hi Snehasis,
>
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> a versioning scheme for a connector config for the same connector (and not
> different versions of a connector plugin).
>
> Sorry for not being more precise in my wording -  I meant registering
> versions of schema for connector config.
>
> Let's take the example of a fictional connector which uses a fictional AWS
> service.
>
> Fictional Connector Config schema version:2.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#",
>   "type": "object",
>   "properties": {
>     "name": {
>       "type": "string"
>     },
>     "schema_version": {
>       "type": "string"
>     },
>     "aws_access_key": {
>       "type": "string"
>     },
>     "aws_secret_key": {
>       "type": "string"
>     }
>   },
>   "required": [
>     "name",
>     "schema_version",
>     "aws_access_key",
>     "aws_secret_key"
>   ]
> }
>
> Fictional Connector config schema version:3.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#",
>   "type": "object",
>   "properties": {
>     "name": {
>       "type": "string"
>     },
>     "schema_version": {
>       "type": "string"
>     },
>     "iam_role": {
>       "type": "string"
>     }
>   },
>   "required": [
>     "name",
>     "schema_version",
>     "iam_role"
>   ]
> }
>
> The connector which supports Fictional config schema 2.0 will validate the
> access key and secret key, whereas a connector which supports a config with
> schema version 3.0 will only validate the IAM role.
>
> This is the alternative which I wanted to suggest. Each plugin will
> register the schema versions of connector config which it supports.
>
> The plugin paths may optionally be different, i.e. a new plugin path is not
> required just to support a new schema version.
>
> Thanks,
> Ashwin
>
> On Tue, Nov 22, 2022 at 12:47 PM Snehashis 
> wrote:
>
> > Thanks for the input Ashwin.
> >
> > > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > > config is versioned and has a schema. Then a single plugin (whose
> > > dependencies have not changed) can handle multiple config versions for
> > the
> > > same connector class.
> >
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> > a versioning scheme for a connector config for the same connector (and
> not
> > different versions of a connector plugin). That is a somewhat tangential
> > problem. While it is definitely a useful feature to have, like a log to
> > check what changes were made over time to the config which might make it
> > easier to do rollbacks, it is not the focus here. Here, by version, we mean
> > which underlying version of the plugin the given connector configuration
> > should use. Perhaps it is better to change the name of the
> > parameter from connector.version to connector.plugin.version or
> > plugin.version if that is confusing. wdyt?
> >
> > >  2. Any plans to support assisted migration e.g if a user invokes "POST
> > > connector/config?migrate=latest", the latest version __attempts__ to
> > > transform the existing config to the newer version. This would require
> > > adding a method like "boolean migrate(Version fromVersion)" to the
> > > connector interface.
> >
> > This is an enhancement we can think of doing in future. Users can simply
> do
> > a PUT call with the updated config which has the updated version number.
> > The assisted mode could be handy as the user does not need to know the
> > config but beyond this it does not seem to justify its existence.
> >
> > Regards
> > Snehashis
> >
> > On Tue, Nov 22, 2022 at 10:50 AM Ashwin 
> > wrote:
> >
> > > Hi Snehasis,
> > >
> > > This is a really useful feature and thanks for initiating this
> > discussion.
> > >
> > > I had the following questions -
> > >
> > >
> > > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > > config is versioned and has a schema. Then a single plugin (whose
> > > dependencies have not changed) can handle multiple config versions for
> > the
> > > same connector class.
> > >
> > > 2. Any plans to support assisted migration e.g if a user invokes "POST
> > > connector/config?migrate=latest", the latest version __attempts__ to
> > > transform the existing config to the newer version. 

Re: Request for Kafka Jira account

2022-11-22 Thread Gantigmaa Selenge
Thank you Mickael!

Regards,
Gantigmaa

On Mon, Nov 21, 2022 at 2:40 PM Mickael Maison 
wrote:

> Hi,
>
> I've created your account, you should receive an email with all the
> details.
>
> Thanks,
> Mickael
>
> On Mon, Nov 21, 2022 at 3:23 PM Gantigmaa Selenge 
> wrote:
> >
> > Hi team,
> >
> > Can I please have an account created for Kafka Jira?
> >
> > Username: tinaselenge
> > Display name: Gantigmaa Selenge
> > Email address: tina.sele...@gmail.com
> >
> > Thanks!
> >
> > Regards,
> > Gantigmaa
>
>


Re: [VOTE] KIP-884: Add config to configure KafkaClientSupplier in Kafka Streams

2022-11-22 Thread Sophie Blee-Goldman
Hey Hao, thanks for the KIP -- I'm +1 (binding)

On Mon, Nov 21, 2022 at 12:57 PM Matthias J. Sax  wrote:

> +1 (binding)
>
> On 11/21/22 7:39 AM, John Roesler wrote:
> > I'm +1 (binding)
> >
> > Thanks for the KIP!
> > -John
> >
> > On 2022/11/17 21:06:29 Hao Li wrote:
> >> Hi all,
> >>
> >> I would like start a vote on KIP-884:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams
> >>
> >>
> >> Thanks,
> >> Hao
> >>
>


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-22 Thread Ashwin
Hi Snehasis,

> IIUC (please correct me if I am wrong here), what you highlighted above,
is
a versioning scheme for a connector config for the same connector (and not
different versions of a connector plugin).

Sorry for not being more precise in my wording -  I meant registering
versions of schema for connector config.

Let's take the example of a fictional connector which uses a fictional AWS
service.

Fictional Connector Config schema version:2.0
---
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "schema_version": {
      "type": "string"
    },
    "aws_access_key": {
      "type": "string"
    },
    "aws_secret_key": {
      "type": "string"
    }
  },
  "required": [
    "name",
    "schema_version",
    "aws_access_key",
    "aws_secret_key"
  ]
}

Fictional Connector config schema version:3.0
---
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "schema_version": {
      "type": "string"
    },
    "iam_role": {
      "type": "string"
    }
  },
  "required": [
    "name",
    "schema_version",
    "iam_role"
  ]
}

The connector which supports Fictional config schema 2.0 will validate the
access key and secret key, whereas a connector which supports a config with
schema version 3.0 will only validate the IAM role.

This is the alternative which I wanted to suggest. Each plugin will
register the schema versions of connector config which it supports.

The plugin paths may optionally be different, i.e. a new plugin path is not
required just to support a new schema version.
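
To make the alternative concrete, here is a sketch of how a fictional
connector could branch its config validation on the declared schema_version.
The property names are taken from the fictional schemas above; the class
itself is illustrative and not real Connect framework code:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public final class FictionalConfigDefs {

    // Build the ConfigDef used for validation based on the schema_version
    // declared in the submitted connector config.
    public static ConfigDef forSchemaVersion(final Map<String, String> props) {
        final ConfigDef def = new ConfigDef()
            .define("name", Type.STRING, Importance.HIGH, "Connector name")
            .define("schema_version", Type.STRING, Importance.HIGH,
                    "Config schema version");

        if ("3.0".equals(props.get("schema_version"))) {
            // schema 3.0 only knows about the IAM role
            return def.define("iam_role", Type.STRING, Importance.HIGH,
                              "IAM role to assume");
        }
        // schema 2.0 validates the access/secret key pair
        return def
            .define("aws_access_key", Type.STRING, Importance.HIGH, "AWS access key")
            .define("aws_secret_key", Type.PASSWORD, Importance.HIGH, "AWS secret key");
    }
}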

Thanks,
Ashwin

On Tue, Nov 22, 2022 at 12:47 PM Snehashis  wrote:

> Thanks for the input Ashwin.
>
> > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > config is versioned and has a schema. Then a single plugin (whose
> > dependencies have not changed) can handle multiple config versions for
> the
> > same connector class.
>
> IIUC (please correct me if I am wrong here), what you highlighted above, is
> a versioning scheme for a connector config for the same connector (and not
> different versions of a connector plugin). That is a somewhat tangential
> problem. While it is definitely a useful feature to have, like a log to
> check what changes were made over time to the config which might make it
> easier to do rollbacks, it is not the focus here. Here, by version, we mean
> which underlying version of the plugin the given connector configuration
> should use. Perhaps it is better to change the name of the
> parameter from connector.version to connector.plugin.version or
> plugin.version if that is confusing. wdyt?
>
> >  2. Any plans to support assisted migration e.g if a user invokes "POST
> > connector/config?migrate=latest", the latest version __attempts__ to
> > transform the existing config to the newer version. This would require
> > adding a method like "boolean migrate(Version fromVersion)" to the
> > connector interface.
>
> This is an enhancement we can think of doing in future. Users can simply do
> a PUT call with the updated config which has the updated version number.
> The assisted mode could be handy as the user does not need to know the
> config but beyond this it does not seem to justify its existence.
>
> Regards
> Snehashis
>
> On Tue, Nov 22, 2022 at 10:50 AM Ashwin 
> wrote:
>
> > Hi Snehasis,
> >
> > This is a really useful feature and thanks for initiating this
> discussion.
> >
> > I had the following questions -
> >
> >
> > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > config is versioned and has a schema. Then a single plugin (whose
> > dependencies have not changed) can handle multiple config versions for
> the
> > same connector class.
> >
> > 2. Any plans to support assisted migration e.g if a user invokes "POST
> > connector/config?migrate=latest", the latest version __attempts__ to
> > transform the existing config to the newer version. This would require
> > adding a method like "boolean migrate(Version fromVersion)" to the
> > connector interface.
> >
> > Thanks,
> > Ashwin
> >
> > On Mon, Nov 21, 2022 at 2:27 PM Snehashis 
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion thread on KIP-891: Running multiple
> > versions
> > > of a connector.
> > >
> > > The KIP aims to add the ability for the connect runtime to run multiple
> > > versions of a connector.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector
> > >
> > > Please take a look and let me know what you think.
> > >
> > > Thank you
> > > Snehashis Pal
> > >
> >
>