Re: [DISCUSS] KIP-917: Additional custom metadata for remote log segment

2023-05-29 Thread Ivan Yurchenko
Hi all,

I want to bring this to a conclusion (positive or negative), so if there
are no more questions in a couple of days, I'll put the KIP to the vote.

Best,
Ivan


On Fri, 5 May 2023 at 18:42, Ivan Yurchenko 
wrote:

> Hi Alexandre,
>
> > combining custom
> > metadata with rlmMetadata increases coupling between Kafka and the
> > plugin.
>
> This is true. However (if I understand your concern correctly), the
> rlmMetadata in its current form may be independent of RSM plugins, but the
> data it points to is accessible only via the particular plugin (the one
> that wrote the data, or a compatible one). So this coupling already
> exists, it is just implicit. To make my point more concrete, imagine an S3
> RSM which maps RemoteLogSegmentMetadata objects to S3 object keys. This
> mapping logic is part of the RSM plugin, and without it the metadata is
> useless. I think the situation will not get worse if (to follow the
> example) the plugin makes those S3 object keys explicit by adding them to
> the metadata. From a high-level point of view, moving the custom metadata
> to a separate topic doesn't change the picture: it's still the plugin that
> binds the standard and custom metadata together.
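>
> To make the example a bit more concrete, here is a rough sketch (the class
> and helper names are made up for illustration, and the copyLogSegmentData
> shape follows what this KIP proposes rather than the current interface):
>
>     import java.nio.charset.StandardCharsets;
>     import java.util.Optional;
>
>     // Hypothetical S3-backed RSM: the key-mapping logic lives in the plugin.
>     class S3RemoteStorageManagerSketch {
>
>         // Derive the S3 object key from fields of the standard segment metadata.
>         // Without this mapping, the rlmMetadata alone cannot locate the data.
>         static String objectKey(String topic, int partition, long baseOffset, String segmentId) {
>             return String.format("tiered/%s-%d/%020d-%s.log", topic, partition, baseOffset, segmentId);
>         }
>
>         // Proposed KIP-917 shape: return the custom metadata to be stored alongside
>         // the standard remote log segment metadata (immutable after this call).
>         Optional<byte[]> copyLogSegmentData(String topic, int partition, long baseOffset, String segmentId) {
>             String key = objectKey(topic, partition, baseOffset, segmentId);
>             // ... upload the segment to S3 under `key` (omitted) ...
>             return Optional.of(key.getBytes(StandardCharsets.UTF_8));
>         }
>     }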
>
>
> > For instance, the custom metadata may need to be modified
> > outside of Kafka, but the rlmMetadata would still be cached on brokers
> > independently of any update of custom metadata. Since both types of
> > metadata are authored by different systems, and are cached in
> > different layers, this may become a problem, or make plugin migration
> > more difficult. What do you think?
>
> This is indeed a problem. I think a solution would be to clearly state
> that modifying the metadata outside of Kafka is out of scope and to
> instruct plugin authors that custom metadata can be provided only
> reactively from the copyLogSegmentData method and must remain immutable
> after that. Does that make sense?
>
>
> > Yes, you are right that the suggested alternative is to let the plugin
> > store its own metadata separately with a solution chosen by the admin
> > or plugin provider. For instance, it could be using a dedicated topic
> > if chosen to, or relying on an external key-value store.
>
> I see. Yes, this option always exists and doesn't even require a KIP. The
> biggest drawback I see is that a plugin would need to reimplement the
> consumer/producer + caching mechanics that will already exist on the broker
> side for the standard remote metadata. I'd like to avoid that, and this KIP
> is the best solution I see.
>
> Best,
> Ivan
>
>
>
> On Tue, 18 Apr 2023 at 13:02, Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
>
>> Hi Ivan,
>>
>> Thanks for the follow-up.
>>
>> Yes, you are right that the suggested alternative is to let the plugin
>> store its own metadata separately with a solution chosen by the admin
>> or plugin provider. For instance, it could be using a dedicated topic
>> if chosen to, or relying on an external key-value store.
>>
>> I agree with you on the existing risks associated with running
>> third-party code inside Apache Kafka. That said, combining custom
>> metadata with rlmMetadata increases coupling between Kafka and the
>> plugin. For instance, the custom metadata may need to be modified
>> outside of Kafka, but the rlmMetadata would still be cached on brokers
>> independently of any update of custom metadata. Since both types of
>> metadata are authored by different systems, and are cached in
>> different layers, this may become a problem, or make plugin migration
>> more difficult. What do you think?
>>
>> I have a vague memory of this being discussed back when the tiered
>> storage KIP was started. Maybe Satish has more background on this.
>>
>> Thanks,
>> Alexandre
>>
>> On Mon, 17 Apr 2023 at 16:50, Ivan Yurchenko
>>  wrote:
>> >
>> > Hi Alexandre,
>> >
>> > Thank you for your feedback!
>> >
>> > > One question I would have is, what is the benefit of adding these
>> > > custom metadata in the rlmMetadata rather than letting the plugin
>> > > manage access and persistence to them?
>> >
>> > Could you please elaborate? Do I understand correctly that the idea is
>> > that the plugin would have its own storage for this custom metadata,
>> > for example a special topic?
>> >
>> > > It would be possible for a user
>> > > to use custom metadata large enough to adversely impact access to and
>> > > caching of the rlmMetadata by Kafka.
>> >
>> > Since the custom metadata is 100% under the control of the RSM plugin,
>> > the risk is as big as the risk of running third-party code (i.e. the
>> > RSM plugin). The cluster admin must decide whether they trust it.
>> > To mitigate this risk and keep it under control, RSM plugin
>> > implementations could document what custom metadata they use and
>> > estimate its size.
>> >
>> > Best,
>> > Ivan
>> >
>> >
>> > On Mon, 17 Apr 2023 at 18:14, Alexandre Dupriez <
>> alexandre.dupr...@gmail.com>
>> > wrote:
>> >
>> > > Hi 

Re: [DISCUSS] KIP-899: Allow clients to rebootstrap

2023-05-29 Thread Ivan Yurchenko
Hi Chris and all,

> I believe the logic you've linked is only applicable for the producer and
> consumer clients; the admin client does something different (see [1]).

I see, thank you for the pointer. It seems the admin client is fairly
different from the producer and consumer. Probably it makes sense to reduce
the scope of the KIP to the producer and consumer clients only.

> it'd be nice to have a definition of when re-bootstrapping
> would occur that doesn't rely on internal implementation details. What
> user-visible phenomena can we identify that would lead to a
> re-bootstrapping?

Let's put it this way: "Re-bootstrapping means that the client forgets the
nodes it currently knows about and falls back on the bootstrap nodes as if it
had just been initialized. Re-bootstrapping happens when, during a metadata
update (which may be scheduled by `metadata.max.age.ms` or caused by
certain error responses like NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE,
etc.), the client doesn't have a node with an established or establishable
connection."
Does this sound good?
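
To illustrate, here is a rough sketch of the proposed condition (a simplified
Java sketch, not the actual NetworkClient code; the class and method names are
made up for the example):

    import java.util.ArrayList;
    import java.util.List;

    // Rough sketch of the proposed client behaviour.
    class RebootstrapSketch {
        private final List<String> bootstrapNodes;  // from bootstrap.servers
        private List<String> knownNodes;            // nodes discovered via metadata

        RebootstrapSketch(List<String> bootstrapNodes) {
            this.bootstrapNodes = bootstrapNodes;
            this.knownNodes = new ArrayList<>(bootstrapNodes);
        }

        // Called on a scheduled metadata update (metadata.max.age.ms) or after
        // errors such as NOT_LEADER_OR_FOLLOWER / REPLICA_NOT_AVAILABLE.
        void maybeUpdateMetadata(boolean rebootstrapEnabled, String connectableNodeOrNull) {
            if (connectableNodeOrNull == null && rebootstrapEnabled) {
                // No node with an established or establishable connection:
                // forget the discovered nodes and fall back on the original
                // bootstrap list, as if the client had just been initialized.
                knownNodes = new ArrayList<>(bootstrapNodes);
            }
            // ... otherwise continue with the normal metadata request / backoff path ...
        }
    }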

> I also believe that if someone has "
> reconnect.backoff.max.ms" set to a low-enough value,
> NetworkClient::leastLoadedNode may never return null. In that case,
> shouldn't we still attempt a re-bootstrap at some point (if the user has
> enabled this feature)?

Yes, you're right. In particular, `canConnect` here [1] can always return
`true` if `reconnect.backoff.max.ms` is low enough.
It seems pretty difficult to find a good criterion for when re-bootstrapping
should be forced in this case, so it'd be difficult to configure and reason
about. I think it's worth mentioning in the KIP and later in the
documentation, but we should not try to do anything special here.

> Would it make sense to re-bootstrap only after "
> metadata.max.age.ms" has elapsed since the last metadata update, and when
> at least one request has been made to contact each known server and been
> met with failure?

The first condition is satisfied by the check at the beginning of
`maybeUpdate` [2].
It seems to me the second one is also satisfied by `leastLoadedNode`.
Admittedly, it's more relaxed than what you propose: it tracks node
unavailability detected by all types of requests, not only by metadata
requests.
What do you think, would this be enough?

[1]
https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L698
[2]
https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1034-L1041

Best,
Ivan


On Tue, 21 Feb 2023 at 20:07, Chris Egerton  wrote:

> Hi Ivan,
>
> I believe the logic you've linked is only applicable for the producer and
> consumer clients; the admin client does something different (see [1]).
>
> Either way, it'd be nice to have a definition of when re-bootstrapping
> would occur that doesn't rely on internal implementation details. What
> user-visible phenomena can we identify that would lead to a
> re-bootstrapping? I also believe that if someone has "
> reconnect.backoff.max.ms" set to a low-enough value,
> NetworkClient::leastLoadedNode may never return null. In that case,
> shouldn't we still attempt a re-bootstrap at some point (if the user has
> enabled this feature)? Would it make sense to re-bootstrap only after "
> metadata.max.age.ms" has elapsed since the last metadata update, and when
> at least one request has been made to contact each known server and been
> met with failure?
>
> [1] -
>
> https://github.com/apache/kafka/blob/c9a42c85e2c903329b3550181d230527e90e3646/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java#L100
>
> Cheers,
>
> Chris
>
> On Sun, Feb 19, 2023 at 3:39 PM Ivan Yurchenko 
> wrote:
>
> > Hi Chris,
> >
> > Thank you for your question. As a part of various lifecycle phases
> > (including node disconnect), NetworkClient can request a metadata update
> > eagerly (the `Metadata.requestUpdate` method), which results in
> > `MetadataUpdater.maybeUpdate` being called during the next poll. Inside,
> > it has a way to find a known node it can connect to for the fresh
> > metadata. If no such node is found, it backs off (code [1]). I'm thinking
> > of piggybacking on this logic and injecting the rebootstrap attempt
> > before the backoff.
> >
> > As for the second part of your question: re-bootstrapping means replacing
> > the node addresses in the client with the original bootstrap addresses,
> > so if the first bootstrap attempt fails, the client will continue using
> > the bootstrap addresses until success -- pretty much as if it were
> > recreated from scratch.
> >
> > Best,
> > Ivan
> >
> > [1]
> >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1045-L1049
> >
> > On Thu, 16 Feb 2023 at 17:18, Chris Egerton 
> > wrote:
> >
> > > Hi 

[jira] [Created] (KAFKA-15036) Kraft leader change fails when invoking getFinalizedFeatures

2023-05-29 Thread Deng Ziming (Jira)
Deng Ziming created KAFKA-15036:
---

 Summary: Kraft leader change fails when invoking 
getFinalizedFeatures
 Key: KAFKA-15036
 URL: https://issues.apache.org/jira/browse/KAFKA-15036
 Project: Kafka
  Issue Type: Improvement
Reporter: Deng Ziming


When the KRaft leader changes, we can receive an error as follows:
 
{{[2023-05-24 18:00:02,898] WARN [QuorumController id=3002]
getFinalizedFeatures: failed with unknown server exception RuntimeException in
271 us. The controller is already in standby mode.
(org.apache.kafka.controller.QuorumController)
java.lang.RuntimeException: No in-memory snapshot for epoch 9. Snapshot epochs are:
    at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173)
    at org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131)
    at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69)
    at org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303)
    at org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016)
    at org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:829)}}





RE: [DISCUSS] KIP-932: Queues for Kafka

2023-05-29 Thread Adam Warski
Hello,

thank you for the proposal! A very interesting read.

I do have one question, though. When you subscribe to a topic using consumer 
groups, it might happen that one consumer has processed all messages from its 
partitions, while another one still has a lot of work to do (this might be due 
to unbalanced partitioning, long processing times etc.). In a message-queue 
approach, it would be great to solve this problem - so that a consumer that is 
free can steal work from other consumers. Is this somehow covered by share 
groups?

Maybe this is planned as "further work", as indicated here:

"
It manages the topic-partition assignments for the share-group members. An 
initial, trivial implementation would be to give each member the list of all 
topic-partitions which matches its subscriptions and then use the pull-based 
protocol to fetch records from all partitions. A more sophisticated 
implementation could use topic-partition load and lag metrics to distribute 
partitions among the consumers as a kind of autonomous, self-balancing 
partition assignment, steering more consumers to busier partitions, for 
example. Alternatively, a push-based fetching scheme could be used. Protocol 
details will follow later.
"

but I’m not sure if I understand this correctly. A fully-connected graph seems 
like a lot of connections, and I’m not sure if this would play well with 
streaming.

This also seems to be one of the central problems - a key differentiator between
share groups and consumer groups (the other one being persisting the state of
messages). And maybe the exact way we’d want to approach this would, to a
certain degree, dictate the design of the queueing system?

Best,
Adam Warski

On 2023/05/15 11:55:14 Andrew Schofield wrote:
> Hi,
> I would like to start a discussion thread on KIP-932: Queues for Kafka. This 
> KIP proposes an alternative to consumer groups to enable cooperative 
> consumption by consumers without partition assignment. You end up with queue 
> semantics on top of regular Kafka topics, with per-message acknowledgement 
> and automatic handling of messages which repeatedly fail to be processed.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
> 
> Please take a look and let me know what you think.
> 
> Thanks.
> Andrew 



[jira] [Created] (KAFKA-15035) Consumer offsets can be deleted immediately if kafka does not detect a consumer as dead

2023-05-29 Thread Sam Cantero (Jira)
Sam Cantero created KAFKA-15035:
---

 Summary: Consumer offsets can be deleted immediately if kafka does 
not detect a consumer as dead
 Key: KAFKA-15035
 URL: https://issues.apache.org/jira/browse/KAFKA-15035
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.2
Reporter: Sam Cantero


We've recently encountered a scenario where a consumer group had its
committed offsets deleted almost right after (around 3 minutes after) the
consumer became inactive.

As per 
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
 committed offsets for an active (i.e. running) consumer group should not be 
deleted. However, if a consumer becomes inactive, {+}the deletion of committed 
offsets will not occur immediately{+}. Instead, the committed offsets will only 
be removed if the consumer remains inactive for at least the duration specified 
by 
[offset.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].

In our case, {{offset.retention.minutes}} is set to 7 days and the consumer was
only inactive for 5 minutes, so the deletion should not have occurred.

By inspecting the KIP-211 further, we can find the following sentence:
{quote}If a group consumer unsubscribes from a topic but continues to consume 
from other subscribed topics, the offset information of that unsubscribed 
topic’s partitions should be deleted at the appropriate time.
{quote}
And later on:
{quote}If there are partitions the group has offset for but no longer consumes 
from, and offsets.retention.minutes has passed since their last commit 
timestamp, the corresponding offsets will be removed from the offset cache
{quote}
It is implied, though {*}+this is what I want to confirm in this ticket+{*}, 
that Kafka employs two approaches for offset expiration:
 * The deletion timer is activated when a consumer group enters the Empty state 
(i.e., not running). Once the timer exceeds the {{offset.retention.minutes}} 
threshold, the committed offsets are deleted.
 * If a consumer group is in a "running" state (i.e., not in the Empty state) but
no longer consumes from some topics, and the committed offsets for those topics
are older than the offset.retention.minutes duration, those committed offsets
are deleted.

Note that the second approach only takes into account the timestamp of the last 
committed offset.
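
To make the two approaches above concrete, here is a rough sketch of the
expiration conditions as I understand them from KIP-211 (the names are
illustrative; this is not the actual GroupMetadataManager code):
{code:java}
enum GroupState { EMPTY, STABLE }

class OffsetExpirationSketch {
    // Hypothetical sketch of the two conditions described above.
    boolean shouldExpireOffset(GroupState state, long nowMs, long retentionMs,
                               long groupEmptySinceMs, long lastCommitTimestampMs,
                               boolean groupStillConsumesTopic) {
        if (state == GroupState.EMPTY) {
            // Approach 1: the timer starts when the group becomes Empty.
            return nowMs - groupEmptySinceMs >= retentionMs;
        }
        // Approach 2: the group is alive but no longer consumes the topic;
        // only the timestamp of the last committed offset is considered.
        return !groupStillConsumesTopic && nowMs - lastCommitTimestampMs >= retentionMs;
    }
}
{code}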

Throughout this event, the affected consumer group didn’t transition into the
Empty state. Based on the Kafka logs, the consumer group was not detected as
Empty, indicating that Kafka considered the consumer to be running from its
perspective. It’s unclear why Kafka didn’t detect this consumer group as Empty.
{noformat}
01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg 
has failed, removing it from the group

01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 432 (__consumer_offsets-16) (reason: 
removing member consumer-mycg-1-uuid on heartbeat expiration)

1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg 
has failed, removing it from the group

01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 
(__consumer_offsets-16)

01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group 
mycg for generation 433{noformat}
This suggests that Kafka might have followed the second approach, which is why
it deleted the offsets 3 minutes later.
{noformat}
1:33:17 am - 
[GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 
milliseconds.{noformat}
For reference, regular consumer join/startup logs look like this. The group
is stabilized and the assignment from the leader is received.
{noformat}
 [GroupCoordinator 0]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason: 
Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with 
group instance id None) (kafka.coordinator.group.GroupCoordinator)
 
[GroupCoordinator 0]: Stabilized group mycg generation 7 
(__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Assignment received from leader for group mycg for 
generation 7 (kafka.coordinator.group.GroupCoordinator){noformat}
For reference, regular consumer leave/shutdown logs look like this. NOTE how
the consumer group moves into the Empty state.

 
{noformat}
[GroupCoordinator 0]: Member[group.instance.id None, member.id 
consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left, 
removing it from the group (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason: 
removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-05-29 Thread Sagar
Hi,

Bumping this thread again for further reviews.

Thanks!
Sagar.

On Fri, May 12, 2023 at 3:38 PM Sagar  wrote:

> Hi All,
>
> Thanks for the comments/reviews. I have updated the KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> with a newer approach which shelves the need for an explicit topic.
>
> Please review again and let me know what you think.
>
> Thanks!
> Sagar.
>
>
> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the KIP! I have a few questions and comments:
>>
>> 1) I agree with Chris' point about the separation of a connector heartbeat
>> mechanism and allowing source connectors to generate offsets without
>> producing data. What is the purpose of the heartbeat topic here and are
>> there any concrete use cases for downstream consumers on this topic? Why
>> can't we instead simply introduce a mechanism to retrieve a list of source
>> partition / source offset pairs from the source tasks?
>>
>> 2) With the currently described mechanism, the new
>> "SourceTask::produceHeartbeatRecords" method returns a
>> "List"
>> - what happens with the topic in each of these source records? Chris
>> pointed this out above, but it doesn't seem to have been addressed? The
>> "SourceRecord" class also has a bunch of other fields which will be
>> irrelevant here (partition, key / value schema, key / value data,
>> timestamp, headers). In fact, it seems like only the source partition and
>> source offset are relevant here, so we should either introduce a new
>> abstraction or simply use a data structure like a mapping from source
>> partitions to source offsets (adds to the above point)?
>>
>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
>> needed? What are the downsides of
>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
>> (similar to the existing "SourceTask::poll" method)? Is this only to
>> prevent the generation of a lot of offset records? Since Connect's offsets
>> topics are log compacted (and source partitions are used as keys for each
>> source offset), I'm not sure if such concerns are valid and such a
>> heartbeat timer / interval mechanism is required?
>>
>> 4) The first couple of rejected alternatives state that the use of a null
>> topic / key / value is preferably avoided - but the current proposal would
>> also likely require connectors to use such workarounds (a null topic when
>> the heartbeat topic is configured at the worker level, and always for the
>> key / value)?
>>
>> 5) The third rejected alternative talks about subclassing the
>> "SourceRecord" class - this presumably means allowing connectors to pass
>> special offset only records via the existing poll mechanism? Why was this
>> considered a more invasive option? Was it because of the backward
>> compatibility issues that would be introduced for plugins using the new
>> public API class that still need to be deployed onto older Connect
>> workers?
>>
>> Thanks,
>> Yash
>>
>> On Fri, Apr 14, 2023 at 6:45 PM Sagar  wrote:
>>
>> > One thing I forgot to mention in my previous email was that the reason I
>> > chose to include the opt-in behaviour via configs was that the users of
>> > the connector know their workload patterns. If the workload is such that
>> > the connector would receive regular valid updates, then there’s ideally
>> > no need for moving offsets since they would update automatically.
>> >
>> > This way they aren’t forced to use this feature and can use it only when
>> > the workload is expected to be batchy or infrequent.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> >
>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar 
>> wrote:
>> >
>> > > Hi Chris,
>> > >
>> > > Thanks for following up on the response. Sharing my thoughts further:
>> > >
>> > >> If we want to add support for connectors to emit offsets without
>> > >> accompanying source records, we could (and IMO should) do that
>> > >> without requiring users to manually enable that feature by adjusting
>> > >> worker or connector configurations.
>> > >
>> > >
>> > > With the current KIP design, I have tried to implement this in an
>> > > opt-in manner via configs. I guess what you are trying to say is that
>> > > this doesn't need a config of its own and instead could be part of the
>> > > poll -> transform etc. -> produce -> commit cycle. That way, the users
>> > > don't need to set any config, and if the connector supports moving
>> > > offsets w/o producing SourceRecords, it should happen automatically.
>> > > Is that correct? If that is the concern, then I can think of not
>> > > exposing a config and trying to make this process automatic. That
>> > > should ease the load on connector users, but as for your point about
>> > > the cognitive load on connector developers, I am still not sure how to
>> > > address that. The offsets are privy to a connector and the
>> > > framework 

[jira] [Created] (KAFKA-15034) Improvement of ReplaceField performance for long list

2023-05-29 Thread BDeus (Jira)
BDeus created KAFKA-15034:
-

 Summary: Improvement of ReplaceField performance for long list
 Key: KAFKA-15034
 URL: https://issues.apache.org/jira/browse/KAFKA-15034
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 3.4.0
Reporter: BDeus


The ReplaceField SMT uses a List for its include and exclude filters, which is
an ArrayList internally.

In the case of a long filter list, the _O(n)_ lookup complexity of ArrayList
results in poor performance.

Could we use a HashSet implementation in the ReplaceField class instead of the
traditional ArrayList?
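
For illustration, a minimal sketch of the idea (not the actual ReplaceField
code; the class name and exact filter semantics are simplified):
{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FieldFilterSketch {
    private final Set<String> include;
    private final Set<String> exclude;

    FieldFilterSketch(List<String> includeList, List<String> excludeList) {
        // Built once at configure() time, so per-record lookups become O(1).
        this.include = new HashSet<>(includeList);
        this.exclude = new HashSet<>(excludeList);
    }

    // Keep a field if it is not excluded and either no include list is given
    // or it is explicitly included.
    boolean keep(String fieldName) {
        return !exclude.contains(fieldName)
                && (include.isEmpty() || include.contains(fieldName));
    }
}
{code}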





[jira] [Created] (KAFKA-15033) Failed to close repository in stagingRepositories

2023-05-29 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15033:
-

 Summary: Failed to close repository in stagingRepositories
 Key: KAFKA-15033
 URL: https://issues.apache.org/jira/browse/KAFKA-15033
 Project: Kafka
  Issue Type: Task
Reporter: Luke Chen


While running release.py, at the step that closes the new staging repository
created by uploading the artifacts, I encountered some errors while closing it.
This won't block the release, though, since the other repositories can be closed
successfully.

 
{code:java}
Event: Failed: Signature Validation
Friday, May 26, 2023 11:02:00 TST (GMT+0800)
typeId: signature-staging
failureMessage: Missing Signature:
'/org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.asc'
does not exist for 'kafka-streams-upgrade-system-tests-0101-3.4.1.jar'. {code}
 
{code:java}
Event: Failed: Checksum Validation
Friday, May 26, 2023 11:02:01 TST (GMT+0800)
typeId: checksum-staging
failureMessage: Requires one-of SHA-1:
/org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.sha1,
SHA-256:
/org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.sha256,
SHA-512:
/org/apache/kafka/kafka-streams-upgrade-system-tests-0101/3.4.1/kafka-streams-upgrade-system-tests-0101-3.4.1.jar.sha512 {code}
{code:java}
Event: Failed: Signature Validation
Monday, May 29, 2023 17:43:57 TST (GMT+0800)
typeId: signature-staging
failureMessage: Missing Signature: '/org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.asc'
does not exist for 'connect-3.4.1.jar'. {code}
{code:java}
Event: Failed: Checksum Validation
Monday, May 29, 2023 17:43:58 TST (GMT+0800)
typeId: checksum-staging
failureMessage: Requires one-of SHA-1: /org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.sha1,
SHA-256: /org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.sha256,
SHA-512: /org/apache/kafka/connect/3.4.1/connect-3.4.1.jar.sha512 {code}
 
{code:java}
Event: Failed: Checksum Validation
Monday, May 29, 2023 17:45:36 TST (GMT+0800)
typeId: checksum-staging
failureMessage: Requires one-of SHA-1: /org/apache/kafka/generator/3.4.1/generator-3.4.1.jar.sha1,
SHA-256: /org/apache/kafka/generator/3.4.1/generator-3.4.1.jar.sha256,
SHA-512: /org/apache/kafka/generator/3.4.1/generator-3.4.1.jar.sha512{code}





Re: [DISCUSS] KIP-932: Queues for Kafka

2023-05-29 Thread Luke Chen
Hi Andrew,

Thanks for the KIP.
Some high level questions:
1. How do we handle the "fetch from follower" case?
It looks like in the current design, each call needs to go to the "shared
partition leader", where the shared state is stored. Is my understanding
correct?

2. Where is the state info stored?
It looks like we only store it in the memory of the "shared partition
leader". What happens after the leader crashes and the partition moves to
another ISR replica?

3. New metrics needed
Since we're introducing a new kind of consumer group, I think new metrics
should be added for the client and broker to monitor them.

Thank you.
Luke

On Mon, May 29, 2023 at 1:01 PM Satish Duggana 
wrote:

> Minor correction on 103: latest instead of earliest for the SPSO default value.
>
> 103 It talks about SPSO values, latest being the default, and the user
> can reset it to a target offset timestamp. What is the maximum value
> for the SPEO? It would be good to clarify what the maximum values for the
> SPSO and SPEO could be. Can they be the HW, the LogStableOffset, or some
> other value?
>
> Thanks,
> Satish.
>
> On Mon, 29 May 2023 at 10:06, Satish Duggana 
> wrote:
> >
> > Hi Andrew,
> > Thanks for the nice KIP on a very interesting feature about
> > introducing some of the traditional MessageQueue semantics to Kafka.
> > It is good to see that we are extending the existing consumer group
> > concepts and related mechanisms for shared subscriptions instead of
> > bringing in any large architectural/protocol changes.
> >
> > This KIP talks about introducing a durable subscription feature for
> > topics, with multiple consumers consuming messages in parallel from a
> > single topic partition.
> >
> > 101 Are you planning to extend this functionality to queueing
> > semantics like the JMS point-to-point style in the future?
> >
> > 102 When a message is rejected by the target consumer, how do users
> > know which records/offsets are dropped because of failed records
> > (due to a rejection ack or due to timeouts, etc.) before DLQs are
> > introduced?
> >
> > 103 It talks about SPSO values, earliest being the default, and the user
> > can reset it to a target offset timestamp. What is the maximum value
> > for the SPEO? It would be good to clarify what the maximum values for the
> > SPSO and SPEO could be. Can they be the HW, the LogStableOffset, or some
> > other value?
> >
> > 104 The KIP mentions "share.delivery.count.limit" as the maximum
> > number of delivery attempts for a record delivered to a share group.
> > But the actual delivery count may be more than this number, as the
> > leader may fail to update the delivery count, or the leader or consumer
> > may fail and more delivery attempts may be made later. It may be the
> > minimum number of delivery attempts instead of the maximum.
> >
> > Thanks,
> > Satish.
> >
> >
> > On Wed, 24 May 2023 at 21:26, Andrew Schofield
> >  wrote:
> > >
> > > Hi Stanislav,
> > > Thanks for your email. You bring up some interesting points.
> > >
> > > 1) Tiered storage
> > > I think the situation here for fetching historical data is equivalent
> > > to what happens if a user resets the committed offset for a consumer
> > > group back to an earlier point in time. So, I will mention this in the
> > > next update to the KIP document, but I think there's nothing
> > > especially different here.
> > >
> > > 2) SSO initialized to the latest offset
> > > The KIP does mention that it is possible for an administrator to set
> > > the SSO using either AdminClient.alterShareGroupOffsets or
> > > kafka-share-groups.sh. It is entirely intentional that there is no
> > > KafkaConsumer config for initializing the SSO. I know that's how it
> > > can be done for consumer groups, but it suffers from the situation
> > > where different consumers have different opinions about
> > > the initial value (earliest vs latest) and then the first one in wins.
> > > Also, KIP-842 digs into some problems with how consumer
> > > group offset reset works (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms)
> > > so I've tried to sidestep those problems too.
> > >
> > > Another possibility is to follow KIP-848, which proposes that
> > > AdminClient.incrementalAlterConfigs is enhanced to support a new
> > > resource type called GROUP; supporting a dynamic group config in
> > > this manner would give a single point of control.
> > >
> > > 3) Durable storage
> > > The KIP does not yet describe how durable storage works. I have a few
> > > ideas that I want to flesh out before updating the KIP.
> > >
> > > I will rule out using a compacted topic, though. The problem is that
> > > each record on a compacted topic is a key:value pair, and
> > > it's not obvious what to use as the key. If it's the share group name,
> > > it needs the entire in-flight record state to be recorded in
> > > one hit, which is extremely inefficient.
> > >
> > > 4) Batch acknowledgement
> > > You are correct that compression makes delivery and acknowledgement of
> > > individual messages within a compressed batch
> > > more 

[GitHub] [kafka-site] Dionakra opened a new pull request, #517: MINOR: Added missing Kafka Broker docs for metrics.jmx.(include|exclude) configs

2023-05-29 Thread via GitHub


Dionakra opened a new pull request, #517:
URL: https://github.com/apache/kafka-site/pull/517

   Added documentation about the configurable JMX beans of Kafka brokers. This was
introduced by
[KIP-544](https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable),
which has been available since [Apache Kafka
2.6.0](https://issues.apache.org/jira/browse/KAFKA-9106), but it was never
published in the docs.

   Also, following
[KIP-629](https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase),
the config names were changed.

   You can see the configs in the [JmxReporter
class](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java#L53-L59).

   This PR adds the documentation for this change as of Apache Kafka 2.6.0,
including the renamed configs with the former names deprecated.
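
   For illustration, an illustrative example of the configs this PR documents
(the property names come from the JmxReporter class linked above; the regex
values are made up for the example):

```java
import java.util.Properties;

public class JmxMetricsFilterExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Expose only MBeans whose names match the include pattern...
        props.setProperty("metrics.jmx.include", "kafka\\.server.*");
        // ...except those matching the exclude pattern.
        props.setProperty("metrics.jmx.exclude", "kafka\\.server:type=ReplicaManager.*");
        // Deprecated pre-KIP-629 names: metrics.jmx.whitelist / metrics.jmx.blacklist
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```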





[jira] [Resolved] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-29 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-14956.

Resolution: Fixed

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>  Labels: flaky-test
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at 
>