Re: Problem with Kafka KRaft 3.1.X

2022-09-09 Thread Paul Brebner
Colin, hi, current max partitions reached is about 600,000 - I had to
increase Linux file descriptors, mmap, and tweak the JVM heap settings a
bit - heap error again.
This is a bit of a hack to, as RF=1 and only a single EC2 instance - a
proper 3 node cluster would in theory give >1M partitions which was what I
really wanted to test out. I think I was also hitting this error attempting
to create a single topic with lots of partitions:
https://github.com/apache/kafka/pull/12595
Current approach is to create multiple topics with 1000 partitions each, or
single topic and increase the number of partitions.
I've also got some good numbers around speed of meta data operations of
Zookeeper vs. KRaft mode (KRaft lots faster = O(1) c.f. O(n) for ZK) etc.
Anyway I'm happy I've got some numbers to report for my talk now, thanks
for the info.

Regards, Paul

On Sat, 10 Sept 2022 at 02:43, Colin McCabe  wrote:

> Hi Paul,
>
> As Keith wrote, it does sound like you are hitting a separate Linux limit
> like the max mmap count.
>
> I'm curious how many partitions you can create if you change that config!
>
> best,
> Colin
>
>
> On Tue, Sep 6, 2022, at 14:02, Keith Paulson wrote:
> > I've had similar errors cause by mmap counts; try with
> > vm.max_map_count=262144
> >
> >
> > On 2022/09/01 23:57:54 Paul Brebner wrote:
> >> Hi all,
> >>
> >> I've been attempting to benchmark Kafka KRaft version for an ApacheCon
> > talk
> >> and have identified 2 problems:
> >>
> >> 1 - it's still impossible to create large number of partitions/topics -
> I
> >> can create more than the comparable Zookeeper version but still not
> >> "millions" - this is with RF=1 (as anything higher needs huge clusters
> to
> >> cope with the replication CPU overhead) only, and no load on the
> clusters
> >> yet (i.e. purely a topic/partition creation experiment).
> >>
> >> 2 - eventually the topic/partition creation command causes the Kafka
> >> process to fail - looks like a memory error -
> >>
> >> java.lang.OutOfMemoryError: Metaspace
> >> OpenJDK 64-Bit Server VM warning: INFO:
> >> os::commit_memory(0x7f4f554f9000, 65536, 1) failed; error='Not
> enough
> >> space' (errno=12)
> >>
> >> or similar error
> >>
> >> seems to happen consistently around 30,000+ partitions - this is on a
> test
> >> EC2 instance with 32GB Ram, 500,000 file descriptors (increased from
> >> default) and 64GB disk (plenty spare). I'm not an OS expert, but the
> kafka
> >> process and the OS both seem to have plenty of RAM when this error
> occurs.
> >>
> >> So there's 3 questions really: What's going wrong exactly? How to
> achieve
> >> more partitions? And should the topic create command (just using the CLI
> > at
> >> present to create topics) really be capable of killing the Kafka
> instance,
> >> or should it fail and throw an error, and the Kafka instance still
> > continue
> >> working...
> >>
> >> Regards, Paul Brebner
> >>
>


[jira] [Created] (KAFKA-14217) app-reset-tool.html should remove reference to --zookeeper flag that no longer exists

2022-09-09 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14217:


 Summary: app-reset-tool.html should remove reference to 
--zookeeper flag that no longer exists
 Key: KAFKA-14217
 URL: https://issues.apache.org/jira/browse/KAFKA-14217
 Project: Kafka
  Issue Type: Bug
  Components: docs, documentation
Affects Versions: 3.30, 3.3
Reporter: Colin McCabe
Assignee: Colin McCabe


app-reset-tool.html should remove reference to --zookeeper flag that no longer 
exists



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14216) Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc

2022-09-09 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14216:


 Summary: Remove ZK reference from 
org.apache.kafka.server.quota.ClientQuotaCallback javadoc
 Key: KAFKA-14216
 URL: https://issues.apache.org/jira/browse/KAFKA-14216
 Project: Kafka
  Issue Type: Bug
  Components: docs, documentation
Affects Versions: 3.3.0, 3.3
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14215) KRaft forwarded requests have no quota enforcement

2022-09-09 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14215:
---

 Summary: KRaft forwarded requests have no quota enforcement
 Key: KAFKA-14215
 URL: https://issues.apache.org/jira/browse/KAFKA-14215
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


On the broker, the `BrokerMetadataPublisher` is responsible for propagating 
quota changes from `ClientQuota` records to `ClientQuotaManager`. On the 
controller, there is no similar logic, so no client quotas are enforced on the 
controller.

On the broker side, there is no enforcement as well since the broker assumes 
that the controller will be the one to do it. Basically it looks at the 
throttle time returned in the response from the controller. If it is 0, then 
the response is sent immediately without any throttling. 

So the consequence of both of these issues is that controller-bound requests 
have no throttling today.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-862: Self-join optimization for stream-stream joins

2022-09-09 Thread Guozhang Wang
Thanks Vicky, that reads much clearer now.

Just regarding the value string name itself: "self.join" may be confusing
compared to other values that people would think before this config is
enabled, self-join are not allowed at all. Maybe we can rename it to
"single.store.self.join"?

Guozhang

On Fri, Sep 9, 2022 at 2:15 AM Vasiliki Papavasileiou
 wrote:

> Hey Guozhang,
>
> Ah it seems my text was not very clear :)
> With "TOPOLOGY_OPTIMIZATION_CONFIG will be extended to accept a list of
> optimization rule configs" I meant that it will accept the new value
> strings for each optimization rule. Let me rephrase that in the KIP to make
> it clearer.
> Is it better now?
>
> Best,
> Vicky
>
> On Thu, Sep 8, 2022 at 9:07 PM Guozhang Wang  wrote:
>
> > Thanks Vicky,
> >
> > I read through the KIP again and it looks good to me. Just a quick
> question
> > regarding the public config changes: you mentioned "No public interfaces
> > will be impacted. The config TOPOLOGY_OPTIMIZATION_CONFIG will be
> extended
> > to accept a list of optimization rule configs in addition to the global
> > values "all" and "none" . But there are no new value strings mentioned in
> > this KIP, so that means we will apply this optimization only when `all`
> is
> > specified in the config right?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Sep 8, 2022 at 12:02 PM Vasiliki Papavasileiou
> >  wrote:
> >
> > > Hello everyone,
> > >
> > > I'd like to open the vote for KIP-862, which proposes to optimize
> > > stream-stream self-joins by using a single state store for the join.
> > >
> > > The proposal is here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join+optimization+for+stream-stream+joins
> > >
> > > Thanks to all who reviewed the proposal, and thanks in advance for
> taking
> > > the time to vote!
> > >
> > > Thank you,
> > > Vicky
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


[jira] [Created] (KAFKA-14214) StandardAuthorizer may transiently process ACLs out of write order

2022-09-09 Thread Akhilesh Chaganti (Jira)
Akhilesh Chaganti created KAFKA-14214:
-

 Summary: StandardAuthorizer may transiently process ACLs out of 
write order
 Key: KAFKA-14214
 URL: https://issues.apache.org/jira/browse/KAFKA-14214
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3
Reporter: Akhilesh Chaganti


The issue with StandardAuthorizer#authorize is, that it looks up 
aclsByResources (which is of type ConcurrentSkipListMap)twice for every 
authorize call and uses Iterator with weak consistency guarantees on top of 
aclsByResources. This can cause the authorize function call to process the 
concurrent writes out of order.
*Issue 1:*
When StandardAuthorizer calls into a simple authorize function, we check the 
ACLs for literal/prefix matches for the resource and then make one more call to 
check the ACLs for matching wildcard entries. Between the two (checkSection) 
calls, let’s assume we add a DENY for resource literal and add an ALLOW ALL 
wildcard. The first call to check literal/prefix rules will SKIP DENY ACL since 
the writes are not yet processed and the second call would find ALLOW wildcard 
entry which results in ALLOW authorization for the resource when it is actually 
DENY.

*Issue: 2*

For authorization, StandardAuthorizer depends on an iterator that iterates 
through the ordered set of ACLs. The iterator has weak consistency guarantees. 
So when writes for two ACLs occur, one of the ACLs might be still visible to 
the iterator while the other is not. 

Let’s say below two ACLS are getting added in the following order to the set.
Acl1 = StandardAcl(TOPIC, foobar, LITERAL, DENY, READ, user1)
Acl2 = StandardAcl(TOPIC, foo, PREFIX, ALLOW, READ, user1)
Depending on the position of the iterator on the ordered set during the write 
call, the iterator might just see Acl2 which prompts it to ALLOW the topic to 
be READ even though the DENY rule was written before.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread Guozhang Wang
Hello David,

Alright I think that's sufficient. Just to make that clear in the doc,
could we update:

1) the heartbeat request handling section, stating when coordinator will
trigger rebalance based on the HB's member metadata / reason?
2) the "Rebalance Triggers" section to include what we described in "Group
Epoch - Trigger a rebalance" section as well?


Guozhang

On Fri, Sep 9, 2022 at 1:28 AM David Jacot 
wrote:

