Re: [DISCUSS] KIP-917: Additional custom metadata for remote log segment
Hi all, I want to bring this to a conclusion (positive or negative), so if there are no more questions in a couple of days, I'll put the KIP to the vote. Best, Ivan On Fri, 5 May 2023 at 18:42, Ivan Yurchenko wrote: > Hi Alexandre, > > > combining custom > > metadata with rlmMetadata increases coupling between Kafka and the > > plugin. > > This is true. However, (if I understand your concern correctly,) > rlmMetadata in the current form may be independent from RSM plugins, but > the data they point to are accessible only via the particular plugin (the one > that wrote the data or a compatible one). It seems this coupling already > exists, but it is implicit. To make my point more concrete, imagine an S3 > RSM which maps RemoteLogSegmentMetadata objects to S3 object keys. This > mapping logic is a part of the RSM plugin and without it the metadata is > useless. I think it will not get worse if (to follow the example) the > plugin makes the said S3 object keys explicit by adding them to the > metadata. From a high-level point of view, moving the custom metadata to > a separate topic doesn't change the picture: it's still the plugin that > binds the standard and custom metadata together. > > > > For instance, the custom metadata may need to be modified > > outside of Kafka, but the rlmMetadata would still be cached on brokers > > independently of any update of custom metadata. Since both types of > > metadata are authored by different systems, and are cached in > > different layers, this may become a problem, or make plugin migration > > more difficult. What do you think? > > This is indeed a problem. I think a solution to this would be to clearly > state that metadata being modified outside of Kafka is out of scope and > instruct the plugin authors that custom metadata could be provided only > reactively from the copyLogSegmentData method and must remain immutable > after that. Does it make sense?
> > > > Yes, you are right that the suggested alternative is to let the plugin > > store its own metadata separately with a solution chosen by the admin > > or plugin provider. For instance, it could be using a dedicated topic > > if chosen to, or relying on an external key-value store. > > I see. Yes, this option always exists and doesn't even require a KIP. The > biggest drawback I see is that a plugin will need to reimplement the > consumer/producer + caching mechanics that will exist on the broker side > for the standard remote metadata. I'd like to avoid this, and this KIP is > the best solution I see. > > Best, > Ivan > > > > On Tue, 18 Apr 2023 at 13:02, Alexandre Dupriez < > alexandre.dupr...@gmail.com> wrote: > >> Hi Ivan, >> >> Thanks for the follow-up. >> >> Yes, you are right that the suggested alternative is to let the plugin >> store its own metadata separately with a solution chosen by the admin >> or plugin provider. For instance, it could be using a dedicated topic >> if chosen to, or relying on an external key-value store. >> >> I agree with you on the existing risks associated with running >> third-party code inside Apache Kafka. That said, combining custom >> metadata with rlmMetadata increases coupling between Kafka and the >> plugin. For instance, the custom metadata may need to be modified >> outside of Kafka, but the rlmMetadata would still be cached on brokers >> independently of any update of custom metadata. Since both types of >> metadata are authored by different systems, and are cached in >> different layers, this may become a problem, or make plugin migration >> more difficult. What do you think? >> >> I have a vague memory of this being discussed back when the tiered >> storage KIP was started. Maybe Satish has more background on this. >> >> Thanks, >> Alexandre >> >> On Mon, 17 Apr 2023 at 16:50, Ivan Yurchenko >> wrote: >> > >> > Hi Alexandre, >> > >> > Thank you for your feedback!
>> > >> > > One question I would have is, what is the benefit of adding these >> > > custom metadata in the rlmMetadata rather than letting the plugin >> > > manage access and persistence to them? >> > >> > Could you please elaborate? Do I understand correctly that the idea is >> that >> > the plugin will have its own storage for those custom metadata, for >> example >> > a special topic? >> > >> > > It would be possible for a user >> > > to use custom metadata large enough to adversely impact access to and >> > > caching of the rlmMetadata by Kafka. >> > >> > Since the custom metadata is 100% under control of the RSM plugin, the >> risk >> > is as big as the risk of running third-party code (i.e. the RSM >> plugin). >> > The cluster admin must decide whether they trust it. >> > To mitigate this risk and put it under control, the RSM plugin >> > implementations could document what custom metadata they use and >> estimate >> > their size. >> > >> > Best, >> > Ivan >> > >> > >> > On Mon, 17 Apr 2023 at 18:14, Alexandre Dupriez < >> alexandre.dupr...@gmail.com> >> > wrote: >> > >> > > Hi
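To make the S3 example from this thread concrete, here is a small sketch of a plugin that returns its S3 object key as custom metadata from copyLogSegmentData. This is illustrative only: the class, method signature, and key-derivation scheme are assumptions for the example, not the actual Kafka or KIP-917 APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical sketch: the RSM plugin makes its segment-to-object-key mapping
// explicit by returning the key as opaque custom metadata, which Kafka would
// then store alongside the standard remote log segment metadata.
public class S3RemoteStorageManagerSketch {

    // Stand-in for the KIP's custom metadata: an opaque byte array.
    public static final class CustomMetadata {
        private final byte[] value;
        public CustomMetadata(byte[] value) { this.value = value; }
        public byte[] value() { return value; }
    }

    // Derive the S3 object key from segment identifiers (illustrative logic;
    // this mapping is exactly the plugin-private knowledge the thread discusses).
    static String objectKey(String topic, int partition, long baseOffset) {
        return String.format("tiered/%s-%d/%020d.log", topic, partition, baseOffset);
    }

    // The plugin would upload the segment, then return the key as custom
    // metadata so the mapping is recorded instead of recomputed on reads.
    public static Optional<CustomMetadata> copyLogSegmentData(String topic, int partition, long baseOffset) {
        String key = objectKey(topic, partition, baseOffset);
        // ... upload segment bytes to S3 under `key` ...
        return Optional.of(new CustomMetadata(key.getBytes(StandardCharsets.UTF_8)));
    }
}
```

Per the discussion above, the metadata returned this way would be treated as immutable after copyLogSegmentData completes.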
Re: [DISCUSS] KIP-899: Allow clients to rebootstrap
Hi Chris and all, > I believe the logic you've linked is only applicable for the producer and > consumer clients; the admin client does something different (see [1]). I see, thank you for the pointer. It seems the admin client is fairly different from the producer and consumer. It probably makes sense to reduce the scope of the KIP to the producer and consumer clients only. > it'd be nice to have a definition of when re-bootstrapping > would occur that doesn't rely on internal implementation details. What > user-visible phenomena can we identify that would lead to a > re-bootstrapping? Let's put it this way: "Re-bootstrapping means that the client forgets about the nodes it knows about and falls back on the bootstrap nodes as if it had just been initialized. Re-bootstrapping happens when, during a metadata update (which may be scheduled by `metadata.max.age.ms` or caused by certain error responses like NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE, etc.), the client doesn't have a node with an established or establishable connection." Does this sound good? > I also believe that if someone has " > reconnect.backoff.max.ms" set to a low-enough value, > NetworkClient::leastLoadedNode may never return null. In that case, > shouldn't we still attempt a re-bootstrap at some point (if the user has > enabled this feature)? Yes, you're right. In particular, `canConnect` here [1] can always return `true` if `reconnect.backoff.max.ms` is low enough. It seems pretty difficult to find a good criterion for when re-bootstrapping should be forced in this case, so it'd be difficult to configure and reason about. I think it's worth mentioning in the KIP and later in the documentation, but we should not try to do anything special here. > Would it make sense to re-bootstrap only after " > metadata.max.age.ms" has elapsed since the last metadata update, and when > at least one request has been made to contact each known server and been > met with failure?
The first condition is satisfied by the check in the beginning of `maybeUpdate` [2]. It seems to me, the second one is also satisfied by `leastLoadedNode`. Admittedly, it's more relaxed than you propose: it tracks unavailability of nodes that was detected by all types of requests, not only by metadata requests. What do you think, would this be enough? [1] https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L698 [2] https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1034-L1041 Best, Ivan On Tue, 21 Feb 2023 at 20:07, Chris Egerton wrote: > Hi Ivan, > > I believe the logic you've linked is only applicable for the producer and > consumer clients; the admin client does something different (see [1]). > > Either way, it'd be nice to have a definition of when re-bootstrapping > would occur that doesn't rely on internal implementation details. What > user-visible phenomena can we identify that would lead to a > re-bootstrapping? I also believe that if someone has " > reconnect.backoff.max.ms" set to a low-enough value, > NetworkClient::leastLoadedNode may never return null. In that case, > shouldn't we still attempt a re-bootstrap at some point (if the user has > enabled this feature)? Would it make sense to re-bootstrap only after " > metadata.max.age.ms" has elapsed since the last metadata update, and when > at least one request has been made to contact each known server and been > met with failure? > > [1] - > > https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java#L100 > > Cheers, > > Chris > > On Sun, Feb 19, 2023 at 3:39 PM Ivan Yurchenko > wrote: > > > Hi Chris, > > > > Thank you for your question. 
As a part of various lifecycle phases > > (including node disconnect), NetworkClient can request metadata update > > eagerly (the `Metadata.requestUpdate` method), which results in > > `MetadataUpdater.maybeUpdate` being called during the next poll. Inside, it > has > > a way to find a known node it can connect to for the fresh metadata. If > no > > such node is found, it backs off. (Code [1]). I'm thinking of > piggybacking > > on this logic and injecting the rebootstrap attempt before the backoff. > > > > As for the second part of your question: the re-bootstrapping means > replacing > > the node addresses in the client with the original bootstrap addresses, > so > > if the first bootstrap attempt fails, the client will continue using the > > bootstrap addresses until success -- pretty much as if it were recreated > > from scratch. > > > > Best, > > Ivan > > > > [1] > > > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1045-L1049 > > > > On Thu, 16 Feb 2023 at 17:18, Chris Egerton > > wrote: > > > Hi
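The re-bootstrap behaviour discussed in this thread can be sketched as a toy model: when a metadata update is due and no known node is connectable, the client falls back to its original bootstrap addresses. This is a simplified illustration under stated assumptions, not Kafka's actual NetworkClient; the class, method names, and node-selection logic are all invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed re-bootstrap: forget learned nodes and start
// over from the configured bootstrap addresses when nothing is reachable.
public class RebootstrapSketch {
    private final List<String> bootstrapAddresses;
    private List<String> knownNodes;

    public RebootstrapSketch(List<String> bootstrapAddresses) {
        this.bootstrapAddresses = List.copyOf(bootstrapAddresses);
        this.knownNodes = new ArrayList<>(bootstrapAddresses);
    }

    // Stand-in for NetworkClient.leastLoadedNode(): null means no known node
    // currently has an established or establishable connection.
    private String leastLoadedNode(List<String> reachable) {
        return knownNodes.stream().filter(reachable::contains).findFirst().orElse(null);
    }

    // Called on a metadata update attempt (scheduled by metadata.max.age.ms
    // or forced by certain error responses). The re-bootstrap is injected
    // exactly where the real client would otherwise back off.
    public String nodeForMetadataUpdate(List<String> reachable) {
        String node = leastLoadedNode(reachable);
        if (node == null) {
            // Re-bootstrap: replace learned node addresses with bootstrap ones.
            knownNodes = new ArrayList<>(bootstrapAddresses);
            node = leastLoadedNode(reachable);
        }
        return node; // may still be null; the client keeps retrying bootstrap
    }

    // A successful metadata response replaces the known-node list.
    public void updateMetadata(List<String> discoveredNodes) {
        knownNodes = new ArrayList<>(discoveredNodes);
    }
}
```

Note how, as Ivan describes, a failed first bootstrap attempt leaves the client on the bootstrap addresses until one succeeds, as if it had been recreated from scratch.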
[jira] [Created] (KAFKA-15036) Kraft leader change fails when invoking getFinalizedFeatures
Deng Ziming created KAFKA-15036: --- Summary: Kraft leader change fails when invoking getFinalizedFeatures Key: KAFKA-15036 URL: https://issues.apache.org/jira/browse/KAFKA-15036 Project: Kafka Issue Type: Improvement Reporter: Deng Ziming When the KRaft leader changes, we can receive an error as follows: {{[2023-05-24 18:00:02,898] WARN [QuorumController id=3002] getFinalizedFeatures: failed with unknown server exception RuntimeException in 271 us. The controller is already in standby mode. (org.apache.kafka.controller.QuorumController) java.lang.RuntimeException: No in-memory snapshot for epoch 9. Snapshot epochs are: at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173) at org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131) at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69) at org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303) at org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016) at org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) at java.base/java.lang.Thread.run(Thread.java:829)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
RE: [DISCUSS] KIP-932: Queues for Kafka
Hello, thank you for the proposal! A very interesting read. I do have one question, though. When you subscribe to a topic using consumer groups, it might happen that one consumer has processed all messages from its partitions, while another one still has a lot of work to do (this might be due to unbalanced partitioning, long processing times, etc.). In a message-queue approach, it would be great to solve this problem - so that a consumer that is free can steal work from other consumers. Is this somehow covered by share groups? Maybe this is planned as "further work", as indicated here: " It manages the topic-partition assignments for the share-group members. An initial, trivial implementation would be to give each member the list of all topic-partitions which matches its subscriptions and then use the pull-based protocol to fetch records from all partitions. A more sophisticated implementation could use topic-partition load and lag metrics to distribute partitions among the consumers as a kind of autonomous, self-balancing partition assignment, steering more consumers to busier partitions, for example. Alternatively, a push-based fetching scheme could be used. Protocol details will follow later. " but I’m not sure if I understand this correctly. A fully-connected graph seems like a lot of connections, and I’m not sure if this would play well with streaming. This also seems to be one of the central problems - a key differentiator between share groups and consumer groups (the other one being persisting the state of messages). And maybe the exact way we’d want to approach this would, to a certain degree, dictate the design of the queueing system? Best, Adam Warski On 2023/05/15 11:55:14 Andrew Schofield wrote: > Hi, > I would like to start a discussion thread on KIP-932: Queues for Kafka. This > KIP proposes an alternative to consumer groups to enable cooperative > consumption by consumers without partition assignment.
You end up with queue > semantics on top of regular Kafka topics, with per-message acknowledgement > and automatic handling of messages which repeatedly fail to be processed. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka > > Please take a look and let me know what you think. > > Thanks. > Andrew
[jira] [Created] (KAFKA-15035) Consumer offsets can be deleted immediately if kafka does not detect a consumer as dead
Sam Cantero created KAFKA-15035: --- Summary: Consumer offsets can be deleted immediately if kafka does not detect a consumer as dead Key: KAFKA-15035 URL: https://issues.apache.org/jira/browse/KAFKA-15035 Project: Kafka Issue Type: Bug Affects Versions: 2.7.2 Reporter: Sam Cantero We've recently encountered a scenario where a consumer group got its committed offsets deleted almost immediately (around 3 minutes) after the consumer became inactive. As per [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets], committed offsets for an active (i.e. running) consumer group should not be deleted. However, if a consumer becomes inactive, {+}the deletion of committed offsets will not occur immediately{+}. Instead, the committed offsets will only be removed if the consumer remains inactive for at least the duration specified by [offset.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes]. In our case, {{offset.retention.minutes}} is set to 7 days and the consumer was only inactive for 5 minutes, so deletion should not have occurred. By inspecting KIP-211 further, we can find the following sentence: {quote}If a group consumer unsubscribes from a topic but continues to consume from other subscribed topics, the offset information of that unsubscribed topic’s partitions should be deleted at the appropriate time. {quote} And later on: {quote}If there are partitions the group has offset for but no longer consumes from, and offsets.retention.minutes has passed since their last commit timestamp, the corresponding offsets will be removed from the offset cache {quote} It is implied, though {*}+this is what I want to confirm in this ticket+{*}, that Kafka employs two approaches for offset expiration: * The deletion timer is activated when a consumer group enters the Empty state (i.e., not running).
Once the timer exceeds the {{offset.retention.minutes}} threshold, the committed offsets are deleted. * If a consumer is in a "running" state (i.e., not in the Empty state) but is no longer consuming from topics with committed offsets older than the {{offset.retention.minutes}} duration, the committed offsets are deleted. Note that the second approach only takes into account the timestamp of the last committed offset. Throughout this event, the affected consumer group didn’t transition into the Empty state. Based on the Kafka logs, the consumer group was not detected as Empty, indicating that Kafka considered the consumer to be running from its perspective. It’s unclear why Kafka didn’t detect this consumer group as Empty. {noformat} 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg has failed, removing it from the group 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 432 (__consumer_offsets-16) (reason: removing member consumer-mycg-1-uuid on heartbeat expiration) 1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg has failed, removing it from the group 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 (__consumer_offsets-16) 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group mycg for generation 433{noformat} This suggests that Kafka might have followed the second approach, which is why it deleted the offsets 3 minutes later. {noformat} 1:33:17 am - [GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 milliseconds.{noformat} For reference, a regular consumer join/startup log looks like this. The group is stabilized and the assignment is received from the leader.
{noformat} [GroupCoordinator 0]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason: Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with group instance id None) (kafka.coordinator.group.GroupCoordinator) [GroupCoordinator 0]: Stabilized group mycg generation 7 (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator) [GroupCoordinator 0]: Assignment received from leader for group mycg for generation 7 (kafka.coordinator.group.GroupCoordinator){noformat} For reference, a regular consumer leave/shutdown log looks like this. Note how the consumer group moves into the Empty state. {noformat} [GroupCoordinator 0]: Member[group.instance.id None, member.id consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left, removing it from the group (kafka.coordinator.group.GroupCoordinator) [GroupCoordinator 0]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason: removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7
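The two expiration paths hypothesized in this ticket can be sketched as a single condition. This is a simplified illustration of the KIP-211 semantics as described above, under the reporter's assumptions; it is not the actual GroupMetadataManager code, and the method name is invented.

```java
// Sketch of the two offset-expiration paths described in the ticket:
// path 1 starts the retention clock when the group becomes Empty;
// path 2 applies while the group is still "running" but no longer
// consumes the partition, measured from the last commit timestamp.
public class OffsetExpirySketch {
    static boolean offsetExpired(boolean groupEmpty, long emptySinceMs,
                                 boolean partitionSubscribed, long lastCommitMs,
                                 long nowMs, long retentionMs) {
        if (groupEmpty) {
            // Path 1: the deletion timer starts on the Empty transition.
            return nowMs - emptySinceMs >= retentionMs;
        }
        // Path 2: only the last committed offset's timestamp is considered,
        // and only for partitions the group no longer consumes from.
        return !partitionSubscribed && nowMs - lastCommitMs >= retentionMs;
    }
}
```

Under this model, a group that Kafka still considers "running" (as in the logs above) can have offsets expire via path 2 even though it never entered the Empty state.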
Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records
Hi, Bumping this thread again for further reviews. Thanks! Sagar. On Fri, May 12, 2023 at 3:38 PM Sagar wrote: > Hi All, > > Thanks for the comments/reviews. I have updated the KIP > https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records > with a newer approach which shelves the need for an explicit topic. > > Please review again and let me know what you think. > > Thanks! > Sagar. > > > On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya wrote: > >> Hi Sagar, >> >> Thanks for the KIP! I have a few questions and comments: >> >> 1) I agree with Chris' point about the separation of a connector heartbeat >> mechanism and allowing source connectors to generate offsets without >> producing data. What is the purpose of the heartbeat topic here and are >> there any concrete use cases for downstream consumers on this topic? Why >> can't we instead simply introduce a mechanism to retrieve a list of source >> partition / source offset pairs from the source tasks? >> >> 2) With the currently described mechanism, the new >> "SourceTask::produceHeartbeatRecords" method returns a >> "List" >> - what happens with the topic in each of these source records? Chris >> pointed this out above, but it doesn't seem to have been addressed? The >> "SourceRecord" class also has a bunch of other fields which will be >> irrelevant here (partition, key / value schema, key / value data, >> timestamp, headers). In fact, it seems like only the source partition and >> source offset are relevant here, so we should either introduce a new >> abstraction or simply use a data structure like a mapping from source >> partitions to source offsets (adds to the above point)? >> >> 3) I'm not sure I fully follow why the heartbeat timer / interval is >> needed? What are the downsides of >> calling "SourceTask::produceHeartbeatRecords" in every execution loop >> (similar to the existing "SourceTask::poll" method)? 
Is this only to >> prevent the generation of a lot of offset records? Since Connect's offsets >> topics are log compacted (and source partitions are used as keys for each >> source offset), I'm not sure if such concerns are valid and such a >> heartbeat timer / interval mechanism is required? >> >> 4) The first couple of rejected alternatives state that the use of a null >> topic / key / value are preferably avoided - but the current proposal >> would >> also likely require connectors to use such workarounds (null topic when >> the >> heartbeat topic is configured at a worker level and always for the key / >> value)? >> >> 5) The third rejected alternative talks about subclassing the >> "SourceRecord" class - this presumably means allowing connectors to pass >> special offset only records via the existing poll mechanism? Why was this >> considered a more invasive option? Was it because of the backward >> compatibility issues that would be introduced for plugins using the new >> public API class that still need to be deployed onto older Connect >> workers? >> >> Thanks, >> Yash >> >> On Fri, Apr 14, 2023 at 6:45 PM Sagar wrote: >> >> > One thing I forgot to mention in my previous email was that the reason I >> > chose to include the opt-in behaviour via configs was that the users of >> the >> > connector know their workload patterns. If the workload is such that the >> > connector would receive regular valid updates then there’s ideally no >> need >> > for moving offsets since it would update automatically. >> > >> > This way they aren’t forced to use this feature and can use it only when >> > the workload is expected to be batchy or not frequent. >> > >> > Thanks! >> > Sagar. >> > >> > >> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar >> wrote: >> > >> > > Hi Chris, >> > > >> > > Thanks for following up on the response. 
Sharing my thoughts further: >> > > >> > > If we want to add support for connectors to emit offsets without >> > >> accompanying source records, we could (and IMO should) do that >> without >> > >> requiring users to manually enable that feature by adjusting worker >> or >> > >> connector configurations. >> > > >> > > >> > > With the current KIP design, I have tried to implement this in an >> opt-in >> > > manner via configs. I guess what you are trying to say is that this >> > doesn't >> > > need a config of its own and instead could be part of the poll -> >> > > transform etc -> produce -> commit cycle. That way, the users don't >> need >> > to >> > > set any config and if the connector supports moving offsets w/o >> producing >> > > SourceRecords, it should happen automatically. Is that correct? If >> that >> > > is the concern, then I can consider not exposing a config and trying to >> make >> > > this process automatic. That should ease the load on connector >> users, >> > > but as for your point about cognitive load on connector developers, I am >> still >> > not >> > > sure how to address that. The offsets are privy to a connector and the >> > > framework
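For illustration, the alternative Yash suggests earlier in this thread (exposing just a mapping from source partitions to source offsets, instead of SourceRecords whose topic, key, value, and header fields would all be irrelevant) might look roughly like this. The class and method names are hypothetical, not a proposed Connect API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of an offset-only update from a source task:
// no records are produced, only "how far the task has progressed"
// per source partition is reported for committing.
public class OffsetOnlyUpdateSketch {

    // In Connect, both source partitions and source offsets are schemaless
    // string-keyed maps; the value types here are narrowed for the example.
    public static Map<Map<String, String>, Map<String, Long>> offsetsToCommit() {
        Map<Map<String, String>, Map<String, Long>> updates = new LinkedHashMap<>();
        // e.g. a JDBC-style source reporting how far it has scanned a table,
        // even though no new rows qualified as records this polling cycle.
        updates.put(Map.of("table", "orders"), Map.of("last_scanned_id", 1042L));
        return updates;
    }
}
```

A shape like this would sidestep the null topic/key/value workarounds that the KIP's rejected alternatives mention.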
[jira] [Created] (KAFKA-15034) Improvement of ReplaceField performance for long list
BDeus created KAFKA-15034: - Summary: Improvement of ReplaceField performance for long list Key: KAFKA-15034 URL: https://issues.apache.org/jira/browse/KAFKA-15034 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 3.4.0 Reporter: BDeus The ReplaceField SMT uses List for its include and exclude filters, which is an ArrayList internally. For a long filter list, the _O(n)_ lookup complexity of ArrayList results in poor performance. Could we use a HashSet implementation in the ReplaceField class instead of the traditional ArrayList? -- This message was sent by Atlassian Jira (v8.20.10#820010)
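The proposed change boils down to swapping a per-record linear scan for a hash-set lookup. The sketch below is illustrative, not the actual ReplaceField code; the method names are invented for the comparison.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration of the suggested improvement: List.contains scans the whole
// list (O(n) per lookup), while a HashSet built once from the same filter
// gives O(1) expected lookups per field.
public class FieldFilterSketch {

    // Current style: linear scan of the configured filter for every field.
    static boolean filteredWithList(List<String> exclude, String field) {
        return exclude.contains(field);
    }

    // Proposed style: build the set once when the SMT is configured...
    static Set<String> toSet(List<String> exclude) {
        return new HashSet<>(exclude);
    }

    // ...then check each record's fields in constant expected time.
    static boolean filteredWithSet(Set<String> exclude, String field) {
        return exclude.contains(field);
    }
}
```

With a filter of n entries and records of m fields, this reduces the per-record cost from O(n*m) comparisons to O(m) expected lookups, at the cost of one set built at configure time.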
[jira] [Created] (KAFKA-15033) Failed to close repository in stagingRepositories
Luke Chen created KAFKA-15033: - Summary: Failed to close repository in stagingRepositories Key: KAFKA-15033 URL: https://issues.apache.org/jira/browse/KAFKA-15033 Project: Kafka Issue Type: Task Reporter: Luke Chen While running release.py, at the step that closes the new repository created by uploading artifacts, I encountered some errors, though they won't block the release since the other repos could be closed successfully. {code:java} Event: Failed: Signature Validation - Friday, May 26, 2023 11:02:00 TST (GMT+0800) - typeId: signature-staging - failureMessage: Missing Signature: '/org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.asc' does not exist for 'kafka-streams-upgrade-system-tests-0101-3.4.1.jar'. {code} {code:java} Event: Failed: Checksum Validation - Friday, May 26, 2023 11:02:01 TST (GMT+0800) - typeId: checksum-staging - failureMessage: Requires one-of SHA-1: /org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.sha1, SHA-256: /org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.sha256, SHA-512: /org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.sha512 {code} {code:java} Event: Failed: Signature Validation - Monday, May 29, 2023 17:43:57 TST (GMT+0800) - typeId: signature-staging - failureMessage: Missing Signature: '/org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.asc' does not exist for 'connect-3.4.1.jar'.
{code} {code:java} Event: Failed: Checksum Validation - Monday, May 29, 2023 17:43:58 TST (GMT+0800) - typeId: checksum-staging - failureMessage: Requires one-of SHA-1: /org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.sha1, SHA-256: /org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.sha256, SHA-512: /org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.sha512 {code} {code:java} Event: Failed: Checksum Validation - Monday, May 29, 2023 17:45:36 TST (GMT+0800) - typeId: checksum-staging - failureMessage: Requires one-of SHA-1: /org/apache/kafka/generator/3.4.1/generator-3.4.1.jar.sha1, SHA-256: /org/apache/kafka/generator/3.4.1/generator-3.4.1.jar.sha256, SHA-512: /org/apache/kafka/generator/3.4.1/generator-3.4.1.jar.sha512{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-932: Queues for Kafka
Hi Andrew, Thanks for the KIP. Some high-level questions: 1. How do we handle the "fetch from follower" case? It looks like in the current design, each call needs to go to the "shared partition leader", where the shared state is stored. Is my understanding correct? 2. Where is the state info stored? It looks like we only store it in the memory of the "shared partition leader". What happens after the leader crashes and the partition moves to another ISR replica? 3. New metrics needed: since we're introducing a new kind of consumer group, I think there should be new metrics added for the client and broker to monitor them. Thank you. Luke On Mon, May 29, 2023 at 1:01 PM Satish Duggana wrote: > Minor correction on 103, latest instead of earliest for SPSO default value. > > 103 It talks about SPSO values, latest being the default and user > can reset it to a target offset timestamp. What is the maximum value > for SPEO? It is good to clarify what could be the maximum value for > SPSO and SPEO. It can be HW or LogStableOffset or some other value? > > Thanks, > Satish. > > On Mon, 29 May 2023 at 10:06, Satish Duggana > wrote: > > > > Hi Andrew, > > Thanks for the nice KIP on a very interesting feature about > > introducing some of the traditional MessageQueue semantics to Kafka. > > It is good to see that we are extending the existing consumer groups > > concepts and related mechanisms for shared subscriptions instead of > > bringing any large architectural/protocol changes. > > > > This KIP talks about introducing a durable subscription feature for > > topics with multiple consumers consuming messages in parallel from a > > single topic partition. > > > > 101 Are you planning to extend this functionality for queueing > > semantics like JMS point-to-point style in future? > > > > 102 When a message is rejected by the target consumer, how do users > > know what records/offsets are dropped because of the failed records > > due to rejection ack or due to timeouts etc. before DLQs are > > introduced?
> > > > 103 It talks about SPSO values, earliest being the default and user > > can reset it to a target offset timestamp. What is the maximum value > > for SPEO? It is good to clarify what could be the maximum value for > > SPSO and SPEO. It can be HW or LogStableOffset or some other value? > > > > 104 KIP mentions that "share.delivery.count.limit" as the maximum > > number of delivery attempts for a record delivered to a share group. > > But the actual delivery count may be more than this number as the > > leader may fail updating the delivery count as leader or consumer may > > fail and more delivery attempts may be made later. It may be the > > minimum number of delivery attempts instead of the maximum delivery > > attempts. > > > > Thanks, > > Satish. > > > > > > On Wed, 24 May 2023 at 21:26, Andrew Schofield > > wrote: > > > > > > Hi Stanislav, > > > Thanks for your email. You bring up some interesting points. > > > > > > 1) Tiered storage > > > I think the situation here for fetching historical data is equivalent > to what happens if a user resets the committed offset for a consumer > > > group back to an earlier point in time. So, I will mention this in the > next update to the KIP document but I think there's nothing > > > especially different here. > > > > > > 2) SSO initialized to the latest offset > > > The KIP does mention that it is possible for an administrator to set > the SSO using either AdminClient.alterShareGroupOffsets or > > > kafka-share-groups.sh. It is entirely intentional that there is no > KafkaConsumer config for initializing the SSO. I know that's how it > > > can be done for consumer groups, but it suffers from the situation > where different consumers have different opinions about > > > the initial value (earliest vs latest) and then the first one in wins. 
> Also, KIP-842 digs into some problems with how consumer > > > group offset reset works ( > https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms) > so > > > I've tried to sidestep those problems too. > > > > > > Another possibility is to follow KIP-848 which proposes that > AdminClient.incrementalAlterConfigs is enhanced to support a new > > > resource type called GROUP and supporting a dynamic group config in > this manner would give a single point of control. > > > > > > 3) Durable storage > > > The KIP does not yet describe how durable storage works. I have a few > ideas that I want to flesh out before updating the KIP. > > > > > > I will rule out using a compacted topic though. The problem is that > each record on a compacted topic is a key:value pair, and > > > it's not obvious what to use as the key. If it's the share group name, > it needs the entire in-flight record state to be recorded in > > > one hit which is extremely inefficient. > > > > > > 4) Batch acknowledgement > > > You are correct that compression makes delivery and acknowledgement of > individual messages within a compressed batch > > > more
[GitHub] [kafka-site] Dionakra opened a new pull request, #517: MINOR: Added missing Kafka Broker docs for metrics.jmx.(include|exclude) configs
Dionakra opened a new pull request, #517:
URL: https://github.com/apache/kafka-site/pull/517

Added documentation about the Kafka broker's configurable JMX beans. This was introduced by [KIP-544](https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable), which has been available since [Apache Kafka 2.6.0](https://issues.apache.org/jira/browse/KAFKA-9106) but was never published in the docs. Also, following [KIP-629](https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase), the config was renamed; the current names can be seen in the [JmxReporter class](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java#L53-L59). This PR adds the documentation for this feature from Apache Kafka 2.6.0 onwards, including the rename, with the former config names deprecated.
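For reference, the configuration the PR documents looks roughly like this in a broker's server.properties. The patterns below are examples only, and the exact matching semantics (regular expressions applied to metric/MBean names) should be verified against the JmxReporter Javadoc and KIP-544:

```properties
# Control which metrics the broker exposes via JMX (regular expressions;
# example patterns only). Introduced by KIP-544 in Apache Kafka 2.6.0.
metrics.jmx.include=.*
metrics.jmx.exclude=kafka\.network.*

# Deprecated pre-KIP-629 names for the same settings:
# metrics.jmx.whitelist=.*
# metrics.jmx.blacklist=kafka\.network.*
```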
[jira] [Resolved] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yash Mayya resolved KAFKA-14956.
    Resolution: Fixed

> Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14956
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14956
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Sagar Rao
>            Assignee: Yash Mayya
>            Priority: Major
>              Labels: flaky-test
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000.
> Sink connector consumer group offsets should catch up to the topic end
> offsets ==> expected: but was:
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000.
> Sink connector consumer group offsets should catch up to the topic end
> offsets ==> expected: but was:
> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
> at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
> at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
> at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
> at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
> at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
> at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
> at
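The stack trace shows the assertion fires inside TestUtils.waitForCondition, which polls a condition until a timeout elapses; a flaky test is one whose condition sometimes needs longer than the allotted timeout. A minimal sketch of that poll-and-retry shape (in Python; the interval and timeout values are illustrative, not Kafka's actual defaults):

```python
import time

def wait_for_condition(condition, timeout_s=15.0, poll_interval_s=0.1):
    """Poll `condition` until it returns True or `timeout_s` elapses.

    Mirrors the shape of org.apache.kafka.test.TestUtils.waitForCondition;
    parameter names and defaults here are assumptions for illustration.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if condition():
            return
        if time.monotonic() >= deadline:
            raise AssertionError(
                f"Condition not met within timeout {timeout_s}s")
        time.sleep(poll_interval_s)

# Example: a condition that only becomes true on the third poll.
calls = {"n": 0}
def eventually_true():
    calls["n"] += 1
    return calls["n"] >= 3

wait_for_condition(eventually_true, timeout_s=2.0, poll_interval_s=0.01)
```

Under this pattern, raising the timeout or fixing the slow condition are the usual remedies for a flaky assertion like the one above.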