Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-03-06 Thread Luke Chen
Hi Abhijeet,

Thanks for the update and the explanation.
I had another look, and it LGTM now!

Thanks.
Luke

On Tue, Mar 5, 2024 at 2:50 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the reply. Sounds good to me.
>
> Jun
>
>
> On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar 
> wrote:
>
> > Hi Jun,
> >
> > Thanks for pointing it out. It makes sense to me. We can have the
> following
> > metrics instead. What do you think?
> >
> >- remote-(fetch|copy)-throttle-time-avg (The average time in ms remote
> >fetches/copies were throttled by a broker)
> >- remote-(fetch|copy)-throttle-time-max (The maximum time in ms remote
> >fetches/copies were throttled by a broker)
> >
> > These are similar to fetch-throttle-time-avg and fetch-throttle-time-max
> > metrics we have for Kafka consumers?
> > The Avg and Max are computed over the (sliding) window as defined by the
> > configuration metrics.sample.window.ms on the server.
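> >
> > For illustration only (not the final implementation), these would be backed by
> > a single sensor with Avg and Max stats; the metric group name below is just a
> > placeholder:
> >
> > Metrics metrics = new Metrics();                       // org.apache.kafka.common.metrics
> > Sensor sensor = metrics.sensor("remote-fetch-throttle-time");
> > sensor.add(metrics.metricName("remote-fetch-throttle-time-avg", "RemoteLogManager",
> >     "The average time in ms remote fetches were throttled by a broker"), new Avg());
> > sensor.add(metrics.metricName("remote-fetch-throttle-time-max", "RemoteLogManager",
> >     "The maximum time in ms remote fetches were throttled by a broker"), new Max());
> >
> > // Avg and Max come from org.apache.kafka.common.metrics.stats;
> > // recorded wherever a remote fetch gets throttled:
> > long throttleTimeMs = 15;
> > sensor.record(throttleTimeMs);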
> >
> > (Also, I will update the config and metric names to be consistent)
> >
> > Regards.
> >
> > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao 
> wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Thanks for the reply.
> > >
> > > The issue with recording the throttle time as a gauge is that it's
> > > transient. If the metric is not read immediately, the recorded value
> > could
> > > be reset to 0. The admin won't realize that throttling has happened.
> > >
> > > For client quotas, the throttle time is tracked as the average
> > > throttle-time per user/client-id. This makes the metric less transient.
> > >
> > > Also, the configs use read/write whereas the metrics use fetch/copy.
> > Could
> > > we make them consistent?
> > >
> > > Jun
> > >
> > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> > abhijeet.cse@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > >
> > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime
> ->
> > > The
> > > > duration of time required at a given moment to bring the observed
> fetch
> > > > rate within the allowed limit, by preventing further reads.
> > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> ->
> > > The
> > > > duration of time required at a given moment to bring the observed
> > remote
> > > > copy rate within the allowed limit, by preventing further copies.
> > > >
> > > > Regards.
> > > >
> > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao 
> > > wrote:
> > > >
> > > > > Hi, Abhijeet,
> > > > >
> > > > > Thanks for the explanation. Makes sense to me now.
> > > > >
> > > > > Just a minor comment. Could you document the exact meaning of the
> > > > following
> > > > > two metrics? For example, is the time accumulated? If so, is it
> from
> > > the
> > > > > start of the broker or over some window?
> > > > >
> > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteFetchThrottleTime
> > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > > > abhijeet.cse@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > The existing quota system for consumers is designed to throttle
> the
> > > > > > consumer if it exceeds the allowed fetch rate.
> > > > > > The additional quota we want to add works on the broker level. If
> > the
> > > > > > broker-level remote read quota is being
> > > > > > exceeded, we prevent additional reads from the remote storage but
> > do
> > > > not
> > > > > > prevent local reads for the consumer.
> > > > > > If the consumer has specified other partitions to read, which can
> > be
> > > > > served
> > > > > > from local, it can continue to read those
> > > > > > partitions. To elaborate more, we make a check for quota exceeded
> > if
> > > we
> > > > > > know a segment needs to be read from
> > > > > > remote. If the quota is exceeded, we simply skip the partition
> and
> > > move
> > > > > to
> > > > > > other segments in the fetch request.
> > > > > > This way consumers can continue to read the local data as long as
> > > they
> > > > > have
> > > > > > not exceeded the client-level quota.
> > > > > >
> > > > > > Also, when we choose the appropriate consumer-level quota, we
> would
> > > > > > typically look at what kind of local fetch
> > > > > > throughput is supported. If we were to reuse the same consumer
> > quota,
> > > > we
> > > > > > should also consider the throughput
> > > > > > the remote storage supports. The throughput supported by remote
> may
> > > be
> > > > > > less/more than the throughput supported
> > > > > > by local (when using a cloud provider, it depends on the plan
> opted
> > > by
> > > > > the
> > > > > > user). The consumer quota has to be carefully
> > > > > > set considering both local and remote throughput. Instead, if we
> > > have a
> > > > > > separate quota, it makes things much simpler
> > > > 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2704

2024-03-06 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16350:
---

 Summary: StateUpdater does not init transaction after canceling 
task close action
 Key: KAFKA-16350
 URL: https://issues.apache.org/jira/browse/KAFKA-16350
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


With EOSv2, we use a thread producer shared across all tasks. We init tx on the 
producer with each _task_ (due to EOSv1 which uses a producer per task), and 
have a guard in place to only init tx a single time.

If we hit an error, we close the producer and create a new one, which is still 
not initialized for transactions. At the same time, with the state updater, we 
schedule a "close task" action on error.

For each task we get back, we cancel the "close task" action in order to 
keep the task. If this happens for _all_ tasks, we don't have any task in state 
CREATED at hand, and thus we never init the producer for transactions, because 
we assume this was already done.

On the first `send` request, we crash with an IllegalStateException:
{code:java}
Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
{code}
This bug is exposed via EOSIntegrationTest (logs attached).
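
For reference, the reported error can be reproduced with a plain transactional 
producer by skipping initTransactions(), which mirrors the missed re-initialization 
of the newly created producer described above (this is only a stand-alone sketch, 
not the Streams code path; broker address and ids are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class UninitializedTxDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // initTransactions() is intentionally not called here
            producer.beginTransaction();
            // fails with "Invalid transition attempted from state UNINITIALIZED
            // to state IN_TRANSACTION" because the transaction was never initialized
        }
    }
}
{code}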



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

2024-03-06 Thread Jun Rao
Hi, Jose,

Thanks for the reply.

20.1. It seems that the PreferredSuccessors field didn't get fixed. It's
still there together with PreferredCandidates.
+{ "name": "PreferredSuccessors", "type": "[]int32", "versions":
"0",
+  "about": "A sorted list of preferred successors to start the
election" },
+{ "name": "PreferredCandidates", "type": "[]ReplicaInfo",
"versions": "1+",
+  "about": "A sorted list of preferred candidates to start the
election", "fields": [

37. If we don't support batching in AddVoterResponse, RemoveVoterResponse
and UpdateVoterResponse, should we combine CurrentLeader and NodeEndpoint
into a single field?

42. We include VoterId and VoterUuid for the receiver in Vote and
BeginQuorumEpoch requests, but not in EndQuorumEpoch, Fetch and
FetchSnapshot. Could you explain how they are used?

Jun

On Wed, Mar 6, 2024 at 8:53 AM José Armando García Sancio
 wrote:

> Hi Jun,
>
> See my comments below.
>
> On Tue, Mar 5, 2024 at 2:57 PM Jun Rao  wrote:
> > 37. We don't batch multiple topic partitions in AddVoter, RemoveVoter and
> > UpdateVoter requests while other requests like Vote and BeginQuorumEpoch
> > support batching. Should we make them consistent?
>
> Originally I had them as batched RPCs but decided to make them only
> operate on one KRaft topic partition. I made this change primarily
> because batching them would amount to false sharing. Topic partitions in KRaft
> are independent; those operations will be handled independently and
> committed independently, but because of the batching the KRaft node
> would be required to wait on all the batched operations before it can
> send the response.
>
> I know that this is inconsistent with the other RPCs but I am hesitant
> to propagate this incorrect batching to new RPCs.
>
> > 38. BeginQuorumEpochRequest: It seems that we need to replace the name
> > field with a nodeId field in LeaderEndpoints?
>
> The schema looks good to me. This struct has a different meaning from
> the struct in the response of other RPCs. The BeginQuorumEpoch request
> is sent by the leader so the expectation is that the sending
> node/replica is the leader for all of the partitions sent. This also
> means that the endpoints sent in LeaderEndpoints are all for the same
> leader (or replica). The reason that BeginQuorumEpoch sends multiple
> endpoints is because the leader may be binding to multiple listeners.
> The leader sends a tuple (listener name, host name, host port) for
> each of its own advertised controller listeners.
>
> > 39. VoteRequest: Will the Voter ever be different from the Candidate? I
> > thought that in all the VoteRequests, the voter just votes for itself.
>
> Yes. This is a naming that always confuses me but makes sense to me
> after further analysis. The voter (VoterId and VoterUuid) is the
> replica receiving the Vote request and potentially voting for the
> sender (candidate). The candidate (CandidateId and CandidateUuid) is
> the replica sending the Vote request and asking for votes from the
> receivers (voters). I tried to better document it in their respective
> "about" schema fields.
>
> > 40. EndQuorumEpochRequest: Should we add a replicaUUID field to pair with
> > LeaderId?
>
> I don't think so. We can add it for consistency and to help debugging
> but I don't think it is needed for correctness. A leader cannot
> (should not) change replica uuid and remain leader. In theory the only
> way for a replica to change uuid is to lose their disk. If this
> happens the expectation is that they will also lose their
> QuorumStateData.
>
> > 41. Regarding including replica UUID to identify a voter: It adds a bit
> of
> > complexity. Could you explain whether it is truly needed? Before this
> KIP,
> > KRaft already supports replacing a disk on the voter node, right?
>
> Yes. This KIP attempts to solve two general problems. 1) How to
> proactively change the voters set by increasing or decreasing the
> replication factor; or replace voters in the voters set. 2) Identify
> disk failures and recover from them safely. This is what I have in the
> Motivation section:
> "Consensus on the cluster metadata partition was achieved by the
> voters (controllers). If the operator of a KRaft cluster wanted to
> make changes to the set of voters, they would have to shutdown all of
> the controllers nodes and manually make changes to the on-disk state
> of the old controllers and new controllers. If the operator wanted to
> replace an existing voter because of a disk failure or general
> hardware failure, they would have to make sure that the new voter node
> has a superset of the previous voter's on-disk state. Both of these
> solutions are manual and error prone."
>
> directory.id (replica uuid) is needed to identify and resolve disk
> failures in a voter. The section "Proposed change / User explanation /
> Common scenarios / Disk failure recovery" documents this use case in
> more detail.
>
> Thanks,
> --
> -José
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2703

2024-03-06 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16349) ShutdownableThread fails build by calling Exit with race condition

2024-03-06 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16349:
---

 Summary: ShutdownableThread fails build by calling Exit with race 
condition
 Key: KAFKA-16349
 URL: https://issues.apache.org/jira/browse/KAFKA-16349
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.8.0
Reporter: Greg Harris


`ShutdownableThread` calls `Exit.exit()` when the thread's operation throws 
FatalExitError. In normal operation, this calls System.exit, and exits the 
process. In tests, the exit procedure is masked with Exit.setExitProcedure to 
prevent tests that encounter a FatalExitError from crashing the test JVM.

Masking of exit procedures is usually done in BeforeEach/AfterEach annotations, 
with the exit procedures cleaned up immediately after the test finishes. If the 
body of the test creates a ShutdownableThread that outlives the test, such as 
by omitting `ShutdownableThread#awaitShutdown`, by having 
`ShutdownableThread#awaitShutdown` be interrupted by a test timeout, or by a 
race condition between `Exit.resetExitProcedure` and `Exit.exit`, then 
System.exit() can be called erroneously.

 
{noformat}
// First, in the test thread:
Exit.setExitProcedure(...)
try {
new ShutdownableThread(...).start()
} finally {
Exit.resetExitProcedure()
}
// Second, in the ShutdownableThread:
try {
throw new FatalExitError(...)
} catch (FatalExitError e) {
Exit.exit(...) // Calls real System.exit()
}{noformat}
 

This can be resolved by one of the following:
 # Eliminate FatalExitError usages in code when setExitProcedure is in-use
 # Eliminate the Exit.exit call from ShutdownableThread, and instead propagate 
this error to another thread to handle without a race-condition
 # Eliminate resetExitProcedure by refactoring Exit to be non-static

FatalExitError is in use in a small number of places, but may be difficult to 
eliminate:
 * FinalizedFeatureChangeListener
 * InterBrokerSendThread
 * TopicBasedRemoteLogMetadataManager

It appears that every other use of Exit.setExitProcedure/Exit.exit is done on a 
single thread, so ShutdownableThread is the only place where this race 
condition is present.
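
A rough sketch of resolution option 2 above (illustrative only, not an actual patch; 
class and method names are made up): instead of calling Exit.exit() on the 
ShutdownableThread itself, the fatal error is handed over to the thread that owns 
the exit procedure, so that Exit.setExitProcedure/Exit.resetExitProcedure and 
Exit.exit can no longer race across threads.
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.common.utils.Exit;

final class FatalErrorHandler {
    private final BlockingQueue<Throwable> fatalErrors = new LinkedBlockingQueue<>();

    // called from ShutdownableThread instead of Exit.exit(...)
    void reportFatalError(Throwable t) {
        fatalErrors.offer(t);
    }

    // called on the thread that installs/uninstalls the exit procedure, so
    // exiting and resetting the procedure are serialized on a single thread
    void maybeExit() {
        Throwable t = fatalErrors.poll();
        if (t != null) {
            Exit.exit(1);
        }
    }
}
{code}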

The effect of this bug is that the build is flaky, as race conditions/timeouts 
in tests can cause the gradle executors to exit with status code 1, which has 
happened 26 times in the last 28 days. I have not yet been able to confirm this 
bug is happening in other tests, but I do have a deterministic reproduction 
case with the exact same symptoms:
{noformat}
Gradle Test Run :core:test > Gradle Test Executor 38 > ShutdownableThreadTest > 
testShutdownWhenTestTimesOut(boolean) > 
"testShutdownWhenTestTimesOut(boolean).isInterruptible=true" SKIPPED
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':core:test'.
> Process 'Gradle Test Executor 38' finished with non-zero exit value 1
  This problem might be caused by incorrect test process configuration.
  For more on test execution, please refer to 
https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in 
the Gradle documentation.{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-06 Thread Lucas Brutschy
Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
 1) a copy-restore variant without custom processing, as you propose.
 2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the nature of
the restore issue only became clear to me when I read through the
comments in the JIRA ticket you linked.
2) If we decide to keep the parameter `reprocessOnRestore`, the
Javadoc on it should be extended. This is a somewhat subtle issue, and
I don't think `restore by reprocessing` is enough of an explanation.

Nits:

`{@link ValueTransformer ValueTransformer}` -> `{@link
ValueTransformer ValueTransformers}`
`user defined` -> `user-defined`

Cheers,
Lucas

On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna  wrote:
>
> Hi Walker,
>
> Thanks for the KIP!
>
> Great that you are going to fix this long-standing issue!
>
> 1.
> I was wondering if we need the timestamp extractor as well as the key
> and value deserializer in Topology#addGlobalStore() that do not take a
> ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
> Since those methods setup a global state store that does not process any
> records, do they still need to deserialize records and extract
> timestamps? Name might still be needed, right?
>
> 2.
>  From an API point of view, it might make sense to put all
> processor-related arguments into a parameter object. Something like:
> GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
> Just an idea, open for discussion.
>
> 3.
> Could you please go over the KIP and correct typos and other mistakes in
> the KIP?
>
>
> Best,
> Bruno
>
>
>
> On 3/2/24 1:43 AM, Matthias J. Sax wrote:
> > Thanks for the KIP Walker.
> >
> > Fixing this issue, and providing users some flexibility to opt-in/out on
> > "restore reprocessing" is certainly a good improvement.
> >
> >  From an API design POV, I like the idea to not require passing in a
> > ProcessorSupplier to begin with. Given the current implementation of the
> > restore process, this might have been the better API from the beginning
> > on... Well, better late than never :)
> >
> > For this new method w/o a supplier, I am wondering if we want to keep
> > `addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar
> > thing via KIP-813. Just an idea.
> >
> > However, I am not convinced that adding a new boolean parameter is the
> > best way to design the API. Unfortunately, I don't have any elegant
> > proposal myself. Just a somewhat crazy idea to do a larger API change:
> >
> > Taking a step back, a global store is by definition a terminal node --
> > we don't support adding child nodes. Hence, while we expose a full
> > `ProcessorContext` interface, we actually limit what functionality it
> > supports. Thus, I am wondering if we should stop using the generic
> > `Processor` interface to begin with, but design a new one which is
> > tailored to the needs of global stores? -- This would of course be of
> > much larger scope than originally intended by this KIP, but it might be
> > a great opportunity to kill two birds with one stone?
> >
> > The only other question to consider is: do we believe that global stores
> > will never have child nodes, or could we actually allow for child nodes
> > in the future? If yes, it might not be smart to move off the
> > `Processor` interface. In general, I could imagine, especially as we
> > now want to support "process on restore", allowing simple stateless
> > operators like `map()` or `filter()` on a `GlobalTable` (or adding
> > custom global processors) at some point in the future.
> >
> > Just wanted to put this out to see what people think...
> >
> >
> > -Matthias
> >
> >
> > On 2/29/24 1:26 PM, Walker Carlson wrote:
> >> Hello everybody,
> >>
> >> I wanted to propose a change to our addGlobalStore methods so that the
> >> restore behavior can be controlled on a per-processor level. This should
> >> help Kafka Streams users to better tune global stores added with the
> >> processor API to better fit their needs.
> >>
> >> Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ
> >>
> >> Thanks,
> >> Walker
> >>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2702

2024-03-06 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-16288) Values.convertToDecimal throws ClassCastExceptions on String inputs

2024-03-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16288.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Values.convertToDecimal throws ClassCastExceptions on String inputs
> ---
>
> Key: KAFKA-16288
> URL: https://issues.apache.org/jira/browse/KAFKA-16288
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 1.1.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.8.0
>
>
> The convertToDecimal function does a best-effort conversion of an arbitrary 
> Object to a BigDecimal. Generally when a conversion cannot take place (such 
> as when an unknown subclass is passed-in) the function throws a 
> DataException. However, specifically for String inputs with valid number 
> within, a ClassCastException is thrown.
> This is because there is an extra "doubleValue" call in the implementation: 
> [https://github.com/apache/kafka/blob/ead2431c37ace9255df88ffe819bb905311af088/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L427]
>  which immediately causes a ClassCastException in the caller: 
> [https://github.com/apache/kafka/blob/ead2431c37ace9255df88ffe819bb905311af088/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L305]
>  
> This appears accidental, because the case for String is explicitly handled, 
> it just behaves poorly. Instead of the ClassCastException, the number should 
> be parsed correctly.
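> For illustration, a minimal sketch of the intended behaviour (not the actual 
> patch; the point is simply that a String should be parsed, not cast):
> {code:java}
> // assumes java.math.BigDecimal and org.apache.kafka.connect.errors.DataException
> Object value = "3.14";
> BigDecimal result;
> if (value instanceof String) {
>     result = new BigDecimal(((String) value).trim());  // parse, no ClassCastException
> } else {
>     throw new DataException("Unable to convert " + value.getClass() + " to a Decimal");
> }
> {code}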



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-939: Support Participation in 2PC

2024-03-06 Thread Martijn Visser
Hi all,

It is so exciting to see this KIP and know that it will greatly benefit
Flink and other technologies.

 +1

Best regards,

Martijn

Op vr 1 dec 2023 om 11:07 schreef Artem Livshits


> Hello,
>
> This is a voting thread for
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> .
>
> The KIP proposes extending Kafka transaction support (that already uses 2PC
> under the hood) to enable atomicity of dual writes to Kafka and an external
> database, and helps to fix a long-standing Flink issue.
>
> An example of code that uses the dual write recipe with JDBC and should
> work for most SQL databases is here
> https://github.com/apache/kafka/pull/14231.
>
> The FLIP for the sister fix in Flink is here
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>
> -Artem
>


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-03-06 Thread Manikumar
Hi Andrew,

Thanks for the updated KIP. Few queries below:

1. What is the use-case of deliveryCount in ShareFetchResponse?
2. During delete share groups, Do we need to clean any in-memory state from
share-partition leaders?
3. Any metrics for the share-coordinator?

Thanks
Manikumar

On Wed, Feb 21, 2024 at 12:11 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Manikumar,
> Thanks for your comments.
>
> 1. I believe that, in general, there are no situations in which a dynamic
> config
> change is prevented because of the existence of a resource. So, if we
> prevented
> setting config `group.type=consumer` on resource G of GROUP type
> if there was a share group G in existence, it would be a bit weird.
>
> I wonder whether changing the config name to `new.group.type` would help.
> It would
> ensure the type of a newly created group.
>
> 2. The behaviour for a DEAD share group is intended to be the same as a
> DEAD
> consumer group. The group cannot be “reused” again as such, but the group
> ID
> can be used by a new group.
>
> 3. Yes. AlterShareGroupOffsets will cause a new SHARE_CHECKPOINT.
>
> 4. In common with Admin.deleteConsumerGroups, the underlying Kafka RPC
> for Admin.deleteShareGroups is DeleteGroups. This is handled by the group
> coordinator and it does this by writing control records (a tombstone in
> this case).
> The KIP doesn’t say anything about this because it’s the same as consumer
> groups.
> Perhaps it would be sensible to add a GroupType to DeleteGroupsRequest so
> we can
> make sure we are deleting the correct type of group. The fact that there
> is not a specific
> RPC for DeleteShareGroups seems correct to me.
>
> 5. I prefer using “o.a.k.clients.consumer” because it’s already a public
> package and
> many of the classes and interfaces such as ConsumerRecord are in that
> package.
>
> I definitely need to add more information about how the Admin operations
> work.
> I will add a section to the KIP in the next version, later today. This
> will fill in details for
> your questions (3) and (4).
>
> Thanks,
> Andrew
>
> > On 14 Feb 2024, at 18:04, Manikumar  wrote:
> >
> > Hi Andrew,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 1. kafka-configs.sh (incrementalAlterConfigs) allows you to dynamically
> > change the configs. Maybe in this case, we should not allow the user to
> > change `group.type` if it's already set.
> > 2. What's the behaviour after a group transitions into DEAD state. Do we
> > add new control records to reset the state? Can we reuse the group again?
> > 3. Are we going to write new control records after the
> > AlterShareGroupOffsets API to reset the state?
> > 4. Is there any API for DeleteShareGroups? I assume, group co-ordinator
> is
> > going to handle the API. If so, Does this mean the group co-ordinator
> also
> > needs to write control records?
> > 5. How about using "org.apache.kafka.clients.consumer.share" package for
> > new interfaces/classes?
> >
> >
> > Thanks,
> > Manikumar
>
>


Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

2024-03-06 Thread José Armando García Sancio
Hi Jun,

See my comments below.

On Tue, Mar 5, 2024 at 2:57 PM Jun Rao  wrote:
> 37. We don't batch multiple topic partitions in AddVoter, RemoveVoter and
> UpdateVoter requests while other requests like Vote and BeginQuorumEpoch
> support batching. Should we make them consistent?

Originally I had them as batched RPCs but decided to make them only
operate on one KRaft topic partition. I made this change primarily
because batching them would amount to false sharing. Topic partitions in KRaft
are independent; those operations will be handled independently and
committed independently, but because of the batching the KRaft node
would be required to wait on all the batched operations before it can
send the response.

I know that this is inconsistent with the other RPCs but I am hesitant
to propagate this incorrect batching to new RPCs.

> 38. BeginQuorumEpochRequest: It seems that we need to replace the name
> field with a nodeId field in LeaderEndpoints?

The schema looks good to me. This struct has a different meaning from
the struct in the response of other RPCs. The BeginQuorumEpoch request
is sent by the leader so the expectation is that the sending
node/replica is the leader for all of the partitions sent. This also
means that the endpoints sent in LeaderEndpoints are all for the same
leader (or replica). The reason that BeginQuorumEpoch sends multiple
endpoints is because the leader may be binding to multiple listeners.
The leader sends a tuple (listener name, host name, host port) for
each of its own advertised controller listeners.

> 39. VoteRequest: Will the Voter ever be different from the Candidate? I
> thought that in all the VoteRequests, the voter just votes for itself.

Yes. This is a naming that always confuses me but makes sense to me
after further analysis. The voter (VoterId and VoterUuid) is the
replica receiving the Vote request and potentially voting for the
sender (candidate). The candidate (CandidateId and CandidateUuid) is
the replica sending the Vote request and asking for votes from the
receivers (voters). I tried to better document it in their respective
"about" schema fields.

> 40. EndQuorumEpochRequest: Should we add a replicaUUID field to pair with
> LeaderId?

I don't think so. We can add it for consistency and to help debugging
but I don't think it is needed for correctness. A leader cannot
(should not) change replica uuid and remain leader. In theory the only
way for a replica to change uuid is to lose their disk. If this
happens the expectation is that they will also lose their
QuorumStateData.

> 41. Regarding including replica UUID to identify a voter: It adds a bit of
> complexity. Could you explain whether it is truly needed? Before this KIP,
> KRaft already supports replacing a disk on the voter node, right?

Yes. This KIP attempts to solve two general problems. 1) How to
proactively change the voters set by increasing or decreasing the
replication factor; or replace voters in the voters set. 2) Identify
disk failures and recover from them safely. This is what I have in the
Motivation section:
"Consensus on the cluster metadata partition was achieved by the
voters (controllers). If the operator of a KRaft cluster wanted to
make changes to the set of voters, they would have to shutdown all of
the controllers nodes and manually make changes to the on-disk state
of the old controllers and new controllers. If the operator wanted to
replace an existing voter because of a disk failure or general
hardware failure, they would have to make sure that the new voter node
has a superset of the previous voter's on-disk state. Both of these
solutions are manual and error prone."

directory.id (replica uuid) is needed to identify and resolve disk
failures in a voter. The section "Proposed change / User explanation /
Common scenarios / Disk failure recovery" documents this use case in
more detail.

Thanks,
-- 
-José


Re: [VOTE] KIP-974: Docker Image for GraalVM based Native Kafka Broker

2024-03-06 Thread Ismael Juma
+1 (binding)

Thanks for the KIP.

On Sun, Nov 19, 2023, 10:28 PM Krishna Agarwal 
wrote:

> Hi,
> I'd like to call a vote on KIP-974 which aims to publish a docker image for
> GraalVM based Native Kafka Broker.
>
> KIP -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker
>
> Discussion thread -
> https://lists.apache.org/thread/98wnx4w92fqj5wymkqlqyjsvzxz277hk
>
> Regards,
> Krishna
>


Re: [VOTE] KIP-974: Docker Image for GraalVM based Native Kafka Broker

2024-03-06 Thread Krishna Agarwal
Hi,
We have received 2 binding and 1 non-binding votes in favour of this
KIP.
We're still in need of additional votes to move this proposal forward. If
you haven't had the opportunity to review the KIP yet, I kindly encourage
you to take a moment to do so and share your valuable vote.

Your participation is greatly appreciated.

Regards,
Krishna

On Mon, Nov 20, 2023 at 11:53 AM Krishna Agarwal <
krishna0608agar...@gmail.com> wrote:

> Hi,
> I'd like to call a vote on KIP-974 which aims to publish a docker image
> for GraalVM based Native Kafka Broker.
>
> KIP -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker
>
> Discussion thread -
> https://lists.apache.org/thread/98wnx4w92fqj5wymkqlqyjsvzxz277hk
>
> Regards,
> Krishna
>


Re: [DISCUSS] KIP-1021: Allow to get last stable offset (LSO) in kafka-get-offsets.sh

2024-03-06 Thread Andrew Schofield
Hi Ahmed,
Looks good to me with one remaining comment.

You’ve used:

bin/kafka-get-offsets.sh  … --time -1 --isolation -committed
bin/kafka-get-offsets.sh  … --time latest --isolation -committed
bin/kafka-get-offsets.sh  … --time -1 --isolation -uncommitted
bin/kafka-get-offsets.sh  … --time latest --isolation -uncommitted

I find the flags starting with a single - a bit unusual, and they don’t match 
the --time options such as “latest”. I suggest:

bin/kafka-get-offsets.sh  … --time -1 --isolation committed
bin/kafka-get-offsets.sh  … --time latest --isolation committed
bin/kafka-get-offsets.sh  … --time -1 --isolation uncommitted
bin/kafka-get-offsets.sh  … --time latest --isolation uncommitted

Or even:

bin/kafka-get-offsets.sh  … --time -1 --isolation read-committed
bin/kafka-get-offsets.sh  … --time latest --isolation read-committed
bin/kafka-get-offsets.sh  … --time -1 --isolation read-uncommitted
bin/kafka-get-offsets.sh  … --time latest --isolation read-uncommitted
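
For what it’s worth, this maps directly onto the Admin API: the LSO is just the 
“latest” offset read with read_committed isolation. A minimal sketch of what the 
tool would presumably do underneath (topic, partition and broker address are 
placeholders, and the try block belongs inside a method declared to throw Exception):

import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

try (Admin admin = Admin.create(
        Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
    TopicPartition tp = new TopicPartition("my-topic", 0);
    ListOffsetsResult result = admin.listOffsets(
        Map.of(tp, OffsetSpec.latest()),
        new ListOffsetsOptions(IsolationLevel.READ_COMMITTED));
    // the last stable offset for the partition
    long lastStableOffset = result.partitionResult(tp).get().offset();
    System.out.println("LSO: " + lastStableOffset);
}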

Apart from that nit, looks good to me.

Thanks,
Andrew


> On 5 Mar 2024, at 16:35, Ahmed Sobeh  wrote:
>
> I adjusted the KIP according to what we agreed on, let me know if you have
> any comments!
>
> Best,
> Ahmed
>
> On Thu, Feb 29, 2024 at 1:44 AM Luke Chen  wrote:
>
>> Hi Ahmed,
>>
>> Thanks for the KIP!
>>
>> Comments:
>> 1. If we all agree with the suggestion from Andrew, you could update the
>> KIP.
>>
>> Otherwise, LGTM!
>>
>>
>> Thanks.
>> Luke
>>
>> On Thu, Feb 29, 2024 at 1:32 AM Andrew Schofield <
>> andrew_schofield_j...@outlook.com> wrote:
>>
>>> Hi Ahmed,
>>> Could do. Personally, I find the existing “--time -1” totally horrid
>>> anyway, which was why
>>> I suggested an alternative. I think your suggestion of a flag for
>>> isolation level is much
>>> better than -6.
>>>
>>> Maybe I should put in a KIP which adds:
>>> --latest (as a synonym for --time -1)
>>> --earliest (as a synonym for --time -2)
>>> --max-timestamp (as a synonym for --time -3)
>>>
>>> That’s really what I would prefer. If the user has a timestamp, use
>>> `--time`. If they want a
>>> specific special offset, use a separate flag.
>>>
>>> Thanks,
>>> Andrew
>>>
 On 28 Feb 2024, at 09:22, Ahmed Sobeh 
>>> wrote:

 Hi Andrew,

 Thanks for the hint! That sounds reasonable, do you think adding a
 conditional argument, something like "--time -1 --isolation -committed"
>>> and
 "--time -1 --isolation -uncommitted" would make sense to keep the
 consistency of getting the offset by time? or do you think having a
>>> special
 argument for this case is better?

 On Tue, Feb 27, 2024 at 2:19 PM Andrew Schofield <
 andrew_schofield_j...@outlook.com> wrote:

> Hi Ahmed,
> Thanks for the KIP.  It looks like a useful addition.
>
> The use of negative timestamps, and in particular letting the user use
> `--time -1` or the equivalent `--time latest`
> is a bit peculiar in this tool already. The negative timestamps come
>>> from
> org.apache.kafka.common.requests.ListOffsetsRequest,
> but you’re not actually adding another value to that. As a result, I
> really wouldn’t recommend using -6 for the new
> flag. LSO is really a variant of -1 with read_committed isolation
>> level.
>
> I think that perhaps it would be better to add `--last-stable` as an
> alternative to `--time`. Then you’ll get the LSO with
> cleaner syntax.
>
> Thanks,
> Andrew
>
>
>> On 27 Feb 2024, at 10:12, Ahmed Sobeh 
> wrote:
>>
>> Hi all,
>> I would like to start a discussion on KIP-1021, which would enable
> getting
>> LSO in kafka-get-offsets.sh:
>>
>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1021%3A+Allow+to+get+last+stable+offset+%28LSO%29+in+kafka-get-offsets.sh
>>
>> Best,
>> Ahmed
>
>

 --
 *Ahmed Sobeh*
 Engineering Manager OSPO, *Aiven*
 ahmed.so...@aiven.io 
 aiven.io  |  https://www.facebook.com/aivencloud  |  https://twitter.com/aiven_io
 *Aiven Deutschland GmbH*
 Immanuelkirchstraße 26, 10405 Berlin
 Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
 Amtsgericht Charlottenburg, HRB 209739 B
>>>
>>>
>>>
>>
>
>
> --
> *Ahmed Sobeh*
> Engineering Manager OSPO, *Aiven*
> ahmed.so...@aiven.io 
> aiven.io
> *Aiven Deutschland GmbH*
> Immanuelkirchstraße 26, 10405 Berlin
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> Amtsgericht Charlottenburg, HRB 209739 B




Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.7 #103

2024-03-06 Thread Apache Jenkins Server
See 




Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2701

2024-03-06 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.6 #146

2024-03-06 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16348) Fix flaky TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress

2024-03-06 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16348:
--

 Summary: Fix flaky 
TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
 Key: KAFKA-16348
 URL: https://issues.apache.org/jira/browse/KAFKA-16348
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai


{code:java}
Gradle Test Run :tools:test > Gradle Test Executor 36 > 
TopicCommandIntegrationTest > 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String) > 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft 
FAILED
    org.opentest4j.AssertionFailedError: --under-replicated-partitions 
shouldn't return anything: 'Topic: 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress-4l8dkZ6JT2  
Partition: 0    Leader: 3       Replicas: 0,3   Isr: 3' ==> expected: <> but 
was: 
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
        at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
        at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1156)
        at 
app//org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TopicCommandIntegrationTest.java:827)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16322.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
> --
>
> Key: KAFKA-16322
> URL: https://issues.apache.org/jira/browse/KAFKA-16322
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.8.0
>
>
> https://devhub.checkmarx.com/cve-details/CVE-2023-50572/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Apache Kafka 3.8.0 release

2024-03-06 Thread Josep Prat
Hi all,

Thanks for your support. I updated the skeleton release plan created by
Colin. You can find it here:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0

Our last release stumbled upon some problems while releasing and was
delayed by several weeks, so I won't try to shave some weeks from our plan
for 3.8.0 (we might end up having delays again). Please raise your concerns
if you don't agree with the proposed dates.

The current proposal on dates are:

    - KIP Freeze: *15th May *(Wednesday)
  - A KIP must be accepted by this date in order to be considered for
  this release. Note, any KIP that may not be implemented in a
week, or that
  might destabilize the release, should be deferred.
   - Feature Freeze: *29th May *(Wednesday)
  - *major features merged & working on stabilisation, minor features
  have PR, release branch cut; anything not in this state will be
  automatically moved to the next release in JIRA*
   - Code Freeze: *12th June *(Wednesday)
   - At least two weeks of stabilization will follow Code Freeze, meaning
   we expect to release *no earlier* than *June 26th*. We will move as fast
   as we can, and aim for completion the earliest we can in June.

I went through the KIP list, and found that these are the ones that might
make it into the release:
KIP-853: KRaft Controller Membership Changes (still under discussion)
KIP-942: Add Power(ppc64le) support
KIP-966: Eligible Leader Replicas
KIP-974: Docker Image for GraalVM based Native Kafka Broker
KIP-977: Partition-Level Throughput Metrics
KIP-993: Allow restricting files accessed by File and Directory
ConfigProviders
KIP-994: Minor Enhancements to ListTransactions and DescribeTransactions
APIs
KIP-996: Pre-Vote
KIP-1004: Enforce tasks.max property in Kafka Connect
KIP-1005: Expose EarliestLocalOffset and TieredOffset
KIP-1007: Introduce Remote Storage Not Ready Exception
KIP-1019: Expose method to determine Metric Measurability

Please review the plan and provide any additional information or updates
regarding KIPs that target this release version (3.8).
If you have authored any KIPs that have an inaccurate status in the list,
or are not in the list and should be, or are in the list and should not be
- please share it in this thread so that I can keep the document accurate
and up to date.

Looking forward to your feedback.

Best,

On Wed, Feb 28, 2024 at 10:07 AM Satish Duggana 
wrote:

> Thanks Josep, +1.
>
> On Tue, 27 Feb 2024 at 17:29, Divij Vaidya 
> wrote:
> >
> > Thank you for volunteering Josep. +1 from me.
> >
> > --
> > Divij Vaidya
> >
> >
> >
> > On Tue, Feb 27, 2024 at 9:35 AM Bruno Cadonna 
> wrote:
> >
> > > Thanks Josep!
> > >
> > > +1
> > >
> > > Best,
> > > Bruno
> > >
> > > On 2/26/24 9:53 PM, Chris Egerton wrote:
> > > > Thanks Josep, I'm +1 as well.
> > > >
> > > > On Mon, Feb 26, 2024 at 12:32 PM Justine Olshan
> > > >  wrote:
> > > >
> > > >> Thanks Joesp. +1 from me.
> > > >>
> > > >> On Mon, Feb 26, 2024 at 3:37 AM Josep Prat
>  > > >
> > > >> wrote:
> > > >>
> > > >>> Hi all,
> > > >>>
> > > >>> I'd like to volunteer as release manager for the Apache Kafka 3.8.0
> > > >>> release.
> > > >>> If there are no objections, I'll start building a release plan (or
> > > >> adapting
> > > >>> the one Colin made some weeks ago) in the wiki in the next days.
> > > >>>
> > > >>> Thank you.
> > > >>>
> > > >>> --
> > > >>> [image: Aiven] 
> > > >>>
> > > >>> *Josep Prat*
> > > >>> Open Source Engineering Director, *Aiven*
> > > >>> josep.p...@aiven.io   |   +491715557497
> > > >>> aiven.io    |   <
> > > >> https://www.facebook.com/aivencloud
> > > 
> > > >>>   <
> > > >>> https://twitter.com/aiven_io>
> > > >>> *Aiven Deutschland GmbH*
> > > >>> Alexanderufer 3-7, 10117 Berlin
> > > >>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > >>> Amtsgericht Charlottenburg, HRB 209739 B
> > > >>>
> > > >>
> > > >
> > >
>


-- 

*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B


[jira] [Resolved] (KAFKA-16252) Maligned Metrics formatting

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16252.

Resolution: Fixed

> Maligned Metrics formatting
> ---
>
> Key: KAFKA-16252
> URL: https://issues.apache.org/jira/browse/KAFKA-16252
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 3.6.1
>Reporter: James
>Assignee: Cheng-Kai, Zhang
>Priority: Minor
> Fix For: 3.8.0
>
> Attachments: image-2024-02-13-22-34-31-371.png
>
>
> There are some inconsistencies, and I believe some genuinely buggy content, in the 
> documentation for monitoring Kafka.
> 1. Some MBean documentation is presented as a TR with a colspan of 3 instead 
> of its normal location of the third column
> 2. There seems to be some erroneous data posted in the headings for a handful 
> of documentation sections, ex
> {code:java}
>  [2023-09-15 00:40:42,725] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:693) [2023-09-15 00:40:42,729] INFO 
> Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)  {code}
> Links to erroneous content (not permalinks)
>  
>  * [https://kafka.apache.org/documentation/#producer_sender_monitoring]
>  * [https://kafka.apache.org/documentation/#consumer_fetch_monitoring]
>  * [https://kafka.apache.org/documentation/#connect_monitoring]
> This image demonstrates both issues
> !image-2024-02-13-22-34-31-371.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-995: Allow users to specify initial offsets while creating connectors

2024-03-06 Thread Chris Egerton
Hi Yash,

Thanks for the follow-up, I like the benefits it's yielded. I too think
"offsets_status" would be a better name for the response field.
@Ashwin--thoughts?

Cheers,

Chris


On Wed, Mar 6, 2024, 03:08 Ashwin  wrote:

> Thanks Yash,
>
> Yes, I think we can use @JsonInclude(JsonInclude.Include.NON_NULL) to
> exclude “initial_offsets_response” from the create response if offset is
> not specified.
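>
> For illustration, the Jackson mechanics would look roughly like this (class and
> field names here are hypothetical, not the actual REST entities):
>
> @JsonInclude(JsonInclude.Include.NON_NULL)
> public class ConnectorCreateResponse {
>     @JsonProperty("name")
>     public String name;
>
>     // serialized only when initial_offsets was part of the create request
>     @JsonProperty("initial_offsets_response")
>     public String initialOffsetsResponse;
> }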
>
> I’ll close the voting this week , if there are no further comments.
>
> Thanks for voting, everyone!
>
>
> Ashwin
>
> On Tue, Mar 5, 2024 at 11:20 PM Yash Mayya  wrote:
>
> > Hi Chris,
> >
> > I followed up with Ashwin offline and I believe he wanted to take a
> closer
> > look at the `ConnectorInfoWithInitialOffsetsResponse` stuff he mentioned
> in
> > the previous email and whether or not that'll be required (alternatively
> > using some Jackson JSON tricks). However, that's an implementation detail
> > and shouldn't hold up the KIP. Bikeshedding a little on the
> > "initial_offsets_response" field - I'm wondering if something like
> > "offsets_status" might be more appropriate, what do you think? I don't
> > think the current name is terrible though, so I'm +1 (binding) if
> everyone
> > else agrees that it's suitable.
> >
> > Thanks,
> > Yash
> >
> > On Tue, Mar 5, 2024 at 9:51 PM Chris Egerton 
> > wrote:
> >
> > > Hi all,
> > >
> > > Wanted to bump this and see if it looks good enough for a third vote.
> > Yash,
> > > any thoughts?
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, Jan 29, 2024 at 2:55 AM Ashwin 
> > > wrote:
> > >
> > > > Thanks for reviewing this KIP,  Yash.
> > > >
> > > > Could you please elaborate on the cleanup steps? For instance, if we
> > > > > encounter an error after wiping existing offsets but before writing
> > the
> > > > new
> > > > > offsets, there's not really any good way to "revert" the wiped
> > offsets.
> > > > > It's definitely extremely unlikely that a user would expect the
> > > previous
> > > > > offsets for a connector to still be present (by creating a new
> > > connector
> > > > > with the same name but without initial offsets for instance) after
> > > such a
> > > > > failed operation, but it would still be good to call this out
> > > > explicitly. I
> > > > > presume that we'd want to wipe the newly written initial offsets if
> > we
> > > > fail
> > > > > while writing the connector's config however?
> > > >
> > > >
> > > > Agree - I have clarified the cleanup here -
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors#KIP995:Allowuserstospecifyinitialoffsetswhilecreatingconnectors-ProposedChanges
> > > > .
> > > >
> > > > The `PATCH /connectors/{connector}/offsets` and `DELETE
> > > > > /connectors/{connector}/offsets` endpoints have two possible
> success
> > > > > messages in the response depending on whether or not the connector
> > > plugin
> > > > > has implemented the `alterOffsets` connector method. Since we're
> > > > proposing
> > > > > to utilize the same offset validation during connector creation if
> > > > initial
> > > > > offsets are specified, I think it would be valuable to surface
> > similar
> > > > > information to users here as well
> > > >
> > > >
> > > > Thanks for pointing this out. I have updated the response to include
> a
> > > new
> > > > field “initial_offsets_response” which will contain the response
> based
> > on
> > > > whether the connector implements alterOffsets or not. This also means
> > > that
> > > > if initial_offsets is set in the ConnectorCreate request, we will
> > return
> > > a
> > > > new REST entity (ConnectorInfoWithInitialOffsetsResponse ?) which
> will
> > > be a
> > > > child class of ConnectorInfo.
> > > >
> > > > (
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java#L28-L28
> > > > )
> > > >
> > > > Thanks,
> > > > Ashwin
> > > >
> > > > On Wed, Jan 17, 2024 at 4:48 PM Yash Mayya 
> > wrote:
> > > >
> > > > > Hi Ashwin,
> > > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > > > If Connect runtime encounters an error in any of these steps,
> > > > > > it will cleanup (if required) and return an error response
> > > > >
> > > > > Could you please elaborate on the cleanup steps? For instance, if
> we
> > > > > encounter an error after wiping existing offsets but before writing
> > the
> > > > new
> > > > > offsets, there's not really any good way to "revert" the wiped
> > offsets.
> > > > > It's definitely extremely unlikely that a user would expect the
> > > previous
> > > > > offsets for a connector to still be present (by creating a new
> > > connector
> > > > > with the same name but without initial offsets for instance) after
> > > such a
> > > > > failed operation, but it would still be good to call this out
> > > > explicitly. I
> > > > > presume that we'd want to wipe 

[jira] [Resolved] (KAFKA-16347) Bump ZooKeeper to 3.8.4

2024-03-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16347.

Resolution: Fixed

[~kevinztw] thanks for this contribution!

> Bump ZooKeeper to 3.8.4
> ---
>
> Key: KAFKA-16347
> URL: https://issues.apache.org/jira/browse/KAFKA-16347
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Mickael Maison
>Assignee: Cheng-Kai, Zhang
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> ZooKeeper 3.8.4 was released and contains a few CVE fixes: 
> https://zookeeper.apache.org/doc/r3.8.4/releasenotes.html
> We should update 3.6, 3.7 and trunk to use this new ZooKeeper release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-06 Thread Bruno Cadonna

Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key 
and value deserializer in Topology#addGlobalStore() that do not take a 
ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
Since those methods set up a global state store that does not process any 
records, do they still need to deserialize records and extract 
timestamps? Name might still be needed, right?


2.
From an API point of view, it might make sense to put all 
processor-related arguments into a parameter object. Something like:

GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.
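
To make that a bit more concrete, here is a rough sketch of such a parameter 
object (a hypothetical class, nothing like it exists in the API today):

import org.apache.kafka.common.serialization.Serde;

public class GlobalStoreParameters<K, V> {

    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private boolean reprocessOnRestore = true;

    public static <K, V> GlobalStoreParameters<K, V> globalStore() {
        return new GlobalStoreParameters<>();
    }

    public GlobalStoreParameters<K, V> withKeySerde(final Serde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }

    public GlobalStoreParameters<K, V> withValueSerde(final Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }

    // opts out of "restore by reprocessing" for the global store
    public GlobalStoreParameters<K, V> disableReprocessOnRestore() {
        this.reprocessOnRestore = false;
        return this;
    }
}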

3.
Could you please go over the KIP and correct typos and other mistakes in 
the KIP?



Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:

Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on 
"restore reprocessing" is certainly a good improvement.


 From an API design POV, I like the idea to not require passing in a 
ProcessorSupplier to begin with. Given the current implementation of the 
restore process, this might have been the better API from the beginning 
on... Well, better late than never :)


For this new method w/o a supplier, I am wondering if we want to keep 
`addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar 
thing via KIP-813. Just an idea.


However, I am not convinced that adding a new boolean parameter is the 
best way to design the API. Unfortunately, I don't have any elegant 
proposal myself. Just a somewhat crazy idea to do a larger API change:


Taking a step back, a global store is by definition a terminal node -- 
we don't support adding child nodes. Hence, while we expose a full 
`ProcessorContext` interface, we actually limit what functionality it 
supports. Thus, I am wondering if we should stop using the generic 
`Processor` interface to begin with, but design a new one which is 
tailored to the needs of global stores? -- This would of course be of 
much larger scope than originally intended by this KIP, but it might be 
a great opportunity to kill two birds with one stone?
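
Purely as a strawman for that idea (a hypothetical interface, nothing like it 
exists today), such a tailored interface could drop forwarding entirely, since a 
global store is a terminal node:

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Record;

public interface GlobalStoreProcessor<KIn, VIn> {

    // receives only the store to maintain instead of a full ProcessorContext
    default void init(final StateStore store) {}

    // called for every record from the global topic (and, if enabled, during
    // restore); there is no forward() because global stores have no children
    void process(final Record<KIn, VIn> record);

    default void close() {}
}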


The only other question to consider is: do we believe that global stores 
will never have child nodes, or could we actually allow for child nodes 
in the future? If yes, it might not be smart to move off the 
`Processor` interface. In general, I could imagine, especially as we 
now want to support "process on restore", allowing simple stateless 
operators like `map()` or `filter()` on a `GlobalTable` (or adding 
custom global processors) at some point in the future.


Just wanted to put this out to see what people think...


-Matthias


On 2/29/24 1:26 PM, Walker Carlson wrote:

Hello everybody,

I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a per-processor level. This should
help Kafka Streams users to better tune global stores added with the
processor API to better fit their needs.

Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ

Thanks,
Walker



Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2700

2024-03-06 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 357625 lines...]
[2024-03-06T08:13:36.479Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testEmptyWrite() STARTED
[2024-03-06T08:13:36.479Z] 
[2024-03-06T08:13:36.479Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testEmptyWrite() PASSED
[2024-03-06T08:13:36.479Z] 
[2024-03-06T08:13:36.479Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testReadMigrateAndWriteProducerId() STARTED
[2024-03-06T08:13:36.479Z] 
[2024-03-06T08:13:36.479Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testReadMigrateAndWriteProducerId() PASSED
[2024-03-06T08:13:36.479Z] 
[2024-03-06T08:13:36.479Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testExistingKRaftControllerClaim() STARTED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testExistingKRaftControllerClaim() PASSED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testMigrateTopicConfigs() STARTED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testMigrateTopicConfigs() PASSED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testNonIncreasingKRaftEpoch() STARTED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testNonIncreasingKRaftEpoch() PASSED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testMigrateEmptyZk() STARTED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testMigrateEmptyZk() PASSED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testTopicAndBrokerConfigsMigrationWithSnapshots() 
STARTED
[2024-03-06T08:13:37.584Z] 
[2024-03-06T08:13:37.584Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testTopicAndBrokerConfigsMigrationWithSnapshots() 
PASSED
[2024-03-06T08:13:37.585Z] 
[2024-03-06T08:13:37.585Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testClaimAndReleaseExistingController() STARTED
[2024-03-06T08:13:37.585Z] 
[2024-03-06T08:13:37.585Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testClaimAndReleaseExistingController() PASSED
[2024-03-06T08:13:37.585Z] 
[2024-03-06T08:13:37.585Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testClaimAbsentController() STARTED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testClaimAbsentController() PASSED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testIdempotentCreateTopics() STARTED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testIdempotentCreateTopics() PASSED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testCreateNewTopic() STARTED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testCreateNewTopic() PASSED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testUpdateExistingTopicWithNewAndChangedPartitions() 
STARTED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZkMigrationClientTest > testUpdateExistingTopicWithNewAndChangedPartitions() 
PASSED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZooKeeperClientTest > testZNodeChangeHandlerForDataChange() STARTED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZooKeeperClientTest > testZNodeChangeHandlerForDataChange() PASSED
[2024-03-06T08:13:38.690Z] 
[2024-03-06T08:13:38.690Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> ZooKeeperClientTest > testZooKeeperSessionStateMetric() STARTED
[2024-03-06T08:13:39.794Z] 
[2024-03-06T08:13:39.794Z] Gradle Test Run :core:test > Gradle Test Executor 95 
> 

[jira] [Created] (KAFKA-16347) Bump ZooKeeper to 3.8.4

2024-03-06 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16347:
--

 Summary: Bump ZooKeeper to 3.8.4
 Key: KAFKA-16347
 URL: https://issues.apache.org/jira/browse/KAFKA-16347
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.1, 3.7.0
Reporter: Mickael Maison
Assignee: Mickael Maison


ZooKeeper 3.8.4 was released and contains a few CVE fixes: 
https://zookeeper.apache.org/doc/r3.8.4/releasenotes.html

We should update 3.6, 3.7 and trunk to use this new ZooKeeper release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)