> Hi Guozhang,
>
> I thought that the assignor will always be consulted when the next
> heartbeat request is constructed. In other words,
> `PartitionAssignor#metadata` will be called for every heartbeat. This
> gives the opportunity for the assignor to enforce a rebalance by
> setting the reason to a non-zero value or by changing the bytes. Do
> you think that this is not sufficient? Are you concerned by the delay?
>
> Best,
> David
>
> On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang  wrote:
> >
> > Hello David,
> >
> > One of Jun's comments make me thinking:
> >
> > ```
> > In this case, a new assignment is triggered by the client side
> > assignor. When constructing the HB, the consumer will always consult
> > the client side assignor and propagate the information to the group
> > coordinator. In other words, we don't expect users to call
> > Consumer#enforceRebalance anymore.
> > ```
> >
> > As I looked at the current PartitionAssignor's interface, we actually do
> > not have a way yet to instruct how to construct the next HB request, e.g.
> > when the assignor wants to enforce a new rebalance with a new assignment,
> > we'd need some customizable APIs inside the PartitionAssignor to indicate
> > the next HB telling broker about so. WDYT about adding such an API on the
> > PartitionAssignor?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 6, 2022 at 6:09 AM David Jacot 
> > wrote:
> >
> > > Hi Jun,
> > >
> > > I have updated the KIP to include your feedback. I have also tried to
> > > clarify the parts which were not cleared.
> > >
> > > Best,
> > > David
> > >
> > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot 
> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thanks for your feedback. Let me start by answering your questions
> > > > inline and I will update the KIP next week.
> > > >
> > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to
> be
> > > fewer
> > > > > RPCs during rebalance and more efficient support of wildcard. A few
> > > > > comments below.
> > > >
> > > > I would also add that the KIP removes the global sync barrier in the
> > > > protocol which is essential to improve group stability and
> > > > scalability, and the KIP also simplifies the client by moving most of
> > > > the logic to the server side.
> > > >
> > > > > 30. ConsumerGroupHeartbeatRequest
> > > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling
> > > changing
> > > > > of the partition assignor in the consumers?
> > > >
> > > > Definitely. The group coordinator will use the assignor used by a
> > > > majority of the members. This allows the group to move from one
> > > > assignor to another by a roll. This is explained in the Assignor
> > > > Selection chapter.
> > > >
> > > > > 30.2 For each field, could you explain whether it's required in
> every
> > > > > request or the scenarios when it needs to be filled? For example,
> it's
> > > not
> > > > > clear to me when TopicPartitions needs to be filled.
> > > >
> > > > The client is expected to set those fields in case of a connection
> > > > issue (e.g. timeout) or when the fields have changed since the last
> > > > HB. The server populates those fields as long as the member is not
> > > > fully reconciled - the member should acknowledge that it has the
> > > > expected epoch and assignment. I will clarify this in the KIP.
> > > >
> > > > > 31. In the current consumer protocol, the rack affinity between the
> > > client
> > > > > and the broker is only considered during fetching, but not during
> > > assigning
> > > > > partitions to consumers. Sometimes, once the assignment is made,
> there
> > > is
> > > > > no opportunity for read affinity because no replicas of assigned
> > > partitions
> > > > > are close to the member. I am wondering if we should use this
> > > opportunity
> > > > > to address this by including rack in GroupMember.
> > > >
> > > > That's an interesting idea. I don't see any issue with adding the
> rack
> > > > to the members. I will do so.
> > > >
> > > > > 32. On the metric side, often, it's useful to know how busy a group
> > > > > coordinator is. By moving the event loop model, it seems that we
> could
> > > add
> > > > > a metric that tracks the fraction of the time the event loop is
> doing
> > > the
> > > > > actual work.
> > > >
> > > > That's a great idea. I will add it. Thanks.
> > > >
> > > > > 33. Could we add a section on coordinator failover handling? For
> > > example,
> > > > > does it need to trigger the check if any group with the wildcard
> > > > > subscription now has a new matching

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.2 #81

2022-09-09 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 303337 lines...]
[2022-09-09T19:03:08.156Z] 
[2022-09-09T19:03:08.157Z] 
org.apache.kafka.streams.integration.ResetPartitionTimeIntegrationTest > 
shouldPreservePartitionTimeOnKafkaStreamRestart[exactly_once_v2] PASSED
[2022-09-09T19:03:08.157Z] 
[2022-09-09T19:03:08.157Z] 
org.apache.kafka.streams.integration.RocksDBMetricsIntegrationTest > 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir[at_least_once] 
STARTED
[2022-09-09T19:04:07.106Z] 
[2022-09-09T19:04:07.106Z] 
org.apache.kafka.streams.integration.RocksDBMetricsIntegrationTest > 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir[at_least_once] 
PASSED
[2022-09-09T19:04:07.106Z] 
[2022-09-09T19:04:07.106Z] 
org.apache.kafka.streams.integration.RocksDBMetricsIntegrationTest > 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir[exactly_once] 
STARTED
[2022-09-09T19:05:09.017Z] 
[2022-09-09T19:05:09.017Z] 
org.apache.kafka.streams.integration.RocksDBMetricsIntegrationTest > 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir[exactly_once] 
PASSED
[2022-09-09T19:05:09.017Z] 
[2022-09-09T19:05:09.017Z] 
org.apache.kafka.streams.integration.RocksDBMetricsIntegrationTest > 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir[exactly_once_v2]
 STARTED
[2022-09-09T19:06:01.993Z] 
[2022-09-09T19:06:01.993Z] 
org.apache.kafka.streams.integration.RocksDBMetricsIntegrationTest > 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir[exactly_once_v2]
 PASSED
[2022-09-09T19:06:10.653Z] 
[2022-09-09T19:06:10.653Z] 
org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest > 
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled STARTED
[2022-09-09T19:06:12.639Z] 
[2022-09-09T19:06:12.639Z] 
org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest > 
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled PASSED
[2022-09-09T19:06:12.639Z] 
[2022-09-09T19:06:12.639Z] 
org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest > 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables STARTED
[2022-09-09T19:06:17.064Z] 
[2022-09-09T19:06:17.064Z] 
org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest > 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables PASSED
[2022-09-09T19:06:20.268Z] 
[2022-09-09T19:06:20.268Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testNotCleanUpStateDirIfNotEmpty STARTED
[2022-09-09T19:06:23.386Z] 
[2022-09-09T19:06:23.386Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testNotCleanUpStateDirIfNotEmpty PASSED
[2022-09-09T19:06:23.386Z] 
[2022-09-09T19:06:23.386Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testCleanUpStateDirIfEmpty STARTED
[2022-09-09T19:06:27.815Z] 
[2022-09-09T19:06:27.815Z] 
org.apache.kafka.streams.integration.StateDirectoryIntegrationTest > 
testCleanUpStateDirIfEmpty PASSED
[2022-09-09T19:06:29.851Z] 
[2022-09-09T19:06:29.851Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificActivePartitionStores STARTED
[2022-09-09T19:06:38.656Z] 
[2022-09-09T19:06:38.656Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificActivePartitionStores PASSED
[2022-09-09T19:06:38.656Z] 
[2022-09-09T19:06:38.656Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryAllStalePartitionStores STARTED
[2022-09-09T19:06:49.516Z] 
[2022-09-09T19:06:49.516Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryAllStalePartitionStores PASSED
[2022-09-09T19:06:49.516Z] 
[2022-09-09T19:06:49.516Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStoresMultiStreamThreads STARTED
[2022-09-09T19:06:57.066Z] 
[2022-09-09T19:06:57.066Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStoresMultiStreamThreads PASSED
[2022-09-09T19:06:57.066Z] 
[2022-09-09T19:06:57.066Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStores STARTED
[2022-09-09T19:07:05.785Z] 
[2022-09-09T19:07:05.785Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQuerySpecificStalePartitionStores PASSED
[2022-09-09T19:07:05.785Z] 
[2022-09-09T19:07:05.785Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryOnlyActivePartitionStoresByDefault STARTED
[2022-09-09T19:07:15.638Z] 
[2022-09-09T19:07:15.638Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryOnlyActivePartitionStoresByDefault PASSED
[2022-09-09T19:07:15.638Z] 
[2022-09-09T19:07:15.638Z] 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest > 
shouldQueryStoresAf

Re:[VOTE] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
+1 (non-binding)

Really looking forward for the discussion on how other group types (especially 
connect) will support this new protocol. 

From: dev@kafka.apache.org At: 09/09/22 04:32:46 UTC-4:00To:  
dev@kafka.apache.org
Subject: [VOTE] KIP-848: The Next Generation of the Consumer Rebalance Protocol

Hi all,

Thank you all for the very positive discussion about KIP-848. It looks
like folks are very positive about it overall.

I would like to start a vote on KIP-848, which introduces a brand new
consumer rebalance protocol.

The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.

Best,
David




Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
So it seems there's a consensus on having dedicated APIs for Connect, which 
means having data model (group, member, assignment) and APIs (heartbeat 
request/response, assignment prepare and install) tailored specifically to 
connect. I wonder if adding support for other coordinator group types (e.g. 
leadership, in the case of schema registry) will require similar assets (new 
model classes to express members and resources, custom heartbeats and 
assignment prepare/install APIs).

I think that, as new use cases are considered, the core primitives of the new 
protocol will be generalized, so new types don't have to implement the whole 
stack (e.g. state machines), but only functions like detecting group metadata 
changes, or computing assignments of the resources handled by each type 
(Topic/Partitions in the case of consumer, Connector/Task in the case of 
Connect, Leadership in the case of Schema Registry, and so on).


From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance 
Protocol

Thank you Guozhang/David for the feedback. Looks like there's agreement on
using separate APIs for Connect. I would revisit the doc and see what
changes are to be made.

Thanks!
Sagar.

On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
wrote:

> Hi Sagar,
>
> Thanks for the feedback and the document. That's really helpful. I
> will take a look at it.
>
> Overall, it seems to me that both Connect and the Consumer could share
> the same underlying "engine". The main difference is that the Consumer
> assigns topic-partitions to members whereas Connect assigns tasks to
> workers. I see two ways to move forward:
> 1) We extend the new proposed APIs to support different resource types
> (e.g. partitions, tasks, etc.); or
> 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> similar to the new ones but different on the content/resources and
> they would rely on the same engine on the coordinator side.
>
> I personally lean towards 2) because I am not a fan of overcharging
> APIs to serve different purposes. That being said, I am not opposed to
> 1) if we can find an elegant way to do it.
>
> I think that we can continue to discuss it here for now in order to
> ensure that this KIP is compatible with what we will do for Connect in
> the future.
>
> Best,
> David
>
> On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
> >
> > Hi all,
> >
> > I am back from vacation. I will go through and address your comments
> > in the coming days. Thanks for your feedback.
> >
> > Cheers,
> > David
> >
> > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris 
> wrote:
> > >
> > > Hey All!
> > >
> > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
> making it
> > > down the stack!
> > >
> > > I had a few questions:
> > >
> > > 1. The 'Rejected Alternatives' section describes how member epoch
> should
> > > advance in step with the group epoch and assignment epoch values. I
> think
> > > that this is a good idea for the reasons described in the KIP. When the
> > > protocol is incrementally assigning partitions to a worker, what member
> > > epoch does each incremental assignment use? Are member epochs re-used,
> and
> > > a single member epoch can correspond to multiple different
> (monotonically
> > > larger) assignments?
> > >
> > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> > > not, should custom client-side assignor implementations interact with
> the
> > > Reason field, and how is its common meaning agreed upon? If so, what
> is the
> > > benefit of a distinct Reason field over including such functionality
> in the
> > > opaque metadata?
> > >
> > > 3. The following is included in the KIP: "Thanks to this, the input of
> the
> > > client side assignor is entirely driven by the group coordinator. The
> > > consumer is no longer responsible for maintaining any state besides its
> > > assigned partitions." Does this mean that the client-side assignor MAY
> > > incorporate additional non-Metadata state (such as partition
> throughput,
> > > cpu/memory metrics, config topics, etc), or that additional
> non-Metadata
> > > state SHOULD NOT be used?
> > >
> > > 4. I see that there are separate classes
> > > for org.apache.kafka.server.group.consumer.PartitionAssignor
> > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> > > overlap significantly. Is it possible for these two implementations to
> be
> > > unified? This would serve to promote feature parity of server-side and
> > > client-side assignors, and would also facilitate operational
> flexibility in
> > > certain situations. For example, if a server-side assignor has some
> poor
> > > behavior and needs a patch, deploying the patched assignor to the
> client
> > > and switching one consumer group to a client-side assignor may be
> faster
> > > and less risky than patching all of the brokers. With the currently
>

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.3 #65

2022-09-09 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 573394 lines...]
[2022-09-09T18:12:35.520Z] 
[2022-09-09T18:12:35.520Z] Exception: java.lang.AssertionError thrown from the 
UncaughtExceptionHandler in thread 
"appId_StreamsUncaughtExceptionHandlerIntegrationTestnull-d04ffea1-d494-4c5b-a2d9-3b2d2bd0d192-GlobalStreamThread"
[2022-09-09T18:12:35.520Z] 
[2022-09-09T18:12:35.520Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread PASSED
[2022-09-09T18:12:35.520Z] 
[2022-09-09T18:12:35.520Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldReplaceSingleThread STARTED
[2022-09-09T18:12:37.571Z] 
[2022-09-09T18:12:37.571Z] Exception: java.lang.AssertionError thrown from the 
UncaughtExceptionHandler in thread 
"appId_StreamsUncaughtExceptionHandlerIntegrationTestnull-0d1d1f83-8ae8-42be-ace9-00bcee605959-StreamThread-1"
[2022-09-09T18:12:37.571Z] 
[2022-09-09T18:12:37.571Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldReplaceSingleThread PASSED
[2022-09-09T18:12:37.571Z] 
[2022-09-09T18:12:37.571Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldShutdownMultipleThreadApplication STARTED
[2022-09-09T18:12:46.511Z] 
[2022-09-09T18:12:46.511Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldShutdownMultipleThreadApplication PASSED
[2022-09-09T18:12:46.511Z] 
[2022-09-09T18:12:46.511Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldShutdownClient STARTED
[2022-09-09T18:12:48.498Z] 
[2022-09-09T18:12:48.498Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldShutdownClient PASSED
[2022-09-09T18:12:48.498Z] 
[2022-09-09T18:12:48.498Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldShutdownSingleThreadApplication STARTED
[2022-09-09T18:12:56.794Z] 
[2022-09-09T18:12:56.794Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldShutdownSingleThreadApplication PASSED
[2022-09-09T18:12:56.794Z] 
[2022-09-09T18:12:56.794Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldEmitSameRecordAfterFailover STARTED
[2022-09-09T18:13:00.990Z] 
[2022-09-09T18:13:00.990Z] 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
 > shouldEmitSameRecordAfterFailover PASSED
[2022-09-09T18:13:03.040Z] 
[2022-09-09T18:13:03.040Z] 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest > 
shouldRecoverBufferAfterShutdown[at_least_once] STARTED
[2022-09-09T18:13:56.067Z] 
[2022-09-09T18:13:56.067Z] 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest > 
shouldRecoverBufferAfterShutdown[at_least_once] PASSED
[2022-09-09T18:13:56.067Z] 
[2022-09-09T18:13:56.067Z] 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest > 
shouldRecoverBufferAfterShutdown[exactly_once] STARTED
[2022-09-09T18:14:49.364Z] 
[2022-09-09T18:14:49.364Z] 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest > 
shouldRecoverBufferAfterShutdown[exactly_once] PASSED
[2022-09-09T18:14:49.364Z] 
[2022-09-09T18:14:49.364Z] 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest > 
shouldRecoverBufferAfterShutdown[exactly_once_v2] STARTED
[2022-09-09T18:15:43.145Z] 
[2022-09-09T18:15:43.145Z] 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest > 
shouldRecoverBufferAfterShutdown[exactly_once_v2] PASSED
[2022-09-09T18:15:43.145Z] 
[2022-09-09T18:15:43.145Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] STARTED
[2022-09-09T18:15:45.268Z] 
[2022-09-09T18:15:45.268Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] PASSED
[2022-09-09T18:15:45.268Z] 
[2022-09-09T18:15:45.268Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] STARTED
[2022-09-09T18:15:52.772Z] 
[2022-09-09T18:15:52.772Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] PASSED
[2022-09-09T18:15:52.772Z] 
[2022-09-09T18:15:52.772Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] STARTED
[2022-09-09T18:16:00.272Z] 
[2022-09-09T18:16:00.272Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] PASSED
[2022-09-09T18:16:00.272Z] 
[2022-09-09T18:16:00.272Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTes

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1214

2022-09-09 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 510511 lines...]
[2022-09-09T17:42:12.344Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyThreadsPerClient PASSED
[2022-09-09T17:42:12.344Z] 
[2022-09-09T17:42:12.344Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargePartitionCount STARTED
[2022-09-09T17:42:58.637Z] 
[2022-09-09T17:42:58.637Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargePartitionCount PASSED
[2022-09-09T17:42:58.637Z] 
[2022-09-09T17:42:58.637Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargePartitionCount STARTED
[2022-09-09T17:43:37.222Z] 
[2022-09-09T17:43:37.222Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargePartitionCount PASSED
[2022-09-09T17:43:37.222Z] 
[2022-09-09T17:43:37.222Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyStandbys STARTED
[2022-09-09T17:43:44.129Z] 
[2022-09-09T17:43:44.129Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyStandbys PASSED
[2022-09-09T17:43:44.129Z] 
[2022-09-09T17:43:44.129Z] StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyStandbys STARTED
[2022-09-09T17:44:28.657Z] 
[2022-09-09T17:44:28.657Z] StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyStandbys PASSED
[2022-09-09T17:44:28.657Z] 
[2022-09-09T17:44:28.657Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargeNumConsumers STARTED
[2022-09-09T17:44:28.657Z] 
[2022-09-09T17:44:28.657Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargeNumConsumers PASSED
[2022-09-09T17:44:28.657Z] 
[2022-09-09T17:44:28.657Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargeNumConsumers STARTED
[2022-09-09T17:44:31.120Z] 
[2022-09-09T17:44:31.120Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargeNumConsumers PASSED
[2022-09-09T17:44:32.163Z] 
[2022-09-09T17:44:32.163Z] AdjustStreamThreadCountTest > 
testConcurrentlyAccessThreads() STARTED
[2022-09-09T17:44:35.291Z] 
[2022-09-09T17:44:35.291Z] AdjustStreamThreadCountTest > 
testConcurrentlyAccessThreads() PASSED
[2022-09-09T17:44:35.291Z] 
[2022-09-09T17:44:35.291Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadReplacement() STARTED
[2022-09-09T17:44:44.034Z] 
[2022-09-09T17:44:44.034Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadReplacement() PASSED
[2022-09-09T17:44:44.034Z] 
[2022-09-09T17:44:44.034Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveThreadsMultipleTimes() STARTED
[2022-09-09T17:44:48.547Z] 
[2022-09-09T17:44:48.547Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveThreadsMultipleTimes() PASSED
[2022-09-09T17:44:48.547Z] 
[2022-09-09T17:44:48.547Z] AdjustStreamThreadCountTest > 
shouldnNotRemoveStreamThreadWithinTimeout() STARTED
[2022-09-09T17:44:55.498Z] 
[2022-09-09T17:44:55.498Z] AdjustStreamThreadCountTest > 
shouldnNotRemoveStreamThreadWithinTimeout() PASSED
[2022-09-09T17:44:55.498Z] 
[2022-09-09T17:44:55.498Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() STARTED
[2022-09-09T17:45:19.198Z] 
[2022-09-09T17:45:19.198Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() PASSED
[2022-09-09T17:45:19.198Z] 
[2022-09-09T17:45:19.198Z] AdjustStreamThreadCountTest > 
shouldAddStreamThread() STARTED
[2022-09-09T17:45:26.303Z] 
[2022-09-09T17:45:26.303Z] AdjustStreamThreadCountTest > 
shouldAddStreamThread() PASSED
[2022-09-09T17:45:26.303Z] 
[2022-09-09T17:45:26.303Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThreadWithStaticMembership() STARTED
[2022-09-09T17:45:30.464Z] 
[2022-09-09T17:45:30.464Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThreadWithStaticMembership() PASSED
[2022-09-09T17:45:30.464Z] 
[2022-09-09T17:45:30.464Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThread() STARTED
[2022-09-09T17:45:37.414Z] 
[2022-09-09T17:45:37.414Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThread() PASSED
[2022-09-09T17:45:37.414Z] 
[2022-09-09T17:45:37.414Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadRemovalTimesOut() STARTED
[2022-09-09T17:45:39.698Z] 
[2022-09-09T17:45:39.698Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadRemovalTimesOut() PASSED
[2022-09-09T17:45:46.183Z] 
[2022-09-09T17:45:46.183Z] FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest()
 STARTED
[2022-09-09T17:45:46.183Z] 
[2022-09-09T17:45:46.183Z] FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest()
 PASSED
[2022-09-09T17:45:46.183Z] 
[2022-09-09T17:45:46.183Z] FineGrainedAutoResetIntegrationTest > 
shouldThrowExceptionOverlappingPattern() STARTED
[2022-09-09T17:45:46.183Z] 
[2022-09-09T17:45:46.183Z] FineGrainedAutoResetIntegrationTest > 
shouldThrowExceptionOverla

Re: [DISCUSSION] KIP-864: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Николай Ижиков
Hello, Bill

Please, cast your vote in VOTE thread - 
https://lists.apache.org/thread/hct5r7lz1n5mnwol56jhf5tlg2qykv5f 



> 9 сент. 2022 г., в 20:33, Bill Bejeck  написал(а):
> 
> Hi Николай,
> 
> Thanks for the KIP; consistency across the tooling is important, so I'm
> a +1 for this KIP.
> I agree with Guozhang on voting right away.
> 
> -Bill
> 
> On Fri, Sep 9, 2022 at 12:05 PM Guozhang Wang  wrote:
> 
>> Hi, I think we can vote right away.
>> 
>> Guozhang
>> 
>> On Fri, Sep 9, 2022 at 2:24 AM Николай Ижиков  wrote:
>> 
>>> Hello, Guozhang.
>>> 
>>> Thanks for the feedback.
>>> As this KIP very straightforward, is it worth to be voted right now?
>>> Or should we wait more feedback?
>>> 
 9 сент. 2022 г., в 08:11, Guozhang Wang 
>> написал(а):
 
 Hello Николай,
 
 Thanks for writing the KIP, I think it's rather straightforward and
>>> better
 to be consistent in tooling params. I'm +1.
 
 Guozhang
 
 
 On Mon, Sep 5, 2022 at 11:25 PM Николай Ижиков 
>>> wrote:
 
> Hello.
> 
> Do we still want to make parameter names consistent in tools?
> If yes, please, share your feedback on KIP.
> 
>> 31 авг. 2022 г., в 11:50, Николай Ижиков 
> написал(а):
>> 
>> Hello.
>> 
>> I would like to start discussion on small KIP [1]
>> The goal of KIP is to add the same —boostrap-server parameter to
> `kafka-streams-appliation-reset.sh` tool as other tools use.
>> Please, share your feedback.
>> 
>> [1]
> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
>> 
> 
> 
 
 --
 -- Guozhang
>>> 
>>> 
>> 
>> --
>> -- Guozhang
>> 



Re: [DISCUSSION] KIP-864: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Bill Bejeck
Hi Николай,

Thanks for the KIP; consistency across the tooling is important, so I'm
a +1 for this KIP.
I agree with Guozhang on voting right away.

-Bill

On Fri, Sep 9, 2022 at 12:05 PM Guozhang Wang  wrote:

> Hi, I think we can vote right away.
>
> Guozhang
>
> On Fri, Sep 9, 2022 at 2:24 AM Николай Ижиков  wrote:
>
> > Hello, Guozhang.
> >
> > Thanks for the feedback.
> > As this KIP very straightforward, is it worth to be voted right now?
> > Or should we wait more feedback?
> >
> > > 9 сент. 2022 г., в 08:11, Guozhang Wang 
> написал(а):
> > >
> > > Hello Николай,
> > >
> > > Thanks for writing the KIP, I think it's rather straightforward and
> > better
> > > to be consistent in tooling params. I'm +1.
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Sep 5, 2022 at 11:25 PM Николай Ижиков 
> > wrote:
> > >
> > >> Hello.
> > >>
> > >> Do we still want to make parameter names consistent in tools?
> > >> If yes, please, share your feedback on KIP.
> > >>
> > >>> 31 авг. 2022 г., в 11:50, Николай Ижиков 
> > >> написал(а):
> > >>>
> > >>> Hello.
> > >>>
> > >>> I would like to start discussion on small KIP [1]
> > >>> The goal of KIP is to add the same —boostrap-server parameter to
> > >> `kafka-streams-appliation-reset.sh` tool as other tools use.
> > >>> Please, share your feedback.
> > >>>
> > >>> [1]
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
> > >>>
> > >>
> > >>
> > >
> > > --
> > > -- Guozhang
> >
> >
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-865 Metadata Transactions

2022-09-09 Thread David Arthur
Starting a new thread here
https://lists.apache.org/thread/895pgb85l08g2l63k99cw5dt2qpjkxb9

On Fri, Sep 9, 2022 at 1:05 PM Colin McCabe  wrote:
>
> Also, it looks like someone already claimed KIP-865, so I'd suggest grabbing 
> a new number. :)
>
> Colin
>
>
> On Fri, Sep 9, 2022, at 09:38, Colin McCabe wrote:
> > Thanks for this KIP, David!
> >
> > In the "motivation" section, it might help to give a concrete example
> > of an operation we want to be atomic. My favorite one is probably
> > CreateTopics since it's easy to see that we want to create all of a
> > topic or none of it, and a topic could be a potentially unbounded
> > number of records (although hopefully people have reasonable create
> > topic policy classes in place...)
> >
> > In "broker support", it would be good to clarify that we will buffer
> > the records in the MetadataDelta and not publish a new MetadataImage
> > until the transaction is over. This is an implementation detail, but
> > it's a simple one and I think it will make it easier to understand how
> > this works.
> >
> > In the "Raft Transactions" section of "Rejected Alternatives," I'd add
> > that managing buffering in the Raft layer would be a lot less efficient
> > than doing it in the controller / broker layer. We would end up
> > accumulating big lists of records which would then have to be applied
> > when the transaction completed, rather than building up a MetadataDelta
> > (or updating the controller state) incrementally.
> >
> > Maybe we want to introduce the concept of "last stable offset" to be
> > the last committed offset that is NOT part of an ongoing transaction?
> > Just a nomenclature suggestion...
> >
> > best,
> > Colin
> >
> > On Fri, Sep 9, 2022, at 06:42, David Arthur wrote:
> >> Hey folks, I'd like to start a discussion on the idea of adding
> >> transactions in the KRaft controller. This will allow us to overcome
> >> the current limitation of atomic batch sizes in Raft which lets us do
> >> things like create topics with a huge number of partitions.
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-865+Metadata+Transactions
> >>
> >> Thanks!
> >> David



-- 
David Arthur


[DISCUSS] KIP-868 Metadata Transactions (new thread)

2022-09-09 Thread David Arthur
Starting a new thread to avoid issues with mail client threading.

Original thread follows:

Hey folks, I'd like to start a discussion on the idea of adding
transactions in the KRaft controller. This will allow us to overcome
the current limitation of atomic batch sizes in Raft which lets us do
things like create topics with a huge number of partitions.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-868+Metadata+Transactions

Thanks!

---

Colin McCabe said:

Thanks for this KIP, David!

In the "motivation" section, it might help to give a concrete example
of an operation we want to be atomic. My favorite one is probably
CreateTopics since it's easy to see that we want to create all of a
topic or none of it, and a topic could be a potentially unbounded
number of records (although hopefully people have reasonable create
topic policy classes in place...)

In "broker support", it would be good to clarify that we will buffer
the records in the MetadataDelta and not publish a new MetadataImage
until the transaction is over. This is an implementation detail, but
it's a simple one and I think it will make it easier to understand how
this works.

In the "Raft Transactions" section of "Rejected Alternatives," I'd add
that managing buffering in the Raft layer would be a lot less
efficient than doing it in the controller / broker layer. We would end
up accumulating big lists of records which would then have to be
applied when the transaction completed, rather than building up a
MetadataDelta (or updating the controller state) incrementally.

Maybe we want to introduce the concept of "last stable offset" to be
the last committed offset that is NOT part of an ongoing transaction?
Just a nomenclature suggestion...

best,
Colin


Re: [DISCUSS] KIP-865 Metadata Transactions

2022-09-09 Thread Colin McCabe
Also, it looks like someone already claimed KIP-865, so I'd suggest grabbing a 
new number. :)

Colin


On Fri, Sep 9, 2022, at 09:38, Colin McCabe wrote:
> Thanks for this KIP, David!
>
> In the "motivation" section, it might help to give a concrete example 
> of an operation we want to be atomic. My favorite one is probably 
> CreateTopics since it's easy to see that we want to create all of a 
> topic or none of it, and a topic could be a potentially unbounded 
> number of records (although hopefully people have reasonable create 
> topic policy classes in place...)
>
> In "broker support", it would be good to clarify that we will buffer 
> the records in the MetadataDelta and not publish a new MetadataImage 
> until the transaction is over. This is an implementation detail, but 
> it's a simple one and I think it will make it easier to understand how 
> this works.
>
> In the "Raft Transactions" section of "Rejected Alternatives," I'd add 
> that managing buffering in the Raft layer would be a lot less efficient 
> than doing it in the controller / broker layer. We would end up 
> accumulating big lists of records which would then have to be applied 
> when the transaction completed, rather than building up a MetadataDelta 
> (or updating the controller state) incrementally.
>
> Maybe we want to introduce the concept of "last stable offset" to be 
> the last committed offset that is NOT part of an ongoing transaction? 
> Just a nomenclature suggestion...
>
> best,
> Colin
>
> On Fri, Sep 9, 2022, at 06:42, David Arthur wrote:
>> Hey folks, I'd like to start a discussion on the idea of adding
>> transactions in the KRaft controller. This will allow us to overcome
>> the current limitation of atomic batch sizes in Raft which lets us do
>> things like create topics with a huge number of partitions.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-865+Metadata+Transactions
>>
>> Thanks!
>> David


Re: [VOTE] KIP-865: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Guozhang Wang
+1. Thanks.

Guozhang

On Fri, Sep 9, 2022 at 9:52 AM Николай Ижиков  wrote:

> Hello.
>
> I'd like to start a vote on KIP-865 which adds support of
> —bootstrap-server parameter in kafka-streams-application-reset tool
>
> Discuss Thread:
> https://lists.apache.org/thread/5c1plw7mgmzd4zzqh1w59cqopn8kv21c
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
> JIRA: https://issues.apache.org/jira/browse/KAFKA-12878



-- 
-- Guozhang


Re: [DISCUSS] KIP-865 Metadata Transactions

2022-09-09 Thread Николай Ижиков
Hello.

Sorry, but it seems we should fix KIP numbering.
KIP-865 [1] is another KIP :).

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset

> 9 сент. 2022 г., в 19:38, Colin McCabe  написал(а):
> 
> Thanks for this KIP, David!
> 
> In the "motivation" section, it might help to give a concrete example of an 
> operation we want to be atomic. My favorite one is probably CreateTopics 
> since it's easy to see that we want to create all of a topic or none of it, 
> and a topic could be a potentially unbounded number of records (although 
> hopefully people have reasonable create topic policy classes in place...)
> 
> In "broker support", it would be good to clarify that we will buffer the 
> records in the MetadataDelta and not publish a new MetadataImage until the 
> transaction is over. This is an implementation detail, but it's a simple one 
> and I think it will make it easier to understand how this works.
> 
> In the "Raft Transactions" section of "Rejected Alternatives," I'd add that 
> managing buffering in the Raft layer would be a lot less efficient than doing 
> it in the controller / broker layer. We would end up accumulating big lists 
> of records which would then have to be applied when the transaction 
> completed, rather than building up a MetadataDelta (or updating the 
> controller state) incrementally.
> 
> Maybe we want to introduce the concept of "last stable offset" to be the last 
> committed offset that is NOT part of an ongoing transaction? Just a 
> nomenclature suggestion...
> 
> best,
> Colin
> 
> On Fri, Sep 9, 2022, at 06:42, David Arthur wrote:
>> Hey folks, I'd like to start a discussion on the idea of adding
>> transactions in the KRaft controller. This will allow us to overcome
>> the current limitation of atomic batch sizes in Raft which lets us do
>> things like create topics with a huge number of partitions.
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-865+Metadata+Transactions
>> 
>> Thanks!
>> David



[VOTE] KIP-865: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Николай Ижиков
Hello.

I'd like to start a vote on KIP-865 which adds support of —bootstrap-server 
parameter in kafka-streams-application-reset tool

Discuss Thread: https://lists.apache.org/thread/5c1plw7mgmzd4zzqh1w59cqopn8kv21c
KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
JIRA: https://issues.apache.org/jira/browse/KAFKA-12878

Re: Problem with Kafka KRaft 3.1.X

2022-09-09 Thread Colin McCabe
Hi Paul,

As Keith wrote, it does sound like you are hitting a separate Linux limit like 
the max mmap count.

I'm curious how many partitions you can create if you change that config!

best,
Colin


On Tue, Sep 6, 2022, at 14:02, Keith Paulson wrote:
> I've had similar errors cause by mmap counts; try with
> vm.max_map_count=262144
>
>
> On 2022/09/01 23:57:54 Paul Brebner wrote:
>> Hi all,
>>
>> I've been attempting to benchmark Kafka KRaft version for an ApacheCon
> talk
>> and have identified 2 problems:
>>
>> 1 - it's still impossible to create large number of partitions/topics - I
>> can create more than the comparable Zookeeper version but still not
>> "millions" - this is with RF=1 (as anything higher needs huge clusters to
>> cope with the replication CPU overhead) only, and no load on the clusters
>> yet (i.e. purely a topic/partition creation experiment).
>>
>> 2 - eventually the topic/partition creation command causes the Kafka
>> process to fail - looks like a memory error -
>>
>> java.lang.OutOfMemoryError: Metaspace
>> OpenJDK 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x7f4f554f9000, 65536, 1) failed; error='Not enough
>> space' (errno=12)
>>
>> or similar error
>>
>> seems to happen consistently around 30,000+ partitions - this is on a test
>> EC2 instance with 32GB Ram, 500,000 file descriptors (increased from
>> default) and 64GB disk (plenty spare). I'm not an OS expert, but the kafka
>> process and the OS both seem to have plenty of RAM when this error occurs.
>>
>> So there's 3 questions really: What's going wrong exactly? How to achieve
>> more partitions? And should the topic create command (just using the CLI
> at
>> present to create topics) really be capable of killing the Kafka instance,
>> or should it fail and throw an error, and the Kafka instance still
> continue
>> working...
>>
>> Regards, Paul Brebner
>>


Re: [DISCUSS] KIP-865 Metadata Transactions

2022-09-09 Thread Colin McCabe
Thanks for this KIP, David!

In the "motivation" section, it might help to give a concrete example of an 
operation we want to be atomic. My favorite one is probably CreateTopics since 
it's easy to see that we want to create all of a topic or none of it, and a 
topic could be a potentially unbounded number of records (although hopefully 
people have reasonable create topic policy classes in place...)

In "broker support", it would be good to clarify that we will buffer the 
records in the MetadataDelta and not publish a new MetadataImage until the 
transaction is over. This is an implementation detail, but it's a simple one 
and I think it will make it easier to understand how this works.

In the "Raft Transactions" section of "Rejected Alternatives," I'd add that 
managing buffering in the Raft layer would be a lot less efficient than doing 
it in the controller / broker layer. We would end up accumulating big lists of 
records which would then have to be applied when the transaction completed, 
rather than building up a MetadataDelta (or updating the controller state) 
incrementally.

Maybe we want to introduce the concept of "last stable offset" to be the last 
committed offset that is NOT part of an ongoing transaction? Just a 
nomenclature suggestion...

best,
Colin

On Fri, Sep 9, 2022, at 06:42, David Arthur wrote:
> Hey folks, I'd like to start a discussion on the idea of adding
> transactions in the KRaft controller. This will allow us to overcome
> the current limitation of atomic batch sizes in Raft which lets us do
> things like create topics with a huge number of partitions.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-865+Metadata+Transactions
>
> Thanks!
> David


Re: [DISCUSSION] KIP-864: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Guozhang Wang
Hi, I think we can vote right away.

Guozhang

On Fri, Sep 9, 2022 at 2:24 AM Николай Ижиков  wrote:

> Hello, Guozhang.
>
> Thanks for the feedback.
> As this KIP very straightforward, is it worth to be voted right now?
> Or should we wait more feedback?
>
> > 9 сент. 2022 г., в 08:11, Guozhang Wang  написал(а):
> >
> > Hello Николай,
> >
> > Thanks for writing the KIP, I think it's rather straightforward and
> better
> > to be consistent in tooling params. I'm +1.
> >
> > Guozhang
> >
> >
> > On Mon, Sep 5, 2022 at 11:25 PM Николай Ижиков 
> wrote:
> >
> >> Hello.
> >>
> >> Do we still want to make parameter names consistent in tools?
> >> If yes, please, share your feedback on KIP.
> >>
> >>> 31 авг. 2022 г., в 11:50, Николай Ижиков 
> >> написал(а):
> >>>
> >>> Hello.
> >>>
> >>> I would like to start discussion on small KIP [1]
> >>> The goal of KIP is to add the same —boostrap-server parameter to
> >> `kafka-streams-appliation-reset.sh` tool as other tools use.
> >>> Please, share your feedback.
> >>>
> >>> [1]
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
> >>>
> >>
> >>
> >
> > --
> > -- Guozhang
>
>

-- 
-- Guozhang


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.1 #133

2022-09-09 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 371399 lines...]
[2022-09-09T14:55:06.255Z] > Task 
:clients:generatePomFileForMavenJavaPublication
[2022-09-09T14:55:06.255Z] 
[2022-09-09T14:55:06.255Z] > Task :streams:processMessages
[2022-09-09T14:55:06.255Z] Execution optimizations have been disabled for task 
':streams:processMessages' to ensure correctness due to the following reasons:
[2022-09-09T14:55:06.255Z]   - Gradle detected a problem with the following 
location: 
'/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/streams/src/generated/java/org/apache/kafka/streams/internals/generated'.
 Reason: Task ':streams:srcJar' uses this output of task 
':streams:processMessages' without declaring an explicit or implicit 
dependency. This can lead to incorrect results being produced, depending on 
what order the tasks are executed. Please refer to 
https://docs.gradle.org/7.2/userguide/validation_problems.html#implicit_dependency
 for more details about this problem.
[2022-09-09T14:55:06.255Z] MessageGenerator: processed 1 Kafka message JSON 
files(s).
[2022-09-09T14:55:06.255Z] 
[2022-09-09T14:55:06.255Z] > Task :streams:compileJava UP-TO-DATE
[2022-09-09T14:55:06.255Z] > Task :streams:classes UP-TO-DATE
[2022-09-09T14:55:06.255Z] > Task :streams:test-utils:compileJava UP-TO-DATE
[2022-09-09T14:55:07.499Z] > Task :streams:copyDependantLibs
[2022-09-09T14:55:07.499Z] > Task :streams:jar UP-TO-DATE
[2022-09-09T14:55:07.499Z] > Task 
:streams:generateMetadataFileForMavenJavaPublication
[2022-09-09T14:55:10.729Z] > Task :connect:api:javadoc
[2022-09-09T14:55:10.729Z] > Task :connect:api:copyDependantLibs UP-TO-DATE
[2022-09-09T14:55:10.729Z] > Task :connect:api:jar UP-TO-DATE
[2022-09-09T14:55:10.729Z] > Task 
:connect:api:generateMetadataFileForMavenJavaPublication
[2022-09-09T14:55:10.729Z] > Task :connect:json:copyDependantLibs UP-TO-DATE
[2022-09-09T14:55:10.729Z] > Task :connect:json:jar UP-TO-DATE
[2022-09-09T14:55:10.729Z] > Task 
:connect:json:generateMetadataFileForMavenJavaPublication
[2022-09-09T14:55:10.729Z] > Task :connect:api:javadocJar
[2022-09-09T14:55:10.729Z] > Task 
:connect:json:publishMavenJavaPublicationToMavenLocal
[2022-09-09T14:55:10.729Z] > Task :connect:json:publishToMavenLocal
[2022-09-09T14:55:10.729Z] > Task :connect:api:compileTestJava UP-TO-DATE
[2022-09-09T14:55:10.730Z] > Task :connect:api:testClasses UP-TO-DATE
[2022-09-09T14:55:10.730Z] > Task :connect:api:testJar
[2022-09-09T14:55:10.730Z] > Task :connect:api:testSrcJar
[2022-09-09T14:55:10.730Z] > Task 
:connect:api:publishMavenJavaPublicationToMavenLocal
[2022-09-09T14:55:10.730Z] > Task :connect:api:publishToMavenLocal
[2022-09-09T14:55:15.079Z] > Task :streams:javadoc
[2022-09-09T14:55:16.320Z] > Task :streams:javadocJar
[2022-09-09T14:55:16.320Z] 
[2022-09-09T14:55:16.320Z] > Task :clients:javadoc
[2022-09-09T14:55:16.320Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java:147:
 warning - Tag @link: reference not found: 
[2022-09-09T14:55:17.282Z] 1 warning
[2022-09-09T14:55:18.573Z] 
[2022-09-09T14:55:18.573Z] > Task :clients:javadocJar
[2022-09-09T14:55:19.535Z] 
[2022-09-09T14:55:19.535Z] > Task :clients:srcJar
[2022-09-09T14:55:19.535Z] Execution optimizations have been disabled for task 
':clients:srcJar' to ensure correctness due to the following reasons:
[2022-09-09T14:55:19.535Z]   - Gradle detected a problem with the following 
location: 
'/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/clients/src/generated/java'.
 Reason: Task ':clients:srcJar' uses this output of task 
':clients:processMessages' without declaring an explicit or implicit 
dependency. This can lead to incorrect results being produced, depending on 
what order the tasks are executed. Please refer to 
https://docs.gradle.org/7.2/userguide/validation_problems.html#implicit_dependency
 for more details about this problem.
[2022-09-09T14:55:20.672Z] 
[2022-09-09T14:55:20.672Z] > Task :clients:testJar
[2022-09-09T14:55:21.735Z] > Task :clients:testSrcJar
[2022-09-09T14:55:21.735Z] > Task 
:clients:publishMavenJavaPublicationToMavenLocal
[2022-09-09T14:55:21.735Z] > Task :clients:publishToMavenLocal
[2022-09-09T14:55:36.177Z] > Task :core:compileScala
[2022-09-09T14:57:46.587Z] > Task :core:classes
[2022-09-09T14:57:46.587Z] > Task :core:compileTestJava NO-SOURCE
[2022-09-09T14:58:03.701Z] > Task :core:compileTestScala
[2022-09-09T14:59:58.153Z] > Task :core:testClasses
[2022-09-09T15:00:12.668Z] > Task :streams:compileTestJava
[2022-09-09T15:00:12.669Z] > Task :streams:testClasses
[2022-09-09T15:00:12.669Z] > Task :streams:testJar
[2022-09-09T15:00:12.669Z] > Task :streams:testSrcJar
[2022-09-09T15:00:12.669Z] > Task 
:streams:publishMavenJavaPublicationToMavenLocal
[2022-09-09T15:00:12.669Z] > Task :streams:publis

[jira] [Created] (KAFKA-14213) Reduce lock contention between control-plane-kafka-request-handler and kafka-log-cleaner-thread

2022-09-09 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-14213:
--

 Summary: Reduce lock contention between 
control-plane-kafka-request-handler and kafka-log-cleaner-thread
 Key: KAFKA-14213
 URL: https://issues.apache.org/jira/browse/KAFKA-14213
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


We found that the StopReplica request's local processing time may be quite long 
due to the reasons explained below. The impact of a time-consuming StopReplica 
request is that all subsequent requests from the controller are blocked, 
causing slow convergence on the metadata plane.

 

The long local processing time is because the 
control-plane-kafka-request-handler thread is blocked on a lock to abort 
logCleaning with the following stack trace:

 
{code:java}
"control-plane-kafka-request-handler-0"
java.lang.Thread.State: WAITING at 
java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method) at 
java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
 at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
 at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
 at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
 at 
java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
 at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266) at 
kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205) at 
kafka.log.LogManager.asyncDelete(LogManager.scala:1039)
{code}
 

 

In the mean time, the kafka-log-cleaner-thread is holding the lock and busy 
figuring out the filthiest compacted log, which can be a very time consuming 
task. Our periodic thread dumps captured many snapshots with the 
kafka-log-cleaner-thread in either of the following 2 stack traces:

 

 
{code:java}
"kafka-log-cleaner-thread-0"
java.lang.Thread.State: RUNNABLE at 
kafka.log.TimeIndex.(TimeIndex.scala:57) at 
kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109) at 
kafka.log.LazyIndex$$$Lambda$1871/0x000800f4d840.apply(Unknown Source) at 
kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63) at 
kafka.log.LazyIndex.get(LazyIndex.scala:60) at 
kafka.log.LogSegment.timeIndex(LogSegment.scala:66) at 
kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107) at 
kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113) at 
kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640) at 
kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617)
 at 
kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616)
 at kafka.log.LogCleanerManager$$$Lambda$2215/0x00080104d040.apply(Unknown 
Source) at scala.collection.Iterator.find(Iterator.scala:993) at 
scala.collection.Iterator.find$(Iterator.scala:990) at 
scala.collection.AbstractIterator.find(Iterator.scala:1429) at 
scala.collection.IterableLike.find(IterableLike.scala:81) at 
scala.collection.IterableLike.find$(IterableLike.scala:80) at 
scala.collection.AbstractIterable.find(Iterable.scala:56) at 
kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616) at 
kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186)
 at kafka.log.LogCleanerManager$$Lambda$2212/0x00080104f040.apply(Unknown 
Source) at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at 
scala.collection.TraversableLike$$Lambda$891/0x000800a89840.apply(Unknown 
Source) at scala.collection.immutable.List.foreach(List.scala:392)
{code}
 

 

 
{code:java}
"kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method) at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54)
 at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) 
at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) at 
java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811)
 at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) 
at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) at 
org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69)
 at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42)
 at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
 at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
 at 
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
 at 
o

[jira] [Created] (KAFKA-14212) Fetch error response when hitting public OAuth/OIDC provider

2022-09-09 Thread Sushant Mahajan (Jira)
Sushant Mahajan created KAFKA-14212:
---

 Summary: Fetch error response when hitting public OAuth/OIDC 
provider
 Key: KAFKA-14212
 URL: https://issues.apache.org/jira/browse/KAFKA-14212
 Project: Kafka
  Issue Type: Improvement
Reporter: Sushant Mahajan


The class 
org.apache.kafka.common.security.oauthbearer.secured.HttpAccessTokenRetriever 
is used to send client creds to public OAuth/OIDC provider and fetch the 
response, possibly including the access token.

However, if there is an error - the exact error message from the provider is 
not currently being retrieved.

The error message can help the client easily diagnose if failure to fetch token 
is due to some misconfiguration on their side.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Transactions, delivery timeout and changing transactional producer behavior

2022-09-09 Thread Dániel Urbán
Hi all,

I would like to bump this and bring some attention to the issue.
This is a nasty bug in the transactional producer, would be nice if I could
get some feedback on the PR: https://github.com/apache/kafka/pull/12392

Thanks in advance,
Daniel

Viktor Somogyi-Vass  ezt írta
(időpont: 2022. júl. 25., H, 15:28):

> Hi Luke & Artem,
>
> We prepared the fix, would you please help in getting a committer-reviewer
> to get this issue resolved?
>
> Thanks,
> Viktor
>
> On Fri, Jul 8, 2022 at 12:57 PM Dániel Urbán 
> wrote:
>
> > Submitted a PR with the fix: https://github.com/apache/kafka/pull/12392
> > In the PR I tried keeping the producer in a usable state after the forced
> > bump. I understand that it might be the cleanest solution, but the only
> > other option I know of is to transition into a fatal state, meaning that
> > the producer has to be recreated after a delivery timeout. I think that
> is
> > still fine compared to the out-of-order messages.
> >
> > Looking forward to your reviews,
> > Daniel
> >
> > Dániel Urbán  ezt írta (időpont: 2022. júl. 7.,
> Cs,
> > 12:04):
> >
> > > Thanks for the feedback, I created
> > > https://issues.apache.org/jira/browse/KAFKA-14053 and started working
> on
> > > a PR.
> > >
> > > Luke, for the workaround, we used the transaction admin tool released
> in
> > > 3.0 to "abort" these hanging batches manually.
> > > Naturally, the cluster health should be stabilized. This issue popped
> up
> > > most frequently around times when some partitions went into a few
> minute
> > > window of unavailability. The infinite retries on the producer side
> > caused
> > > a situation where the last retry was still in-flight, but the delivery
> > > timeout was triggered on the client side. We reduced the retries and
> > > increased the delivery timeout to avoid such situations.
> > > Still, the issue can occur in other scenarios, for example a client
> > > queueing up many batches in the producer buffer, and causing those
> > batches
> > > to spend most of the delivery timeout window in the client memory.
> > >
> > > Thanks,
> > > Daniel
> > >
> > > Luke Chen  ezt írta (időpont: 2022. júl. 7., Cs,
> > 5:13):
> > >
> > >> Hi Daniel,
> > >>
> > >> Thanks for reporting the issue, and the investigation.
> > >> I'm curious, so, what's your workaround for this issue?
> > >>
> > >> I agree with Artem, it makes sense. Please file a bug in JIRA.
> > >> And looking forward to your PR! :)
> > >>
> > >> Thank you.
> > >> Luke
> > >>
> > >> On Thu, Jul 7, 2022 at 3:07 AM Artem Livshits
> > >>  wrote:
> > >>
> > >> > Hi Daniel,
> > >> >
> > >> > What you say makes sense.  Could you file a bug and put this info
> > there
> > >> so
> > >> > that it's easier to track?
> > >> >
> > >> > -Artem
> > >> >
> > >> > On Wed, Jul 6, 2022 at 8:34 AM Dániel Urbán 
> > >> wrote:
> > >> >
> > >> > > Hello everyone,
> > >> > >
> > >> > > I've been investigating some transaction related issues in a very
> > >> > > problematic cluster. Besides finding some interesting issues, I
> had
> > >> some
> > >> > > ideas about how transactional producer behavior could be improved.
> > >> > >
> > >> > > My suggestion in short is: when the transactional producer
> > encounters
> > >> an
> > >> > > error which doesn't necessarily mean that the in-flight request
> was
> > >> > > processed (for example a client side timeout), the producer should
> > not
> > >> > send
> > >> > > an EndTxnRequest on abort, but instead it should bump the producer
> > >> epoch.
> > >> > >
> > >> > > The long description about the issue I found, and how I came to
> the
> > >> > > suggestion:
> > >> > >
> > >> > > First, the description of the issue. When I say that the cluster
> is
> > >> "very
> > >> > > problematic", I mean all kinds of different issues, be it infra
> > (disks
> > >> > and
> > >> > > network) or throughput (high volume producers without fine
> tuning).
> > >> > > In this cluster, Kafka transactions are widely used by many
> > producers.
> > >> > And
> > >> > > in this cluster, partitions get "stuck" frequently (few times
> every
> > >> > week).
> > >> > >
> > >> > > The exact meaning of a partition being "stuck" is this:
> > >> > >
> > >> > > On the client side:
> > >> > > 1. A transactional producer sends X batches to a partition in a
> > single
> > >> > > transaction
> > >> > > 2. Out of the X batches, the last few get sent, but are timed out
> > >> thanks
> > >> > to
> > >> > > the delivery timeout config
> > >> > > 3. producer.flush() is unblocked due to all batches being
> "finished"
> > >> > > 4. Based on the errors reported in the producer.send() callback,
> > >> > > producer.abortTransaction() is called
> > >> > > 5. Then producer.close() is also invoked with a 5s timeout (this
> > >> > > application does not reuse the producer instances optimally)
> > >> > > 6. The transactional.id of the producer is never reused (it was
> > >> random
> > >> > > generated)
> > >> > >
> > >> > > On the partition leader side (what appear

[DISCUSS] KIP-865 Metadata Transactions

2022-09-09 Thread David Arthur
Hey folks, I'd like to start a discussion on the idea of adding
transactions in the KRaft controller. This will allow us to overcome
the current limitation of atomic batch sizes in Raft which lets us do
things like create topics with a huge number of partitions.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-865+Metadata+Transactions

Thanks!
David


[jira] [Created] (KAFKA-14211) Streams log message has partition and offset transposed

2022-09-09 Thread Matt Allwood (Jira)
Matt Allwood created KAFKA-14211:


 Summary: Streams log message has partition and offset transposed
 Key: KAFKA-14211
 URL: https://issues.apache.org/jira/browse/KAFKA-14211
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.1.1
Reporter: Matt Allwood


The log warning message for out-of-order KTable update has partition and offset 
the wrong way around.

For example:
{noformat}
[...-StreamThread-1] WARN 
org.apache.kafka.streams.kstream.internals.KTableSource - Detected out-of-order 
KTable update for KTABLE-FK-JOIN-OUTPUT-STATE-STORE-000274, old 
timestamp=[1649245600022] new timestamp=[1642429126882]. 
topic=[...-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-000269-topic] 
partition=[2813] offset=[0].{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.3 #63

2022-09-09 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 574803 lines...]
[2022-09-09T05:16:42.242Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = false] PASSED
[2022-09-09T05:16:42.242Z] 
[2022-09-09T05:16:42.242Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = false] STARTED
[2022-09-09T05:16:48.235Z] 
[2022-09-09T05:16:48.236Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = false] PASSED
[2022-09-09T05:16:48.236Z] 
[2022-09-09T05:16:48.236Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = false] STARTED
[2022-09-09T05:16:54.156Z] 
[2022-09-09T05:16:54.156Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = false] PASSED
[2022-09-09T05:16:54.156Z] 
[2022-09-09T05:16:54.156Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = false] STARTED
[2022-09-09T05:16:57.162Z] 
[2022-09-09T05:16:57.162Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargeNumConsumers PASSED
[2022-09-09T05:16:57.162Z] 
[2022-09-09T05:16:57.162Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargePartitionCount STARTED
[2022-09-09T05:16:57.162Z] 
[2022-09-09T05:16:57.162Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargePartitionCount PASSED
[2022-09-09T05:16:57.162Z] 
[2022-09-09T05:16:57.162Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyThreadsPerClient STARTED
[2022-09-09T05:16:57.162Z] 
[2022-09-09T05:16:57.162Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyThreadsPerClient PASSED
[2022-09-09T05:16:57.162Z] 
[2022-09-09T05:16:57.162Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyStandbys STARTED
[2022-09-09T05:17:00.950Z] 
[2022-09-09T05:17:00.950Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = false] PASSED
[2022-09-09T05:17:00.950Z] 
[2022-09-09T05:17:00.950Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] STARTED
[2022-09-09T05:17:03.989Z] 
[2022-09-09T05:17:03.989Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyStandbys PASSED
[2022-09-09T05:17:03.989Z] 
[2022-09-09T05:17:03.989Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyThreadsPerClient STARTED
[2022-09-09T05:17:03.989Z] 
[2022-09-09T05:17:03.989Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyThreadsPerClient PASSED
[2022-09-09T05:17:03.989Z] 
[2022-09-09T05:17:03.989Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyThreadsPerClient STARTED
[2022-09-09T05:17:04.938Z] 
[2022-09-09T05:17:04.938Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyThreadsPerClient PASSED
[2022-09-09T05:17:04.938Z] 
[2022-09-09T05:17:04.938Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargePartitionCount STARTED
[2022-09-09T05:17:08.431Z] 
[2022-09-09T05:17:08.431Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] PASSED
[2022-09-09T05:17:08.431Z] 
[2022-09-09T05:17:08.431Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] STARTED
[2022-09-09T05:17:14.138Z] 
[2022-09-09T05:17:14.138Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] PASSED
[2022-09-09T05:17:14.138Z] 
[2022-09-09T05:17:14.138Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] STARTED
[2022-09-09T05:17:20.499Z] 
[2022-09-09T05:17:20.499Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] PASSED
[2022-09-09T05:17:20.499Z] 
[2022-09-09T05:17:20.499Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] STARTED
[2022-09-09T05:17:26.876Z] 
[2022-09-09T05:17:26.876Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] PASSED
[2022-09-09T05:17:26.876Z] 
[2022-09-09T05:17:26.876Z] 
org.apa

Re: [DISCUSSION] KIP-864: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Николай Ижиков
Hello, Guozhang.

Thanks for the feedback.
As this KIP very straightforward, is it worth to be voted right now?
Or should we wait more feedback?

> 9 сент. 2022 г., в 08:11, Guozhang Wang  написал(а):
> 
> Hello Николай,
> 
> Thanks for writing the KIP, I think it's rather straightforward and better
> to be consistent in tooling params. I'm +1.
> 
> Guozhang
> 
> 
> On Mon, Sep 5, 2022 at 11:25 PM Николай Ижиков  wrote:
> 
>> Hello.
>> 
>> Do we still want to make parameter names consistent in tools?
>> If yes, please, share your feedback on KIP.
>> 
>>> 31 авг. 2022 г., в 11:50, Николай Ижиков 
>> написал(а):
>>> 
>>> Hello.
>>> 
>>> I would like to start discussion on small KIP [1]
>>> The goal of KIP is to add the same —boostrap-server parameter to
>> `kafka-streams-appliation-reset.sh` tool as other tools use.
>>> Please, share your feedback.
>>> 
>>> [1]
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
>>> 
>> 
>> 
> 
> -- 
> -- Guozhang



Re: [VOTE] KIP-862: Self-join optimization for stream-stream joins

2022-09-09 Thread Vasiliki Papavasileiou
Hey Guozhang,

Ah it seems my text was not very clear :)
With "TOPOLOGY_OPTIMIZATION_CONFIG will be extended to accept a list of
optimization rule configs" I meant that it will accept the new value
strings for each optimization rule. Let me rephrase that in the KIP to make
it clearer.
Is it better now?

Best,
Vicky

On Thu, Sep 8, 2022 at 9:07 PM Guozhang Wang  wrote:

> Thanks Vicky,
>
> I read through the KIP again and it looks good to me. Just a quick question
> regarding the public config changes: you mentioned "No public interfaces
> will be impacted. The config TOPOLOGY_OPTIMIZATION_CONFIG will be extended
> to accept a list of optimization rule configs in addition to the global
> values "all" and "none" . But there are no new value strings mentioned in
> this KIP, so that means we will apply this optimization only when `all` is
> specified in the config right?
>
>
> Guozhang
>
>
> On Thu, Sep 8, 2022 at 12:02 PM Vasiliki Papavasileiou
>  wrote:
>
> > Hello everyone,
> >
> > I'd like to open the vote for KIP-862, which proposes to optimize
> > stream-stream self-joins by using a single state store for the join.
> >
> > The proposal is here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join+optimization+for+stream-stream+joins
> >
> > Thanks to all who reviewed the proposal, and thanks in advance for taking
> > the time to vote!
> >
> > Thank you,
> > Vicky
> >
>
>
> --
> -- Guozhang
>


[VOTE] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread David Jacot
Hi all,

Thank you all for the very positive discussion about KIP-848. It looks
like folks are very positive about it overall.

I would like to start a vote on KIP-848, which introduces a brand new
consumer rebalance protocol.

The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.

Best,
David


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread David Jacot
Hi Guozhang,

I thought that the assignor will always be consulted when the next
heartbeat request is constructed. In other words,
`PartitionAssignor#metadata` will be called for every heartbeat. This
gives the opportunity for the assignor to enforce a rebalance by
setting the reason to a non-zero value or by changing the bytes. Do
you think that this is not sufficient? Are you concerned by the delay?

Best,
David

On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang  wrote:
>
> Hello David,
>
> One of Jun's comments make me thinking:
>
> ```
> In this case, a new assignment is triggered by the client side
> assignor. When constructing the HB, the consumer will always consult
> the client side assignor and propagate the information to the group
> coordinator. In other words, we don't expect users to call
> Consumer#enforceRebalance anymore.
> ```
>
> As I looked at the current PartitionAssignor's interface, we actually do
> not have a way yet to instruct how to construct the next HB request, e.g.
> when the assignor wants to enforce a new rebalance with a new assignment,
> we'd need some customizable APIs inside the PartitionAssignor to indicate
> the next HB telling broker about so. WDYT about adding such an API on the
> PartitionAssignor?
>
>
> Guozhang
>
>
> On Tue, Sep 6, 2022 at 6:09 AM David Jacot 
> wrote:
>
> > Hi Jun,
> >
> > I have updated the KIP to include your feedback. I have also tried to
> > clarify the parts which were not cleared.
> >
> > Best,
> > David
> >
> > On Fri, Sep 2, 2022 at 4:18 PM David Jacot  wrote:
> > >
> > > Hi Jun,
> > >
> > > Thanks for your feedback. Let me start by answering your questions
> > > inline and I will update the KIP next week.
> > >
> > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to be
> > fewer
> > > > RPCs during rebalance and more efficient support of wildcard. A few
> > > > comments below.
> > >
> > > I would also add that the KIP removes the global sync barrier in the
> > > protocol which is essential to improve group stability and
> > > scalability, and the KIP also simplifies the client by moving most of
> > > the logic to the server side.
> > >
> > > > 30. ConsumerGroupHeartbeatRequest
> > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling
> > changing
> > > > of the partition assignor in the consumers?
> > >
> > > Definitely. The group coordinator will use the assignor used by a
> > > majority of the members. This allows the group to move from one
> > > assignor to another by a roll. This is explained in the Assignor
> > > Selection chapter.
> > >
> > > > 30.2 For each field, could you explain whether it's required in every
> > > > request or the scenarios when it needs to be filled? For example, it's
> > not
> > > > clear to me when TopicPartitions needs to be filled.
> > >
> > > The client is expected to set those fields in case of a connection
> > > issue (e.g. timeout) or when the fields have changed since the last
> > > HB. The server populates those fields as long as the member is not
> > > fully reconciled - the member should acknowledge that it has the
> > > expected epoch and assignment. I will clarify this in the KIP.
> > >
> > > > 31. In the current consumer protocol, the rack affinity between the
> > client
> > > > and the broker is only considered during fetching, but not during
> > assigning
> > > > partitions to consumers. Sometimes, once the assignment is made, there
> > is
> > > > no opportunity for read affinity because no replicas of assigned
> > partitions
> > > > are close to the member. I am wondering if we should use this
> > opportunity
> > > > to address this by including rack in GroupMember.
> > >
> > > That's an interesting idea. I don't see any issue with adding the rack
> > > to the members. I will do so.
> > >
> > > > 32. On the metric side, often, it's useful to know how busy a group
> > > > coordinator is. By moving the event loop model, it seems that we could
> > add
> > > > a metric that tracks the fraction of the time the event loop is doing
> > the
> > > > actual work.
> > >
> > > That's a great idea. I will add it. Thanks.
> > >
> > > > 33. Could we add a section on coordinator failover handling? For
> > example,
> > > > does it need to trigger the check if any group with the wildcard
> > > > subscription now has a new matching topic?
> > >
> > > Sure. When the new group coordinator takes over, it has to:
> > > * Setup the session timeouts.
> > > * Trigger a new assignment if a client side assignor is used. We don't
> > > store the information about the member selected to run the assignment
> > > so we have to start a new one.
> > > * Update the topics metadata, verify the wildcard subscriptions, and
> > > trigger a rebalance if needed.
> > >
> > > > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue,
> > > > ConsumerGroupMemberMetadataValue: Could we document what the epoch
> > field
> > > > reflects? For example, does the epoch in ConsumerGroupMetadataValue
>