[jira] [Created] (KAFKA-14847) Separate the callers of commitAllTasks v.s. commitTasks for EOS(-v2) and ALOS

2023-03-24 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14847:
-

 Summary: Separate the callers of commitAllTasks v.s. commitTasks 
for EOS(-v2) and ALOS
 Key: KAFKA-14847
 URL: https://issues.apache.org/jira/browse/KAFKA-14847
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today, EOS-v2/v1 and ALOS share the same internal call path inside 
TaskManager/TaskExecutor for committing tasks in various scenarios: the call 
path {{commitTasksAndMaybeUpdateCommitableOffsets}} -> 
{{commitOffsetsOrTransaction}} takes a list of tasks as its input, which can 
be a subset of the tasks that the thread / task manager owns. For EOS-v1 / 
ALOS it is fine to commit just a subset of the tasks; however for EOS-v2, 
since all tasks participate in the same txn, committing a subset could lead to 
dangerous semantic violations, and today we rely on all the callers of the 
commit function to make sure that the list of tasks they pass in would still 
not violate the EOS-v2 semantics. As summarized (thanks to Matthias), that 
callee can be triggered in the following cases:

1) Inside handleRevocation() -- this is a clean path, and we add all 
non-revoked tasks with the commitNeeded() flag set to the commit -- so this 
seems to be fine.
2) tryCloseCleanAllActiveTasks() -- here we only call it if 
tasksToCloseDirty.isEmpty() -- so it seems fine, too.
3) commit() with a list of tasks handed in -- we call commit() inside the TM 
three times:
3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)
3.b) inside maybeCommitActiveTasksPerUserRequested as 
commit(activeTaskIterable()); (passing in all tasks)
3.c) inside handleCorruption() -- here, we only consider RUNNING and RESTORING 
tasks, which are not corrupted -- note we only throw a TaskCorruptedException 
during restore state initialization, thus corrupted tasks have not processed 
anything yet, and all other tasks should be clean to commit.
3.d) commitSuccessfullyProcessedTasks() -- this is the problematic one under 
EOS-v2, as we commit only a subset of tasks' source offsets while at the same 
time we still commit the unsuccessful tasks' outgoing records if there are any.

Just going through this list of callers, as demonstrated above, is already 
pretty complex and very vulnerable to bugs. It's better not to rely on the 
callers but on the callee to make sure the semantics hold. More concretely, I 
think we can introduce a new function called {{commitAllTasks}} such that under 
EOS-v2 the callers always call {{commitAllTasks}} instead, and if there are 
some tasks that should not be committed because we know they have not processed 
any data, the {{commitAllTasks}} callee itself would do some clever filtering 
internally (see the sketch below).
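
A minimal sketch of the proposed callee-side filtering ({{commitAllTasks}} is 
the name proposed above; the task and commit helpers are hypothetical, not the 
actual TaskManager code):

{code:java}
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

class CommitAllTasksSketch {
    interface Task { boolean commitNeeded(); }

    // Under EOS-v2 the callers never pick a subset: they always hand over
    // every owned task and let the callee filter internally.
    void commitAllTasks(final Collection<Task> allOwnedTasks) {
        // It is only safe to skip tasks that provably processed no data since
        // the last commit; everything else must join the shared transaction.
        final List<Task> toCommit = allOwnedTasks.stream()
                .filter(Task::commitNeeded)
                .collect(Collectors.toList());
        if (!toCommit.isEmpty())
            commitOffsetsOrTransaction(toCommit);  // existing internal call path
    }

    void commitOffsetsOrTransaction(final List<Task> tasks) { /* elided */ }
}
{code}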

Given its scope, I think it's better to do this refactoring after EOS-v1 is 
removed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12639) AbstractCoordinator ignores backoff timeout when joining the consumer group

2023-02-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12639.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> AbstractCoordinator ignores backoff timeout when joining the consumer group
> ---
>
> Key: KAFKA-12639
> URL: https://issues.apache.org/jira/browse/KAFKA-12639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.7.0
>Reporter: Matiss Gutmanis
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> We observed heavy logging while trying to join a consumer group during partial 
> unavailability of the Kafka cluster (it's part of our testing process). It 
> seems that {{rebalanceConfig.retryBackoffMs}} used in 
> {{org.apache.kafka.clients.consumer.internals.AbstractCoordinator#joinGroupIfNeeded}}
>  is not respected. Debugging revealed that the {{Timer}} instance is 
> technically already expired, so a sleep of 0 milliseconds is used, which 
> defeats the purpose of the backoff timeout.
> The minimal backoff timeout should be respected (see the sketch after the log 
> below).
>  
> {code:java}
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> {code}
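> A minimal sketch of the suspected failure mode and one possible fix; this is 
> not the actual AbstractCoordinator code, and the names are simplified:
> {code:java}
> class BackoffSketch {
>     // Suspected failure mode: the sleep is bounded by the remaining time of
>     // an already-expired Timer, so it degenerates to sleep(0) and the client
>     // retries in a tight loop, producing the log spam above.
>     static void sleepToday(long retryBackoffMs, long timerRemainingMs)
>             throws InterruptedException {
>         Thread.sleep(Math.min(retryBackoffMs, timerRemainingMs)); // 0 once expired
>     }
>
>     // One possible fix: always honor the minimal backoff between attempts.
>     static void sleepWithMinimalBackoff(long retryBackoffMs)
>             throws InterruptedException {
>         Thread.sleep(retryBackoffMs);
>     }
> }
> {code}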



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs

2023-02-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-14253.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> StreamsPartitionAssignor should print the member count in assignment logs
> -
>
> Key: KAFKA-14253
> URL: https://issues.apache.org/jira/browse/KAFKA-14253
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Christopher Pooya Razavian
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.5.0
>
>
> Debugging rebalance and assignment issues is harder than it needs to be. One 
> simple thing that can help is to print out, in the logs, information that 
> users otherwise have to compute today.
> For example, the StreamsPartitionAssignor prints two messages that contain 
> the newline-delimited group membership:
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] All members participating in this rebalance:
> : []
> : []
> : []{code}
> and
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] 
> to clients as:
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> {code}
>  
> In both of these cases, it would be nice to:
>  # Include the number of members in the group (i.e., "15 members 
> participating" and "to 15 clients as")
>  # Sort the member ids, to help compare the membership and assignment across 
> rebalances (see the sketch below)
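> A minimal sketch of the requested log format (names are illustrative, not the 
> actual StreamsPartitionAssignor code):
> {code:java}
> import java.util.List;
> import java.util.Map;
> import java.util.SortedMap;
> import java.util.TreeMap;
> import java.util.stream.Collectors;
>
> class AssignmentLogSketch {
>     static String describe(Map<String, List<Integer>> membership) {
>         // Sorting member ids keeps the output comparable across rebalances.
>         SortedMap<String, List<Integer>> sorted = new TreeMap<>(membership);
>         // A leading count gives "15 members participating" at a glance.
>         return sorted.size() + " members participating in this rebalance:\n"
>                 + sorted.entrySet().stream()
>                         .map(e -> e.getKey() + ": " + e.getValue())
>                         .collect(Collectors.joining("\n"));
>     }
> }
> {code}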



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14641) Cleanup CommitNeeded after EOS-V1 is removed

2023-01-19 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14641:
-

 Summary: Cleanup CommitNeeded after EOS-V1 is removed
 Key: KAFKA-14641
 URL: https://issues.apache.org/jira/browse/KAFKA-14641
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


This is a follow-up of KAFKA-14294.

Today we have several flags to determine if KS needs to execute a commit: 1) 
the task-level "commitNeeded" flag, which is set whenever process() or 
punctuator() is called; 2) whether there are input topic offsets to commit, 
retrieved from "task.prepareCommit()"; 3) the "transactionInFlight" flag from 
the producer, added as a fix for KAFKA-14294 (this subsumes the first 
"commitNeeded" functionality).

Given that we still have EOS-v1, cleaning this up would be a bit complex. But 
after the deprecated EOS-v1 is removed, we can clean up those controls, since 
for any commit case we would need to commit all tasks anyway, whereas under 
EOS-v1 we may commit only a subset of tasks, since tasks are handled by 
different producers and hence different txns.

A quick thought is the following:

1) We would not need the per-task "commitNeeded" anymore.
2) We would maintain a single "commitNeeded" flag on the task-executor, hence 
on the thread level. It is set whenever `process()` or `punctuator` is called.
3) Whenever we need to commit -- either a) periodically, b) upon revocation, or 
c) upon user request -- we simply check that flag, and if necessary commit all 
tasks and reset the flag (see the sketch below).
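
A minimal sketch of the proposed thread-level flag (names are hypothetical, not 
the actual TaskExecutor code):

{code:java}
class ThreadLevelCommitFlagSketch {
    private boolean commitNeeded = false;   // one flag per task executor / thread

    void onProcessOrPunctuate() {
        commitNeeded = true;                // replaces the per-task flags
    }

    // Invoked periodically, upon revocation, or upon user request.
    void maybeCommit(final Runnable commitAllTasks) {
        if (commitNeeded) {
            commitAllTasks.run();           // under EOS-v2 all tasks share one txn
            commitNeeded = false;
        }
    }
}
{code}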



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14239) Merge StateRestorationIntegrationTest into RestoreIntegrationTest

2022-09-16 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14239:
-

 Summary: Merge StateRestorationIntegrationTest into 
RestoreIntegrationTest
 Key: KAFKA-14239
 URL: https://issues.apache.org/jira/browse/KAFKA-14239
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: Guozhang Wang


We have two integration test classes for store restoration, and 
StateRestorationIntegrationTest has only a single test method. We can merge it 
into the other to save integration testing time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14130) Reduce RackAwarenessIntegrationTest to a unit test

2022-09-04 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-14130.
---
Fix Version/s: 3.4.0
 Assignee: Guozhang Wang  (was: Eslam)
   Resolution: Fixed

> Reduce RackAwarenessIntegrationTest to a unit test
> --
>
> Key: KAFKA-14130
> URL: https://issues.apache.org/jira/browse/KAFKA-14130
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: newbie
> Fix For: 3.4.0
>
>
> While working on KAFKA-13877, I felt it was overkill to have the whole test 
> class as an integration test, since all we need is to test the assignor 
> itself, which could be a unit test. Running this suite with 9+ instances 
> takes a long time and is still vulnerable to all kinds of timing-based 
> flakiness. A better choice is to reduce it to a unit test, similar to 
> {{HighAvailabilityStreamsPartitionAssignorTest}}, which just tests the 
> behavior of the assignor itself rather than creating many instances and 
> depending on various timing bombs not to explode.
> The scope of this ticket is to refactor the {{RackAwarenessIntegrationTest}} 
> into a {{RackAwarenessStreamsPartitionAssignorTest}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent

2022-08-03 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14138:
-

 Summary: The Exception Throwing Behavior of Transactional Producer 
is Inconsistent
 Key: KAFKA-14138
 URL: https://issues.apache.org/jira/browse/KAFKA-14138
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Guozhang Wang


There's an issue of inconsistent error throwing inside the Kafka producer when 
transactions are enabled. In short, there are two places where an error code 
received from the brokers would eventually be thrown to the caller:

* Recorded on the batch's metadata, via "Sender#failBatch"
* Recorded on the txn manager, via "txnManager#handleFailedBatch".

The former would be thrown from 1) the `Future` returned from `send`, or 2) 
the `callback` inside `send(record, callback)`; the latter would be thrown 
from `producer.send()` directly, in which we call 
`txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown 
from the former it is not wrapped, hence the direct exception (e.g. 
ClusterAuthorizationException), whereas in the latter it is wrapped as, e.g., 
KafkaException(ClusterAuthorizationException). And which one is thrown depends 
on a race condition, since we cannot control whether the previous produce 
request's error has been sent back by the time the caller thread calls 
`txnManager.maybeAddPartition`.

For example consider the following sequence:


1. caller thread: within future = producer.send(), call recordAccumulator.append

2. sender thread: drain the accumulator, send the produceRequest and get the 
error back.

3. caller thread: within future = producer.send(), call 
txnManager.maybeAddPartition

4. sender thread: get the addPartition token, send the txnRequest and get the 
error back. NOTE the sender thread could send these two requests in any order.

5. caller thread: future.get()

In a sequence where 3) happens before 2), we would only get the raw exception 
at step 5; in a sequence where 2) happens before 3), we would throw the 
wrapped exception immediately at step 3).

This inconsistent error throwing is pretty annoying for users, since they 
would need to handle both cases, and many of them actually do not know about 
this trickiness. We should make the error throwing consistent; e.g. we should 
consider: 1) which errors should be thrown from the callback / future.get and 
which from the `send` call directly (these sets should preferably be 
non-overlapping), and 2) whether we wrap the raw error or not, we should do so 
consistently (see the sketch below).
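
A minimal sketch of the two error surfaces a caller must handle today; which 
one fires depends on the race described above, and the class here is 
illustrative only:

{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;

class TxnSendSketch {
    static void sendAndHandle(Producer<String, String> producer,
                              ProducerRecord<String, String> record) {
        try {
            // Surface 2: send() itself may throw the wrapped error, e.g.
            // KafkaException(ClusterAuthorizationException), via
            // txnManager.maybeAddPartition -> maybeFailWithError.
            final Future<RecordMetadata> future = producer.send(record);
            // Surface 1: the error recorded on the batch surfaces here, with
            // the raw unwrapped cause, e.g. ClusterAuthorizationException.
            future.get();
        } catch (final ExecutionException e) {
            final Throwable raw = e.getCause();       // unwrapped broker error
        } catch (final KafkaException e) {
            final Throwable wrapped = e.getCause();   // same error, wrapped once more
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}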



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-08-03 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13877.
---
Resolution: Fixed

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14130) Reduce RackAwarenessIntegrationTest to a unit test

2022-08-01 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14130:
-

 Summary: Reduce RackAwarenessIntegrationTest to a unit test
 Key: KAFKA-14130
 URL: https://issues.apache.org/jira/browse/KAFKA-14130
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Guozhang Wang


While working on KAFKA-13877, I felt it was overkill to have the whole test 
class as an integration test, since all we need is to test the assignor itself, 
which could be a unit test. Running this suite with 9+ instances takes a long 
time and is still vulnerable to all kinds of timing-based flakiness. A better 
choice is to reduce it to a unit test, similar to 
{{HighAvailabilityStreamsPartitionAssignorTest}}, which just tests the behavior 
of the assignor itself rather than creating many instances and depending on 
various timing bombs not to explode.

The scope of this ticket is to refactor the {{RackAwarenessIntegrationTest}} 
into a {{RackAwarenessStreamsPartitionAssignorTest}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics

2022-07-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13846.
---
Fix Version/s: 3.3.0
   Resolution: Fixed

> Add an overloaded metricOrElseCreate function in Metrics
> 
>
> Key: KAFKA-13846
> URL: https://issues.apache.org/jira/browse/KAFKA-13846
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
> Fix For: 3.3.0
>
>
> The `Metrics` registry is often used by concurrent threads, however its 
> get/create APIs are not well suited for that. A common pattern from users 
> today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>   try {
> metrics.createMetric(..)
>   } catch (IllegalArgumentException e){
> // another thread may have created the metric in the meantime
>   }
> } 
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get 
> the metric. However, the `createMetric` function call itself does synchronize 
> internally on updating the metric map.
> So we could consider adding a metricOrElseCreate function which is similar to 
> createMetric, but instead of throwing an IllegalArgumentException within the 
> internal synchronization block, it would just return the already existing 
> metric.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13880) DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages

2022-06-17 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13880.
---
Fix Version/s: 3.3.0
 Assignee: Guozhang Wang
   Resolution: Fixed

> DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages
> --
>
> Key: KAFKA-13880
> URL: https://issues.apache.org/jira/browse/KAFKA-13880
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Artem Livshits
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.3.0
>
>
> While working on KIP-794, I noticed that DefaultStreamPartitioner does not 
> call .onNewBatch.  The "sticky" DefaultStreamPartitioner introduced as a 
> result of https://issues.apache.org/jira/browse/KAFKA-8601 requires an 
> .onNewBatch call in order to switch to a new partition for unkeyed messages; 
> just calling .partition returns the same "sticky" partition chosen during 
> the first call to .partition.  The partition doesn't change even if the 
> partition leader is unavailable.
> Ideally, for unkeyed messages the DefaultStreamPartitioner should take 
> advantage of the new built-in partitioning logic introduced in 
> [https://github.com/apache/kafka/pull/12049].  Perhaps it could return a null 
> partition for unkeyed messages, so that KafkaProducer could run its built-in 
> partitioning logic (see the sketch below).
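> A minimal sketch of the suggested direction, not the actual fix (names are 
> illustrative):
> {code:java}
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.common.utils.Utils;
>
> class NullOnUnkeyedPartitionerSketch<K, V> {
>     private final Serializer<K> keySerializer;
>
>     NullOnUnkeyedPartitionerSketch(final Serializer<K> keySerializer) {
>         this.keySerializer = keySerializer;
>     }
>
>     public Integer partition(final String topic, final K key, final V value,
>                              final int numPartitions) {
>         if (key == null)
>             return null;  // defer to the producer's built-in partitioner
>         final byte[] keyBytes = keySerializer.serialize(topic, key);
>         return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>     }
> }
> {code}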



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13934) Consider consolidating TimeWindow / SessionWindow / SlidingWindow

2022-05-24 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13934:
-

 Summary: Consider consolidating TimeWindow / SessionWindow / 
SlidingWindow
 Key: KAFKA-13934
 URL: https://issues.apache.org/jira/browse/KAFKA-13934
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


In Streams windowing operations we have several classes inherited from 
`Window`, as listed in the title of the ticket. They differ in:

1) Serialization of the window as part of the windowed key.
2) Window operations, which are based on the inclusive/exclusiveness of the 
window start/end.

As a result, we have ended up with lots of duplicated code to handle those 
different windows in windowed aggregations.

We can consider whether it's worth serializing those window types differently 
(especially if we can get rid of the sequence id for time windows used for 
joins) and whether we can just have a single class with booleans indicating 
the inclusive/exclusiveness of the start/end, which would largely reduce our 
code duplication around the serdes and common window operations inside the 
stateful operators (see the sketch below).
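
A minimal sketch of the consolidation idea (purely illustrative; class and 
field names are hypothetical):

{code:java}
class GenericWindow {
    private final long startMs;
    private final long endMs;
    private final boolean startInclusive;  // e.g. true for time windows
    private final boolean endInclusive;    // e.g. true for session windows

    GenericWindow(final long startMs, final long endMs,
                  final boolean startInclusive, final boolean endInclusive) {
        this.startMs = startMs;
        this.endMs = endMs;
        this.startInclusive = startInclusive;
        this.endInclusive = endInclusive;
    }

    // One overlap check, normalized to closed intervals, could replace the
    // per-subclass implementations.
    boolean overlap(final GenericWindow other) {
        final long thisStart = startInclusive ? startMs : startMs + 1;
        final long thisEnd = endInclusive ? endMs : endMs - 1;
        final long otherStart = other.startInclusive ? other.startMs : other.startMs + 1;
        final long otherEnd = other.endInclusive ? other.endMs : other.endMs - 1;
        return thisStart <= otherEnd && otherStart <= thisEnd;
    }
}
{code}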



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13745.
---
Resolution: Fixed

> Flaky 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
> -
>
> Key: KAFKA-13745
> URL: https://issues.apache.org/jira/browse/KAFKA-13745
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
>   at 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13800.
---
Resolution: Fixed

> Remove force cast of TimeWindowKStreamImpl in tests of 
> https://github.com/apache/kafka/pull/11896
> -
>
> Key: KAFKA-13800
> URL: https://issues.apache.org/jira/browse/KAFKA-13800
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> We can remove the cast after `emitStrategy` is added to public api



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13746.
---
Resolution: Fixed

> Flaky 
> kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
> 
>
> Key: KAFKA-13746
> URL: https://issues.apache.org/jira/browse/KAFKA-13746
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
>   at 
> kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13877:
-

 Summary: Flaky 
RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
 Key: KAFKA-13877
 URL: https://issues.apache.org/jira/browse/KAFKA-13877
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Guozhang Wang


The following test fails on local testbeds about once per 10-15 runs:

{code}
java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:87)
at org.junit.Assert.assertTrue(Assert.java:42)
at org.junit.Assert.assertTrue(Assert.java:53)
at 
org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)


{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13647.
---
Resolution: Incomplete

I am resolving the ticket for now as Incomplete, since the Streams code alone 
cannot fix the issue; it has to be fixed on the RocksDB side.

> RocksDb metrics 'number-open-files' is not correct
> --
>
> Key: KAFKA-13647
> URL: https://issues.apache.org/jira/browse/KAFKA-13647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Sylvain Le Gouellec
>Priority: Major
> Attachments: image-2022-02-07-16-06-25-304.png, 
> image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png
>
>
> We were looking at RocksDB metrics and noticed that the {{number-open-files}} 
> metric behaves like a counter, rather than a gauge. 
> Looking at the code, we think there is a small error in the type of metric 
> for that specific mbean (should be a value metric rather than a sum metric).
> See [ 
> https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]
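> A minimal sketch of the gauge-vs-counter distinction (the helper is 
> illustrative, not the actual RocksDBMetrics code):
> {code:java}
> import org.apache.kafka.common.MetricName;
> import org.apache.kafka.common.metrics.Sensor;
> import org.apache.kafka.common.metrics.stats.CumulativeSum;
> import org.apache.kafka.common.metrics.stats.Value;
>
> class OpenFilesMetricSketch {
>     static void register(final Sensor sensor, final MetricName name, final boolean asGauge) {
>         if (asGauge)
>             sensor.add(name, new Value());          // gauge: reports the latest value
>         else
>             sensor.add(name, new CumulativeSum());  // counter: accumulates samples (the bug)
>     }
> }
> {code}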



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics

2022-04-21 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13846:
-

 Summary: Add an overloaded metricOrElseCreate function in Metrics
 Key: KAFKA-13846
 URL: https://issues.apache.org/jira/browse/KAFKA-13846
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Reporter: Guozhang Wang


The `Metrics` registry is often used by concurrent threads, however its 
get/create APIs are not well suited for that. A common pattern from users 
today is:

{code}
metric = metrics.metric(metricName);

if (metric == null) {
  try {
metrics.createMetric(..)
  } catch (IllegalArgumentException e){
// another thread may have created the metric in the meantime
  }
} 
{code}

Otherwise the caller would need to synchronize the whole block trying to get 
the metric. However, the `createMetric` function call itself does synchronize 
internally on updating the metric map.

So we could consider adding a metricOrElseCreate function which is similar to 
createMetric, but instead of throwing an IllegalArgumentException within the 
internal synchronization block, it would just return the already existing 
metric (see the sketch below).
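
A minimal sketch of the get-or-create semantics being proposed, using a plain 
map in place of the real Metrics registry (names are illustrative):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class MetricRegistrySketch<K, M> {
    private final Map<K, M> metrics = new ConcurrentHashMap<>();

    // Like createMetric, but returns the existing metric instead of throwing
    // IllegalArgumentException when another thread registered it first.
    M metricOrElseCreate(final K name, final Function<K, M> factory) {
        return metrics.computeIfAbsent(name, factory);
    }
}
{code}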



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13799) Improve documentation for Kafka zero-copy

2022-04-20 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13799.
---
Fix Version/s: 3.3.0
 Assignee: RivenSun
   Resolution: Fixed

> Improve documentation for Kafka zero-copy
> -
>
> Key: KAFKA-13799
> URL: https://issues.apache.org/jira/browse/KAFKA-13799
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.3.0
>
>
> Via the documentation https://kafka.apache.org/documentation/#maximizingefficiency
> and [https://kafka.apache.org/documentation/#networklayer] ,
> we can see that Kafka combines the page cache and zero-copy when reading 
> messages from files on disk, which greatly improves the consumption rate.
> But browsing the source code, look directly at the *FileRecords.writeTo(...)* 
> method:
> 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), 
> and the bottom layer calls the sendfile method to implement zero-copy data 
> transfer.
> 2. The logic of the SslTransportLayer.transferFrom() method: 
> {code:java}
> fileChannel.read(fileChannelBuffer, pos) 
> -> 
> sslEngine.wrap(src, netWriteBuffer) 
> -> 
> flush(ByteBuffer buf) && socketChannel.write(buf){code}
> That is, it first reads the data from disk (or directly from the page cache), 
> then encrypts the data, and finally sends the encrypted data to the network. 
> {*}FileChannel.transferTo() is not used anywhere in this process{*}.
>  
> Conclusion: 
> PlaintextTransportLayer and SslTransportLayer both use pagecache, but 
> SslTransportLayer does not implement zero-copy.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13692) stream thread blocked-time-ns-total metric does not include producer metadata wait time

2022-03-24 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13692.
---
Fix Version/s: 3.3.0
   (was: 3.2.0)
   Resolution: Fixed

> stream thread blocked-time-ns-total metric does not include producer metadata 
> wait time
> ---
>
> Key: KAFKA-13692
> URL: https://issues.apache.org/jira/browse/KAFKA-13692
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
>Priority: Major
> Fix For: 3.3.0
>
>
> The stream thread blocked-time-ns-total metric does not include producer 
> metadata wait time (time spent in `KafkaProducer.waitOnMetadata`). This can 
> contribute significantly to actual total blocked time in some cases. For 
> example, if a user deletes the streams sink topic, producers will wait until 
> the max block timeout. This time does not get included in total blocked time 
> when it should.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase

2022-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13766:
-

 Summary: Use `max.poll.interval.ms` as the timeout during 
complete-rebalance phase
 Key: KAFKA-13766
 URL: https://issues.apache.org/jira/browse/KAFKA-13766
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Guozhang Wang


The lifetime of a consumer can be categorized into three phases:

1) During normal processing, the broker expects a heartbeat request 
periodically from the consumer, and that is timed by the `session.timeout.ms`.

2) During the prepare_rebalance, the broker expects a join-group request to be 
received within the rebalance timeout, which is piggy-backed as the 
`max.poll.interval.ms`.

3) During the complete_rebalance, the broker expects a sync-group request to 
be received, again within the `session.timeout.ms`.

So during different phases of the consumer's life, different timeouts are used 
to bound the timer.

Nowadays, with the cooperative rebalance protocol, we can still return records 
from {{consumer.poll}} and process them in the middle of a rebalance. In that 
case, for phase 3) we should also use the `max.poll.interval.ms` to bound the 
timer, which in practice is larger than the `session.timeout.ms` (see the 
illustration below).
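
An illustration of the two client configs involved (the proposed change itself 
is broker-side, i.e. which timeout bounds phase 3, so there is no client API 
change to show):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class RebalanceTimeoutSketch {
    static Properties configs() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45_000);     // bounds phases 1 and, today, 3
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);  // bounds phase 2; proposed for phase 3
        return props;
    }
}
{code}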



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13765) Describe-consumer admin should not return unstable membership information

2022-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13765:
-

 Summary: Describe-consumer admin should not return unstable 
membership information
 Key: KAFKA-13765
 URL: https://issues.apache.org/jira/browse/KAFKA-13765
 Project: Kafka
  Issue Type: Bug
  Components: admin
Reporter: Guozhang Wang


When a consumer group is in the “prepare-rebalance” phase, it's unclear 
whether all of its currently registered members will still re-join in the new 
generation or not. In this case, if we simply return the current members map 
to the describe-consumer request, it may be misleading: users would get 
spurious results that may contain dropping or even zombie consumers.

So I think during the prepare-rebalance phase, we should either only return 
members whose join-group requests have already been received, OR simply return 
the response with no members and indicate via the prepare-rebalance state that 
the membership info is unstable and hence isn't returned (see the sketch 
below).
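
A minimal sketch of the caller-visible behavior under the second option (the 
group id is a placeholder, and the empty-members behavior is the proposal, not 
what the admin client does today):

{code:java}
import java.util.List;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;

class DescribeGroupSketch {
    static void describe(final Admin admin) throws Exception {
        final ConsumerGroupDescription desc = admin
                .describeConsumerGroups(List.of("my-group"))
                .describedGroups().get("my-group").get();
        if (desc.state() == ConsumerGroupState.PREPARING_REBALANCE) {
            // Membership is unstable; under the proposal, desc.members()
            // would be empty (or contain only confirmed re-joiners).
        }
    }
}
{code}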



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13728) PushHttpMetricsReporter no longer pushes metrics when network failure is recovered.

2022-03-21 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13728.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> PushHttpMetricsReporter no longer pushes metrics when network failure is 
> recovered.
> ---
>
> Key: KAFKA-13728
> URL: https://issues.apache.org/jira/browse/KAFKA-13728
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.1.0
>Reporter: XiaoyiPeng
>Priority: Minor
> Fix For: 3.2.0
>
>
> The class *PushHttpMetricsReporter* no longer pushes metrics after a network 
> failure is recovered.
> I debugged the code and found the problem here:
> [https://github.com/apache/kafka/blob/dc36dedd28ff384218b669de13993646483db966/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java#L214-L221]
>  
> When we submit a task to the *ScheduledThreadPoolExecutor* that needs to be 
> executed periodically, and the task throws an exception that is not 
> swallowed, the task will no longer be scheduled for execution (see the 
> sketch below).
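> A minimal sketch of the failure mode and the usual fix (pushMetrics is a 
> hypothetical stand-in for the reporter's HTTP push):
> {code:java}
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> class PeriodicPushSketch {
>     static void schedule(final Runnable pushMetrics) {
>         final ScheduledExecutorService executor =
>                 Executors.newSingleThreadScheduledExecutor();
>         executor.scheduleAtFixedRate(() -> {
>             try {
>                 pushMetrics.run();
>             } catch (final Exception e) {
>                 // Swallow (and log) the failure; if the exception escaped,
>                 // scheduleAtFixedRate would silently cancel all future runs.
>             }
>         }, 0, 30, TimeUnit.SECONDS);
>     }
> }
> {code}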



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed

2022-03-15 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13746:
-

 Summary: Flaky 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
 Key: KAFKA-13746
 URL: https://issues.apache.org/jira/browse/KAFKA-13746
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 3.2.0


Example: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/

{code}
java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
at 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone

2022-03-15 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13745:
-

 Summary: Flaky 
kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
 Key: KAFKA-13745
 URL: https://issues.apache.org/jira/browse/KAFKA-13745
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 3.2.0


Example: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/

{code}
org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
at 
kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection

2022-03-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13737:
-

 Summary: Flaky 
kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
 Key: KAFKA-13737
 URL: https://issues.apache.org/jira/browse/KAFKA-13737
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Examples:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests

{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: describeTopics
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812)
at scala.util.Try$.apply(Try.scala:210)
at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811)
at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819)
at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789)
at 
kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-13736:
---

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13736.
---
Resolution: Duplicate

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13736:
-

 Summary: Flaky 
kafka.network.SocketServerTest.closingChannelWithBufferedReceives
 Key: KAFKA-13736
 URL: https://issues.apache.org/jira/browse/KAFKA-13736
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Examples:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests

{code}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
at 
kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13735) Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13735:
-

 Summary: Flaky 
kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives
 Key: KAFKA-13735
 URL: https://issues.apache.org/jira/browse/KAFKA-13735
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Examples:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11705/13/tests

{code}
Stacktrace
java.lang.IllegalStateException: Channel closed too early
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1511)
at scala.Option.getOrElse(Option.scala:201)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1511)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1482)
at 
kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives(SocketServerTest.scala:1393)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-13421:
---

Re-opening this ticket since the test is still failing.

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Jason Gustafson
>Priority: Major
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup()
>  failed, log available in 
> /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>   
>   
> ConsumerBounceTest > 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() 
> FAILED
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode 
> = NodeExists
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126) 
>
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>  
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
> at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:2
> 12)
> at 
> scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
> at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
> at 
> kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsB
> igGroup$1(ConsumerBounceTest.scala:327)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
> at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(C
> onsumerBounceTest.scala:319) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead

2022-03-09 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13722:
-

 Summary: Update internal interfaces that use ProcessorContext to 
use StateStoreContext instead
 Key: KAFKA-13722
 URL: https://issues.apache.org/jira/browse/KAFKA-13722
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


This is a reminder that when we remove the deprecated public APIs that use the 
ProcessorContext, like `StateStore.init`, we should also consider updating the 
internal interfaces that use the ProcessorContext. That includes:

1. Segments and related util classes which use the ProcessorContext.
2. State stores that leverage ProcessorContext.getXXXTime: their logic should 
be moved out of the state store impl and up to the processor node level that 
calls these state stores.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-12256) auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION

2022-02-11 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12256.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
> -
>
> Key: KAFKA-12256
> URL: https://issues.apache.org/jira/browse/KAFKA-12256
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.0.0
>Reporter: Ryan Leslie
>Priority: Minor
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.2.0
>
>
> In KAFKA-6829 a change was made to the consumer to internally retry commits 
> upon receiving UNKNOWN_TOPIC_OR_PARTITION.
> Though this helped mitigate issues around stale broker metadata, there were 
> some valid concerns around the negative effects for routine topic deletion:
> https://github.com/apache/kafka/pull/4948
> In particular, if a commit is issued for a deleted topic, retries can block 
> the consumer for up to max.poll.interval.ms. This is tunable of course, but 
> any amount of stalling in a consumer can lead to unnecessary lag.
> One of the assumptions while permitting the change was that in practice it 
> should be rare for commits to occur for deleted topics, since that would 
> imply messages were being read or published at the time of deletion. It's 
> fair to expect users to not delete topics that are actively published to. But 
> this assumption is false in cases where auto commit is enabled.
> With the current implementation of auto commit, the consumer will regularly 
> issue commits for all topics being fetched from, regardless of whether or not 
> messages were actually received. The fetch positions are simply flushed, even 
> when they are 0. This is simple and generally efficient, though it does mean 
> commits are often redundant. Besides the auto commit interval, commits are 
> also issued at the time of rebalance, which is often precisely at the time 
> topics are deleted.
> This means that in practice commits for deleted topics are not really rare. 
> This is particularly an issue when the consumer is subscribed to a multitude 
> of topics using a wildcard. For example, a consumer might subscribe to a 
> particular "flavor" of topic with the aim of auditing all such data, and 
> these topics might dynamically come and go. The consumer's metadata and 
> rebalance mechanisms are meant to handle this gracefully, but the end result 
> is that such groups are often blocked in a commit for several seconds or 
> minutes (the default is 5 minutes) whenever a delete occurs. This can 
> sometimes result in significant lag.
> Besides having users abandon auto commit in the face of topic deletes, there 
> are probably multiple ways to deal with this, including reconsidering if 
> commits still truly need to be retried here, or if this behavior should be 
> more configurable; e.g. having a separate commit timeout or policy. In some 
> cases the loss of a commit and subsequent message duplication is still 
> preferred to processing delays. And having an artificially low 
> max.poll.interval.ms or rebalance.timeout.ms comes with its own set of 
> concerns.
> In the very least the current behavior and pitfalls around delete with active 
> consumers should be documented.
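> 
> For reference, a minimal sketch of the auto-commit wildcard setup described 
> above (broker address, topic pattern and group name are illustrative):
> {code:java}
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ConsumerConfig.GROUP_ID_CONFIG, "audit-group");
> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // the default
> props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // the default
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> 
> KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> // auto commit flushes positions for every fetched topic, so deleting any
> // matched topic can stall the next commit for up to max.poll.interval.ms
> consumer.subscribe(Pattern.compile("audit-.*"));
> {code}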



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffic

2022-02-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13310.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply
> ---
>
> Key: KAFKA-13310
> URL: https://issues.apache.org/jira/browse/KAFKA-13310
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.8.1
> Environment: prod
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, 
> ThirdDebugLog1.png, ThirdDebugLog2.png, brokerCpu.png, brokerNetBytes.png, 
> kafkaConsumerLog.png
>
>
> h2. Foreword
>       Because our consumers' consumption logic is sometimes heavy, we follow 
> the Kafka Streams guidance in 
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
>  and set max.poll.interval.ms to Integer.MAX_VALUE.
>  Our consumers subscribe via: 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  
> h2. Recurrence of the problem scene
> operate steps are
>  (1) Test environment Kafka cluster: three brokers
>  (2) Topics conforming to regular expressions include rivenTest1, rivenTest2, 
> and rivenTest88
>  (3) Only one consumer is needed, group.id is "rivenReassign", 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  (4) At the beginning, the group status is stable, and everything is normal 
> for consumers, then I delete topic: rivenTest88
>  
> h2. Phenomenon
>       Problem phenomenon
>   (1) The consumer is blocked in the poll method, no longer consumes any 
> messages, and the consumer log keeps printing
>  [main] WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer 
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit 
> failed on partition rivenTest88-1 at offset 0: This server does not host this 
> topic-partition.
>  (2) The describe consumerGroup interface of AdminClient keeps timing out, 
> and the group status is no longer stable
>  (3) The cpu and traffic of the broker are *significantly increased*
>  
>  
> h2. Problem tracking
>    By analyzing the KafkaConsumer code (version 2.8.1), I found that you 
> introduced the waitForJoinGroup variable in the 
> updateAssignmentMetadataIfNeeded method. For the reason, I attach the comment 
> on the method: "try to update assignment metadata BUT do not need to block on 
> the timer for join group". See below:
>  
> {code:java}
> if (includeMetadataInTimeout) {
>     // try to update assignment metadata BUT do not need to block on the timer for join group
>     updateAssignmentMetadataIfNeeded(timer, false);
> } else {
>     while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
>         log.warn("Still waiting for metadata");
>     }
> }
> {code}
>  
>  
> By tracing the code back layer by layer, I found that the function of this 
> variable is to construct a time.timer(0L) and pass it back to the method 
> joinGroupIfNeeded(final Timer timer) in AbstractCoordinator. See below:
> {code:java}
> // if not wait for join group, we would just use a timer of 0
> if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
>     // since we may use a different timer in the callee, we'd still need
>     // to update the original timer's current time after the call
>     timer.update(time.milliseconds());
>     return false;
> }
> {code}
> But you will find that there is a submethod onJoinPrepare in the method 
> stack of joinGroupIfNeeded, and the onJoinPrepare method contains the line 
> maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), 
> where the value of rebalanceConfig.rebalanceTimeoutMs is actually 
> max.poll.interval.ms.
>  Finally, I tracked down ConsumerCoordinator's method 
> commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer). 
> The input parameter offsets is subscriptions.allConsumed(); when I delete the 
> topic rivenTest88, the commitOffsetsSync(Map<TopicPartition, 
> OffsetAndMetadata> offsets, Timer timer) method will *fall into an infinite 
> loop!!*
> {code:java}
> public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
>     invokeCompletedOffsetCommitCallbacks();
>     if (offsets.isEmpty())
>         return true;
>     do {
>         if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
>             return false;
>         }
>         RequestFuture<Void> future = 

[jira] [Resolved] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)

2022-02-06 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13563.
---
Fix Version/s: 3.2.0
   3.1.1
   Resolution: Fixed

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---
>
> Key: KAFKA-13563
> URL: https://issues.apache.org/jira/browse/KAFKA-13563
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.2.0, 3.1.1
>
> Attachments: kafka.zip
>
>
> In KAFKA-10793, we fixed the race condition when looking up the coordinator 
> by clearing the _findCoordinatorFuture_ when handling the result, rather than 
> in the listener callbacks. It works well under consumer group mode (i.e. 
> Consumer#subscribe), but we found that when a user is using non consumer 
> group mode (i.e. Consumer#assign) with a group id provided (for offset 
> commits, so that a consumerCoordinator is created), the 
> _findCoordinatorFuture_ will never be cleared in some situations, causing the 
> offset commits to keep getting the NOT_COORDINATOR error.
>  
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
> But in non consumer group mode with a group id provided, there will be no 
> (1) heartbeat thread, and we only call 
> (2) AbstractCoordinator#ensureCoordinatorReady the first time the consumer 
> wants to fetch the committed offset position. That is, after the 2nd 
> lookupCoordinator call, we have no chance to clear the _findCoordinatorFuture_.
>  
> To avoid the race condition that KAFKA-10793 mentioned, it's not safe to 
> clear the _findCoordinatorFuture_ in the future listener. So, I think we can 
> fix this issue by calling AbstractCoordinator#ensureCoordinatorReady when the 
> coordinator is unknown in the non consumer group case, on each Consumer#poll.
>  
> Reproduce steps:
>  
> 1. Start a 3 Broker cluster with a Topic having Replicas=3.
> 2. Start a Client with Producer and Consumer (with Consumer#assign(), not 
> subscribe, and provide a group id) communicating over the Topic.
> 3. Stop the Broker that is acting as the Group Coordinator.
> 4. Observe successful Rediscovery of new Group Coordinator.
> 5. Restart the stopped Broker.
> 6. Stop the Broker that became the new Group Coordinator at step 4.
> 7. Observe "Rediscovery will be attempted" message but no "Discovered group 
> coordinator" message.
>  
>  
>  
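> 
> A minimal sketch of the non-group-mode setup described above (topic, 
> partition and group id are illustrative):
> {code:java}
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // provided only for offset commits
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> 
> KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> // assign() instead of subscribe(): no consumer group protocol, no heartbeat thread
> consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
> while (true) {
>     consumer.poll(Duration.ofMillis(100));
>     consumer.commitSync(); // keeps failing with NOT_COORDINATOR after step 6 above
> }
> {code}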



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception

2022-02-04 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13346.
---
Resolution: Not A Problem

> Kafka Streams fails due to RocksDB Locks Not Available Exception
> 
>
> Key: KAFKA-13346
> URL: https://issues.apache.org/jira/browse/KAFKA-13346
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Amit Gupta
>Priority: Major
>
> Hello,
> We are using Kafka Streams and we observe that sometimes, on some of the 
> hosts running the streams application, the Kafka Streams instance fails with 
> an unexpected exception. We are running with 40 stream threads per host and 
> 20 hosts in total.
> Can someone please help with what the root cause might be here?
>  
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location .
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:199)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:76)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:95)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:426)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:660)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  ~[kafka-streams-2.6.0.jar:?]
>  Caused by: org.rocksdb.RocksDBException: lock : 
> ./0_468/rocksdb/state-store/LOCK: No locks available
>  at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.18.3.jar:?]
>  at org.rocksdb.RocksDB.open(RocksDB.java:286) ~[rocksdbjni-5.18.3.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:211)
>  ~[kafka-streams-2.6.0.jar:?]
>  ... 15 more
>   
>  Sometimes I also see this exception:
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location ./0_433/rocksdb/state-store
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  

[jira] [Created] (KAFKA-13561) Consider deprecating `StreamsBuilder#build(props)` function

2021-12-21 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13561:
-

 Summary: Consider deprecating `StreamsBuilder#build(props)` 
function
 Key: KAFKA-13561
 URL: https://issues.apache.org/jira/browse/KAFKA-13561
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


With 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
accepted, which introduced the new `StreamsBuilder(TopologyConfig)` 
constructor, we can now consider deprecating the `StreamsBuilder#build(props)` 
function. There are still a few things we'd need to do first (a usage sketch 
follows the list):

1. Copy the `StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG` to TopologyConfig.
2. Make sure the overloaded `StreamsBuilder()` constructor takes in default 
values of TopologyConfig.
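
For illustration, a minimal sketch of the intended end state, where configs 
flow in through the `StreamsBuilder(TopologyConfig)` constructor instead of 
`build(props)` (property values are illustrative; assumes the 
`TopologyConfig(StreamsConfig)` constructor shape from KIP-591):

{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

// instead of builder.build(props):
final StreamsBuilder builder =
    new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
// ... define the topology on builder ...
final Topology topology = builder.build(); // configs already known at plan-generation time
{code}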



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13319) Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty

2021-10-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13319.
---
Fix Version/s: 3.1.0
 Assignee: Guozhang Wang  (was: Ryan)
   Resolution: Fixed

> Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty
> ---
>
> Key: KAFKA-13319
> URL: https://issues.apache.org/jira/browse/KAFKA-13319
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: newbie
> Fix For: 3.1.0
>
>
> If a user calls `Producer.sendOffsetsToTransaction` with an empty map of 
> offsets, we can short-circuit and return early, skipping the logic to add the 
> offsets topic to the transaction. The main benefit is avoiding the 
> unnecessary accumulation of markers in __consumer_offsets.
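> 
> A minimal sketch of the proposed short-circuit (not the committed patch; the 
> method body is elided):
> {code:java}
> public void sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
>                                      final ConsumerGroupMetadata groupMetadata) {
>     // nothing to commit: skip AddOffsetsToTxn / TxnOffsetCommit entirely and
>     // avoid accumulating markers in __consumer_offsets
>     if (offsets.isEmpty())
>         return;
>     // ... existing logic to add the offsets topic to the transaction ...
> }
> {code}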



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13371) Consider consolidating Joined / StreamJoined / TableJoined

2021-10-12 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13371:
-

 Summary: Consider consolidating Joined / StreamJoined / TableJoined
 Key: KAFKA-13371
 URL: https://issues.apache.org/jira/browse/KAFKA-13371
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


This is an idea from reviewing KAFKA-13261 (adding TableJoined). We now have 
three control objects: Joined, StreamJoined, TableJoined. All of them extend 
NamedOperation and hence inherit the `name` field, which would be used for the 
processor node's name and potentially store names. In addition to that:

* Joined: used in stream-table joins. Contains key and two value serdes used 
for serializing the bytes for repartitioning (however since today we only 
repartition one side if needed, the other value serde is never used). 
* StreamJoined: used in stream-stream joins. It includes the serdes, AND also 
the store suppliers and other control variables on the store names.
* TableJoined: used in table-table foreign key joins. It does not include any 
serdes but includes the partitioner information.

The main differences between these constructs are:

* KTables themselves have an embedded materialization mechanism via 
`valueGetterSupplier` whenever they are created, either from source, or from 
aggregate / join operators, so they do not need extra materialization 
indicators when participating in a follow-up join --- i.e. they either are 
already materialized by the operators that generate them, or they will 
"grandfather" back to the upstream KTable on the fly with a logical view when 
that view is fetched via the `ValueGetterSupplier`. On the other hand, 
KStreams have no inherent materialization mechanism, and hence operators that 
do need to materialize the streams must provide such means.
* Table-table foreign-key joins have special needs for partitioners.

[~vvcephei] has a good proposal in 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar, 
and as part of that proposal we could consider adding a partitioner for source 
streams / tables and inheriting it throughout the topology pipeline. Following 
that idea, we can consider consolidating the above "Joined" objects by 
isolating the materialization / partitioner variables. More specifically, 
here's a concrete proposal (a usage sketch follows the list):

1) `StreamsBuilder.table/stream` would pass in an optional partitioner.
2) And similarly all operators that change the key would allow an optional 
partitioner:
2.a) `KStream.repartition/groupBy` and `KTable.groupBy` would allow an optional 
partitioner in `Repartitioned`; as a piggy-back, we would also deprecate 
`Grouped` in favor of `Repartitioned`, since the latter would subsume the 
former.
2.b) `KStream.map/flatMap/selectKey` stays as is, and similar to serdes, these 
operators would stop the inheritance of partitioners from the upstream entities.
3) `Repartitioned` would also add the key/value serdes used for serializing to 
the repartition topics.
4) `KStream.join(KTable)` and `KStream.join(KStream)` would pass in an optional 
`Repartitioned` in addition to `Joined`, which can be used to encode the 
partitioner info.
5) Foreign-key `KTable.join(KTable)` would pass in an optional `Repartitioned`, 
which can be used to encode the partitioner info.
6) As a result of all the above points, we can then retire `StreamJoined` / 
`TableJoined` / `Joined`, since all their enwrapped control objects are now 
separated into `Repartitioned` and `Materialized`; note that for 
`StreamJoined`, the store suppliers / names / configs would now be wrapped in 
two `Materialized` objects, which would still not be exposed for IQ.
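
For context, today's `Repartitioned` already carries serdes, an optional 
partitioner, and a partition count, which is the shape the consolidation above 
builds on (a sketch; the stream and partitioner names are illustrative):

{code:java}
stream.repartition(
    Repartitioned.<String, Long>with(Serdes.String(), Serdes.Long())
        .withStreamPartitioner(myPartitioner)
        .withNumberOfPartitions(12));
{code}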







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-10-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13268.
---
Resolution: Duplicate

> Add more integration tests for Table Table FK joins with repartitioning
> ---
>
> Key: KAFKA-13268
> URL: https://issues.apache.org/jira/browse/KAFKA-13268
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Victoria Xia
>Priority: Major
>
> We should extend the FK join multi-partition integration test with a 
> Repartitioned for:
> 1) just the new partition count
> 2) a custom partitioner
> This is to test whether there's a bug where the internal topics don't pick up 
> a partitioner provided that way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13356) Use "delete" retention policy only for stream-stream join windowed stores

2021-10-06 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13356:
-

 Summary: Use "delete" retention policy only for stream-stream join 
windowed stores
 Key: KAFKA-13356
 URL: https://issues.apache.org/jira/browse/KAFKA-13356
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today the window stores associated with stream-stream joins, like any other 
window stores, use "delete,compact" as their changelog retention policy. 
However, since today we add a sequence number to the key, which disables 
de-duplication, "compact" would never actually be able to compact any keys; it 
only results in 1) wasted CPU on the brokers' cleaner threads, and 2) broker 
features that rely on the "delete" policy alone not being applicable. A sketch 
of the config change follows below.

Until we potentially change the store format in the future to not use a 
sequence number for disabling de-duping, we could consider just changing the 
policy to "delete" for stream-stream joins' window stores for now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2021-10-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13349:
-

 Summary: Allow Iterator.remove on KeyValueIterator
 Key: KAFKA-13349
 URL: https://issues.apache.org/jira/browse/KAFKA-13349
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today Streams' state store range iterators do not support `remove`. We could 
consider adding such support for all the built-in state stores (a sketch 
follows below):

* RocksDB: its native iterator does not support removal, but we can always do 
a delete(key) concurrently while the iterator is open on the snapshot.
* In-Memory: a straightforward implementation.

The benefit is that for range-and-delete truncation operations we would not 
necessarily have to be cautious about concurrent modification exceptions. This 
could also help GC with in-memory stores.
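
A minimal sketch of the RocksDB approach described above, expressed with 
today's API (assuming a {{KeyValueStore<Bytes, byte[]>}} named {{store}}):

{code:java}
// range() iterates over a snapshot, so calling delete(key) on the same store
// while the iterator is open is safe -- this is the workaround that explicit
// Iterator.remove support would replace
try (final KeyValueIterator<Bytes, byte[]> iter = store.range(from, to)) {
    while (iter.hasNext()) {
        store.delete(iter.next().key);
    }
}
{code}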



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13301.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Fix For: 3.1.0
>
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In the Consumer Configs, the value of the configuration max.poll.interval.ms 
> must always be larger than request.timeout.ms. But here's what the official 
> documentation says: "The value of the configuration request.timeout.ms must 
> always be larger than max.poll.interval.ms."



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded

2021-09-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13216.
---
Resolution: Fixed

> Streams left/outer joins cause new internal changelog topic to grow unbounded
> -
>
> Key: KAFKA-13216
> URL: https://issues.apache.org/jira/browse/KAFKA-13216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sergio Peña
>Assignee: Guozhang Wang
>Priority: Critical
> Fix For: 3.1.0
>
>
> This bug is caused by the improvements made in 
> https://issues.apache.org/jira/browse/KAFKA-10847, which fixes an issue with 
> stream-stream left/outer joins. The issue is only caused when a stream-stream 
> left/outer join is used with the new `JoinWindows.ofTimeDifferenceAndGrace()` 
> API that specifies the window time + grace period. This new API was added in 
> AK 3.0. No previous users are affected.
> The issue causes the internal changelog topic used by the new 
> OUTERSHARED window store to keep growing unbounded as new records come in. 
> The topic is never cleaned up nor compacted even if tombstones are written 
> to delete the joined and/or expired records from the window store. The 
> problem is caused by a parameter required in the window store to retain 
> duplicates. This config causes tombstone records to get a new sequence ID as 
> part of the key in the changelog, making those keys unique and thus 
> defeating the cleanup policy.
> In 3.0, we deprecated {{JoinWindows.of(size)}} in favor of 
> {{JoinWindows.ofTimeDifferenceAndGrace()}} -- the old API uses the old 
> semantics and is thus not affected, while the new API enables the new 
> semantics; the problem is that we deprecated the old API and thus tell users 
> that they should switch to the new but broken API.
> We have two ways forward:
>  * Fix the bug (non-trivial)
>  * Un-deprecate the old {{JoinWindows.of(size)}} API (and tell users not to 
> use the new but broken API)
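> 
> For reference, only joins built with the new API shape are affected (a 
> sketch; stream names and durations are illustrative):
> {code:java}
> // affected (new in AK 3.0):
> left.outerJoin(right, joiner,
>     JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)));
> 
> // not affected (old semantics, but deprecated in 3.0):
> left.outerJoin(right, joiner, JoinWindows.of(Duration.ofMinutes(5)));
> {code}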



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS

2021-09-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13249.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

> Checkpoints do not contain latest offsets on shutdown when using EOS
> 
>
> Key: KAFKA-13249
> URL: https://issues.apache.org/jira/browse/KAFKA-13249
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.7.0, 2.8.0
>Reporter: Oliver Hutchison
>Assignee: Oliver Hutchison
>Priority: Major
> Fix For: 3.1.0
>
>
> When using EOS the {{.checkpoint}} file created when a stateful streams app 
> is shutdown does not always contain changelog offsets which represent the 
> latest state of the state store. The offsets can often be behind the end of 
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts 
> after shutting down cleanly as streams thinks (based on the incorrect offsets 
> in the checkpoint) that the state store is not up to date with the changelog. 
> This is increasing the time we see it takes to do a clean restart of a single 
> instance streams app from around 10 second to sometime over 2 minutes in our 
> case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}} 
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // commitNeeded indicates we may have processed some records since last 
> commit
>   // and hence we need to refresh checkpointable offsets regardless whether 
> we should checkpoint or not
>   if (commitNeeded) {
> stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In a steady state case for a simple single instance single thread stream app 
> where an app simply starts, runs and then shuts down the {{if 
> (commitNeeded)}} test always fails when running with EOS which results in the 
> latest checkpoint offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this 
> is the case as there's only 1 place in the code which calls 
> {{maybeWriteCheckpoint}} during this steady state. The {{postCommit(final 
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} 
> state.
> {code:java}
> case RUNNING:
>   if (enforceCheckpoint || !eosEnabled) {
> maybeWriteCheckpoint(enforceCheckpoint);
>   }
>   log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", 
> state(), eosEnabled, enforceCheckpoint);
>   break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be 
> called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as 
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? 
> Again looking only at the steady state case we find that it's only called 
> from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from 
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it 
> happens *after* all active tasks have commited. Which means that 
> {{StreamTask.commitNeeded=false}} for all tasks so it follows that the test 
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the 
> latest offsets stored into the state manager.
> I think the fix is simply to change the test in {{maybeWriteCheckpoint}} to 
> be {{if (commitNeeded || enforceCheckpoint) { ... }}}, as we know we must 
> always update the changelog offsets before we write the checkpoint. Sketched 
> below:
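> The suggested fix, sketched (not necessarily the committed patch):
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // also refresh the checkpointable offsets when a checkpoint is enforced,
>   // e.g. on clean shutdown after all tasks have already committed
>   if (commitNeeded || enforceCheckpoint) {
>     stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}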



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13286) Revisit Streams State Store and Serde Implementation

2021-09-09 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13286:
-

 Summary: Revisit Streams State Store and Serde Implementation
 Key: KAFKA-13286
 URL: https://issues.apache.org/jira/browse/KAFKA-13286
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Kafka Streams state store is built in hierarchical layers as metered -> cached 
-> logged -> [convert] -> raw stores (rocksDB, in-memory), and it leveraged on 
the builtin Serde libraries for serialize / deserialize. There are several 
inefficiencies in the current design:

* The API only supports serde using byte arrays. This means we generate a lot 
of garbage and spend unnecessary time copying bytes, especially when working 
with windowed state stores that rely on composite keys. In many places in the 
code we have to extract parts of the composite key to deserialize either the 
timestamp or the message key from the state store key (e.g. the methods in 
WindowStoreUtils).
* The serde operation could happen on multiple layers of the state store 
hierarchy, which means we incur extra byte array copies as we move along 
doing serdes. For example, we do serde in the metered layer, but then again in 
the cached layer with the cache functions, and also in logged stores for 
generating the key/value bytes to send to Kafka.

To improve on this, we can consider supporting serde into/from ByteBuffers, 
which would allow us to reuse the underlying byte arrays and just pass around 
slices of the underlying buffers to avoid the unnecessary copying.

1) More specifically, e.g. the serialize interface could be refactored to:

{code}
ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
{code}

Where the serialized bytes would be appended to the ByteBuffer. When a series 
of serialize functions are called alongside the state store hierarchies, we 
then just need to make sure that whatever should be appended first to the 
ByteBuffer is serialized first. E.g. if the serialized bytes format of a 
WindowSchema is {{<timestamp, leftRightBoolean, key>}} (as implied by the call 
below), then we would need to call the serialize as in:

{code}
serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); 
{code}

2) In addition, we can consider having a pool of ByteBuffers representing a set 
of byte arrays that can be re-used. This can be captured as an intelligent 
{{ByteBufferSupplier}}, which provides:

{code}
ByteBuffer ByteBufferSupplier#allocate(long size)
{code}

Its implementation can choose to either create new byte arrays, or re-use 
existing ones in the pool; the gotcha though is that we usually may not know 
the serialized byte length for raw keys (think: in practice the keys would be 
in json/avro etc), and hence would not know what {{size}} to pass in for 
serialization, and hence may need to be conservative, or use trial and error 
etc.

Of course callers then would be responsible for returning the used ByteBuffer 
back to the Supplier via

{code}
ByteBufferSupplier#deallocate(ByteBuffer buffer)
{code}

3) With RocksDB's direct byte-buffers (KAFKA-9168) we can optionally also 
allocate them from RocksDB directly so that using them for puts/gets would not 
go through JNI, and hence be more efficient. The Supplier then would need to be 
careful to deallocate these direct byte-buffers, since they would not be GC'ed 
by the JVM.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-09-02 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13268:
-

 Summary: Add more integration tests for Table Table FK joins with 
repartitioning
 Key: KAFKA-13268
 URL: https://issues.apache.org/jira/browse/KAFKA-13268
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Guozhang Wang


We should extend the FK join multi-partition integration test with a 
Repartitioned for:
1) just the new partition count
2) a custom partitioner

This is to test whether there's a bug where the internal topics don't pick up 
a partitioner provided that way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version

2021-09-01 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13257.
---
Resolution: Not A Problem

> KafkaStreams Support For Latest RocksDB Version
> ---
>
> Key: KAFKA-13257
> URL: https://issues.apache.org/jira/browse/KAFKA-13257
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Alagukannan
>Priority: Major
> Attachments: hs_err_pid6.log
>
>
> Hi,
>  Can you please let us know if there is any plan for adding the latest 
> versions of RocksDB to Kafka Streams? If you're planning it, what's the 
> timeline we are looking at? If not planning to upgrade, what's the reason 
> behind it? Is there any significant impact to upgrading, like backward 
> compatibility etc.? This is a general query to understand the RocksDB 
> upgrade and its impact on streams applications.
> The main pain point behind asking for this upgrade is: we tried to build an 
> application with Kafka Streams 2.8.0 on an Alpine-based OS, with the docker 
> base image azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless. The 
> streams application worked fine until it interacted with the state store 
> (RocksDB). The JVM crashed with the following error:
>  #
>  # A fatal error has been detected by the Java Runtime Environment:
>  #
>  # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207
>  #
>  # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) 
> (build 11.0.10+9-LTS)
>  # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed 
> mode, tiered, compressed oops, g1 gc, linux-amd64)
>  # Problematic frame:
>  # C [librocksdbjni15322693993163550519.so+0x271b27] 
> std::_Rb_tree, 
> std::less, std::allocator 
> >::_M_erase(std::_Rb_tree_node*)+0x27
> Then we found out RocksDB works well with glibc but not musl libc, whereas 
> Alpine supports only musl for native dependencies. Further looking into 
> RocksDB for a solution, we found that they have started supporting both 
> glibc and musl native libs from the 6.5.x versions.
>  But the latest Kafka Streams (2.8.0) ships the RocksDB 5.18.x version. This 
> is the main reason behind asking for the RocksDB upgrade in Kafka Streams.
> Have attached the PID log where JVM failures are happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13175) The topic is marked for deletion, create topic with the same name throw exception topic already exists.

2021-09-01 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13175.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

> The topic is marked for deletion, create topic with the same name throw 
> exception topic already exists.
> ---
>
> Key: KAFKA-13175
> URL: https://issues.apache.org/jira/browse/KAFKA-13175
> Project: Kafka
>  Issue Type: Bug
>Reporter: yangshengwei
>Priority: Major
> Fix For: 3.1.0
>
> Attachments: kafka (2).jpg, zookeeper.jpg
>
>
> After a topic is deleted, the topic is first marked for deletion; creating a 
> topic with the same name during that window throws an exception saying the 
> topic already exists. It should instead throw an exception saying the topic 
> is marked for deletion. Then I can choose to wait for the topic to be 
> completely deleted, and if the topic is still not deleted after a long time, 
> check the reason why it is not deleted.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13243) Differentiate metric latency measured in millis and nanos

2021-08-27 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13243:
-

 Summary: Differentiate metric latency measured in millis and nanos
 Key: KAFKA-13243
 URL: https://issues.apache.org/jira/browse/KAFKA-13243
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Reporter: Guozhang Wang


Today most of the client latency metrics are measured in millis, and some in 
nanos. For those measured in nanos we usually differentiate them by having a 
`-ns` suffix in the metric names, e.g. `io-wait-time-ns-avg` and 
`io-time-ns-avg`. But there are a few where we obviously forgot to follow this 
pattern, e.g. `io-wait-time-total`: it is inconsistent in that `avg` has the 
`-ns` suffix while `total` has not. I did a quick search and found just two of 
them (old name : new name):

* bufferpool-wait-time-total : bufferpool-wait-time-ns-total
* io-wait-time-total : io-wait-time-ns-total

We should rename them accordingly with the `-ns` suffix as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13239) Use RocksDB.ingestExternalFile for restoration

2021-08-27 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13239:
-

 Summary: Use RocksDB.ingestExternalFile for restoration
 Key: KAFKA-13239
 URL: https://issues.apache.org/jira/browse/KAFKA-13239
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Now that we are in newer version of RocksDB, we can consider using the new

{code}
ingestExternalFile(final ColumnFamilyHandle columnFamilyHandle,
                   final List<String> filePathList,
                   final IngestExternalFileOptions ingestExternalFileOptions)
{code}

for restoring changelog into state stores. More specifically:

1) Use a larger default batch size in the restore consumer's polling behavior 
so that each poll returns as many records as possible.
2) For a single batch of records returned from a restore consumer poll call, 
first write them as a single SST file using the {{SstFileWriter}}. The existing 
{{DBOptions}} could be used to construct the {{EnvOptions}} and {{Options}} for 
the writer. Do not ingest the written file into the db within each iteration 
yet.
3) At the end of the restoration, call {{RocksDB.ingestExternalFile}} with all 
the written files' paths as the parameter. The {{IngestExternalFileOptions}} 
would be specifically configured to allow key ranges overlapping with the 
mem-table.
4) A specific note: after the call in 3), heavy compaction may be executed by 
RocksDB in the background, and before it cools down, starting normal processing 
immediately (which would try to {{put}} new records into the store) may see 
high stalls. To work around this we could use {{RocksDB.compactRange()}}, which 
blocks until the compaction is completed. A sketch of the flow follows below.
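
A minimal sketch of steps 2) - 4) using the RocksJava API (the file paths and 
the pre-sorted batch are illustrative; {{SstFileWriter}} requires keys to be 
added in sorted order):

{code:java}
// 2) write one poll batch as a single SST file
final SstFileWriter writer = new SstFileWriter(new EnvOptions(), new Options());
writer.open("/state/restore/batch-00001.sst");
for (final Map.Entry<byte[], byte[]> entry : sortedBatch.entrySet()) {
    writer.put(entry.getKey(), entry.getValue());
}
writer.finish();

// 3) at the end of restoration, ingest all written files at once, allowing
//    key ranges that overlap with the mem-table
db.ingestExternalFile(writtenFilePaths,
    new IngestExternalFileOptions().setAllowBlockingFlush(true));

// 4) optionally wait out the heavy compaction before resuming normal processing
db.compactRange();
{code}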




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13170) Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown

2021-08-10 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13170.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

> Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown
> --
>
> Key: KAFKA-13170
> URL: https://issues.apache.org/jira/browse/KAFKA-13170
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.1.0
>
>
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown_2/]
> {code:java}
> Stacktracejava.lang.AssertionError: unexpected exception type thrown; 
> expected: but 
> was:
>   at org.junit.Assert.assertThrows(Assert.java:1020)
>   at org.junit.Assert.assertThrows(Assert.java:981)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13172) Document in Streams 3.0 that due to rocksDB footer version in-flight downgrade is not supported

2021-08-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13172:
-

 Summary: Document in Streams 3.0 that due to rocksDB footer 
version in-flight downgrade is not supported
 Key: KAFKA-13172
 URL: https://issues.apache.org/jira/browse/KAFKA-13172
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8683) Flakey test InternalTopicManagerTest @shouldNotCreateTopicIfExistsWithDifferentPartitions

2021-07-30 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8683.
--
Resolution: Cannot Reproduce

Have not seen this flakiness recently and also cannot reproduce locally after 
10,000 runs, closing for now.

> Flakey test  InternalTopicManagerTest 
> @shouldNotCreateTopicIfExistsWithDifferentPartitions
> --
>
> Key: KAFKA-8683
> URL: https://issues.apache.org/jira/browse/KAFKA-8683
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/414/consoleFull]
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest > 
> shouldNotCreateTopicIfExistsWithDifferentPartitions PASSED*00:05:46* ERROR: 
> Failed to write output for test null.Gradle Test Executor 5*00:05:46* 
> java.lang.NullPointerException: Cannot invoke method write() on null 
> object*00:05:46*   at 
> org.codehaus.groovy.runtime.NullObject.invokeMethod(NullObject.java:91)*00:05:46*
> at 
> org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:47)*00:05:46*
>  at 
> org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)*00:05:46*
>   at 
> org.codehaus.groovy.runtime.callsite.NullCallSite.call(NullCallSite.java:34)*00:05:46*
>at 
> org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)*00:05:46*
>   at java_io_FileOutputStream$write.call(Unknown Source)*00:05:46*
> at build_9s5vsq3vnws1928hdaummvzb1$_r



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-07-30 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13152:
-

 Summary: Replace "buffered.records.per.partition" with 
"input.buffer.max.bytes" 
 Key: KAFKA-13152
 URL: https://issues.apache.org/jira/browse/KAFKA-13152
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


The current config "buffered.records.per.partition" controls at most how many 
records to bookkeep per partition; when it is exceeded we pause fetching from 
that partition. However this config has two issues:

* It's a per-partition config, so the total memory consumed depends on the 
dynamic number of partitions assigned.
* Record size could vary from case to case.

And hence it's hard to bound the memory usage for this buffering. We should 
consider deprecating that config in favor of a global one, e.g. 
"input.buffer.max.bytes", which controls how many bytes in total are allowed 
to be buffered. This is doable since we buffer the raw records internally. A 
comparison sketch follows below.
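
A sketch of how the two configs would compare (the proposed name is from this 
ticket and would require a KIP; the values are illustrative):

{code}
# today: per-partition bound on the record count; total memory depends on the
# number of assigned partitions and on record sizes
buffered.records.per.partition=1000
# proposed: one global bound on the total buffered bytes
input.buffer.max.bytes=536870912
{code}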



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9858) CVE-2016-3189 Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 allows remote attackers to cause a denial of service (crash) via a crafted bzip2 file, related

2021-07-29 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9858.
--
Fix Version/s: 3.0.0
   Resolution: Fixed

> CVE-2016-3189  Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 
> allows remote attackers to cause a denial of service (crash) via a crafted 
> bzip2 file, related to block ends set to before the start of the block.
> -
>
> Key: KAFKA-9858
> URL: https://issues.apache.org/jira/browse/KAFKA-9858
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.2.2, 2.3.1, 2.4.1
>Reporter: sihuanx
>Priority: Major
> Fix For: 3.0.0
>
>
> I'm not sure whether CVE-2016-3189 affects Kafka 2.4.1 or not. This 
> vulnerability is related to rocksdbjni-5.18.3.jar, which is compiled with 
> *bzip2*. 
> Is there any task or plan to fix it?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-23 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13008.
---
  Assignee: Guozhang Wang
Resolution: Fixed

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag. 
> It's a good improvement for timestamp sync. But I found it can cause the 
> stream to stop processing data for a long time while waiting for the 
> partition metadata.
>  
> I've been investigating this case for a while, and figured out the issue 
> will happen in the below situation (or similar situations):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch session 
> become inactive for (default) 2 mins to be evicted. Either case, the metadata 
> won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partition data status to do timestamp sync, so we'll keep waiting and not 
> processing any data. That's why this issue will happen.
>  
> *Proposed solution:*
>  # If we don't get the current lag for a partition, or the current lag > 0, 
> we start to wait for max.task.idle.ms, and reset the deadline when we get the 
> partition lag, like what we did in previous KIP-353
>  # Introduce a waiting time config when no partition lag, or partition lag 
> keeps > 0 (need KIP)
> [~vvcephei] [~guozhang] , any suggestions?
>  
> cc [~ableegoldman]  [~mjsax] , this is the root cause that in 
> [https://github.com/apache/kafka/pull/10736,] we discussed and thought 
> there's a data lose situation. FYI.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12957) Refactor Streams Logical Plan Generation

2021-06-16 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12957:
-

 Summary: Refactor Streams Logical Plan Generation
 Key: KAFKA-12957
 URL: https://issues.apache.org/jira/browse/KAFKA-12957
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


There is a general issue with Streams' logical plan -> physical plan 
generation, where the physical processor nodes are generated at the parsing 
phase rather than the logical plan compilation phase. The former stage is 
agnostic to any user configurations while only the latter stage has access to 
them, and hence we should not generate physical processor nodes during the 
parsing phase (i.e. any code related to StreamsBuilder), but defer them to the 
logical plan phase (i.e. XXNode.writeToTopology). Today this causes several 
issues: many physical processor instantiations require access to the configs, 
and hence we have to defer them to the `init` procedure of the node, which is 
scattered across many places from logical nodes to physical processors.

This would be a big refactoring of Streams' logical plan generation, but I 
think it would be worth getting this into a cleaner state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2021-06-11 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10585.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> Kafka Streams should clean up the state store directory from cleanup
> 
>
> Key: KAFKA-10585
> URL: https://issues.apache.org/jira/browse/KAFKA-10585
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Rohan Desai
>Assignee: Dongjin Lee
>Priority: Minor
>  Labels: newbie++
> Fix For: 3.0.0
>
>
> Currently, `KafkaStreams.cleanup` cleans up all the task-level directories 
> and the global directory. However it doesn't clean up the enclosing state 
> store directory, though streams does create this directory when it 
> initializes the state for the streams app. Feels like it should remove this 
> directory when it cleans up.
> We notice this in ksql quite often, since every new query is a new streams 
> app. Over time, we see lots of state store directories left around for old 
> queries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12920) Consumer's cooperative sticky assignor need to clear generation / assignment data upon `onPartitionsLost`

2021-06-08 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12920:
-

 Summary: Consumer's cooperative sticky assignor need to clear 
generation / assignment data upon `onPartitionsLost`
 Key: KAFKA-12920
 URL: https://issues.apache.org/jira/browse/KAFKA-12920
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


Consumer's cooperative-sticky assignor does not track the owned partitions 
inside the assignor --- i.e. when it resets its state in the event of 
``onPartitionsLost``, the ``memberAssignment`` and ``generation`` inside the 
assignor are not cleared. This causes a member to join with an empty generation 
on the protocol while its user-data still encodes the old assignment (and hence 
passes the validation check on the broker side during JoinGroup), and 
eventually causes a single partition to be assigned to multiple consumers 
within a generation.

We should let the assignor also clear its assignment/generation when 
``onPartitionsLost`` is triggered in order to avoid this scenario, as in the 
sketch below.
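A hypothetical sketch of the proposed fix (field and method names are assumed, 
not the actual CooperativeStickyAssignor internals): reset the remembered 
assignment and generation so that the next JoinGroup carries empty user-data as 
well.

{code:java}
import java.util.Collections;
import java.util.List;

// Sketch only: the real state lives inside the assignor implementation.
class AssignorStateSketch {
    static final int DEFAULT_GENERATION = -1;

    private List<String> memberAssignment = Collections.emptyList();
    private int generation = DEFAULT_GENERATION;

    void onPartitionsLost() {
        // Without this reset, the old assignment would still be encoded in
        // the user-data while the member rejoins with an empty generation.
        memberAssignment = Collections.emptyList();
        generation = DEFAULT_GENERATION;
    }
}
{code}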

Note that 1) for the regular sticky assignor the generation would still have an 
older value, and this would cause the previously owned partitions to be 
discarded during the assignment, and 2) for Streams' sticky assignor, its 
encoding would indeed be cleared along with ``onPartitionsLost``. Hence only 
Consumer's cooperative-sticky assignor has this issue to solve.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12918) Watch-Free Animated 'America: The Motion Picture' Trailer 2021 Full Hd Download

2021-06-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12918.
---

> Watch-Free Animated 'America: The Motion Picture' Trailer 2021 Full Hd  
> Download
> 
>
> Key: KAFKA-12918
> URL: https://issues.apache.org/jira/browse/KAFKA-12918
> Project: Kafka
>  Issue Type: Bug
>Reporter: mushfiqur rahoman
>Priority: Major

[jira] [Resolved] (KAFKA-10614) Group coordinator onElection/onResignation should guard against leader epoch

2021-06-07 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10614.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> Group coordinator onElection/onResignation should guard against leader epoch
> 
>
> Key: KAFKA-10614
> URL: https://issues.apache.org/jira/browse/KAFKA-10614
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Guozhang Wang
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 3.0.0
>
>
> When there is a sequence of LeaderAndISR or StopReplica requests sent from 
> different controllers, causing the group coordinator to be elected / resign, 
> we may re-order the events due to a race condition. For example:
> 1) A first LeaderAndISR request is received from the old controller to resign 
> as the group coordinator.
> 2) A second LeaderAndISR request is received from the new controller to elect 
> as the group coordinator.
> 3) Although the threads handling the 1/2) requests are synchronized on the 
> replica manager, their callback {{onLeadershipChange}} triggers 
> {{onElection/onResignation}}, which schedules the loading / unloading on 
> background threads that are not synchronized.
> 4) As a result, {{onElection}} may be executed first and {{onResignation}} 
> second, after which the coordinator would not recognize itself as the 
> coordinator and hence would respond to any coordinator request with 
> {{NOT_COORDINATOR}}.
> Here are two proposals off the top of my head:
> 1) Let the scheduled load / unload function keep the passed-in leader epoch, 
> and also materialize the epoch in memory. Then, when executing the unloading, 
> check against the leader epoch.
> 2) This may be a bit simpler: use a single background thread working on a 
> FIFO queue of loading / unloading jobs. Since the callers are synchronized on 
> the replica manager and their order is preserved, the enqueued loading / 
> unloading jobs would be correctly ordered as well, and we would avoid the 
> reordering (see the sketch below).
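A minimal sketch of proposal 2), using plain java.util.concurrent rather than 
the actual GroupCoordinator code (the load/unload method names are assumed): a 
single-threaded executor is itself a FIFO queue, so jobs submitted in order 
while holding the replica manager lock also execute in that order.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CoordinatorLoaderSketch {
    // One thread == one FIFO queue: submission order is execution order.
    private final ExecutorService loaderThread = Executors.newSingleThreadExecutor();

    void onElection(final int partition, final int leaderEpoch) {
        loaderThread.submit(() -> loadGroupsForPartition(partition, leaderEpoch));
    }

    void onResignation(final int partition, final int leaderEpoch) {
        loaderThread.submit(() -> unloadGroupsForPartition(partition, leaderEpoch));
    }

    // Hypothetical placeholders for the actual load / unload logic.
    private void loadGroupsForPartition(final int partition, final int epoch) { }

    private void unloadGroupsForPartition(final int partition, final int epoch) { }
}
{code}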



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE

2021-06-03 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12887:
-

 Summary: Do not trigger user-customized ExceptionalHandler for RTE
 Key: KAFKA-12887
 URL: https://issues.apache.org/jira/browse/KAFKA-12887
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today in StreamThread we have a try-catch block that captures all {{Throwable 
e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. 
However, some RTEs, such as IllegalState/IllegalArgument exceptions, are 
usually caused by bugs. In such cases we should not let users decide what to do 
with these exceptions, but should let Streams itself enforce the decision; e.g. 
for IllegalState/IllegalArgument we should fail fast to surface the potential 
bug. A minimal sketch follows below.
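A minimal sketch of the proposed behavior; the handler wiring here is an 
assumption for illustration, not the actual StreamThread code:

{code:java}
import java.util.function.Consumer;

class UncaughtExceptionDispatchSketch {
    private final Consumer<Throwable> streamsUncaughtExceptionHandler;

    UncaughtExceptionDispatchSketch(final Consumer<Throwable> handler) {
        this.streamsUncaughtExceptionHandler = handler;
    }

    void runLoop(final Runnable loop) {
        try {
            loop.run();
        } catch (final IllegalStateException | IllegalArgumentException fatal) {
            // Likely a bug: fail fast instead of consulting the user handler.
            throw fatal;
        } catch (final Throwable e) {
            // All other throwables still go to the user-customized handler.
            streamsUncaughtExceptionHandler.accept(e);
        }
    }
}
{code}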



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12812) Consider refactoring state store registration path

2021-05-18 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12812:
-

 Summary: Consider refactoring state store registration path
 Key: KAFKA-12812
 URL: https://issues.apache.org/jira/browse/KAFKA-12812
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Guozhang Wang


Today our state store registration call path within the stateManager (both 
local and global) is like this: 

{code}
for each store: store.init(store, context)
   -> context.register(root, callback)
   -> stateManager.registerStore(store, callback)
{code}

One can see that we have an awkward loop from stateManager back to 
stateManager, and we rely on users not forgetting to call context.register(root, 
callback). We do this only to let users pass their customized callback 
implementation to the stateManager.

What about a different path like the following (see the sketch after this list):

1) We add a new interface in StateStore, like `StateRestoreCallback 
getCallback()`, that each impl class needs to provide.
2) We remove the `context.register(root, callback)` call; and because of that, 
we no longer need to pass `root` into store.init either.
3) stateManager just calls `store.init(context)` (without the first parameter), 
and then puts the store along with its restore callback into the map, without 
the separate `registerStore` function.
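A hypothetical sketch of the proposed shape (interface and method names are 
assumed and suffixed with "Sketch" to avoid implying the real API):

{code:java}
interface StateRestoreCallbackSketch {
    void restore(byte[] key, byte[] value);
}

interface StateStoreSketch {
    String name();

    // 1) Each store implementation provides its own restore callback.
    StateRestoreCallbackSketch getCallback();

    // 2/3) No `root` parameter, and no context.register(...) call needed.
    void init(ProcessorContextSketch context);
}

interface ProcessorContextSketch { /* config access, task id, etc. */ }

class StateManagerSketch {
    private final java.util.Map<String, StateRestoreCallbackSketch> restoreCallbacks =
        new java.util.HashMap<>();

    // The state manager drives registration itself: no loop from the store
    // back into the state manager, and nothing for users to forget.
    void registerAndInit(final StateStoreSketch store, final ProcessorContextSketch context) {
        store.init(context);
        restoreCallbacks.put(store.name(), store.getCallback());
    }
}
{code}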



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12683) Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"

2021-05-01 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12683.
---
Fix Version/s: 3.0.0
 Assignee: Guozhang Wang
   Resolution: Fixed

> Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"
> -
>
> Key: KAFKA-12683
> URL: https://issues.apache.org/jira/browse/KAFKA-12683
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12693) Consecutive rebalances with zombie instances may cause corrupted changelogs

2021-04-19 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12693:
-

 Summary: Consecutive rebalances with zombie instances may cause 
corrupted changelogs
 Key: KAFKA-12693
 URL: https://issues.apache.org/jira/browse/KAFKA-12693
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


When an instance (or a thread within an instance) of Kafka Streams has a soft 
failure and the group coordinator triggers a rebalance, that instance would 
temporarily become a "zombie writer". That is, the instance does not know that 
there's already a new rebalance and hence that its partitions have been 
migrated out, until it tries to commit, gets notified of the illegal-generation 
error, and realizes it is already the "zombie". During this period, until the 
commit, the zombie may still be writing data to the changelogs of the migrated 
tasks while the new owner has already taken over and is also writing to the 
changelogs.

When EOS is enabled, this would not be a problem: when the zombie tries to 
commit and gets notified that it's fenced, its zombie appends would be aborted. 
With EOS disabled, though, such shared writes would be interleaved on the 
changelogs, where a zombie append may arrive after the new writer's append, 
effectively overwriting that new append.

Note that such interleaved writes do not necessarily cause corrupted data: as 
long as the new producer keeps appending after the old zombie stops, and all 
the corrupted keys are overwritten again by the new values, then it is fine. 
However, if there are consecutive rebalances where, right after the changelogs 
are corrupted by zombie writers and before the new writer can overwrite them 
again, the task gets migrated once more and needs to be restored from the 
changelogs, the old values would be restored instead of the new values, 
effectively causing data loss.

Although this should be a rare event, we should still fix it asap. One idea is 
to have producers get a PID even under ALOS: that is, we set the transactional 
id in the producer config but do not trigger any txn APIs; when there are 
zombie producers, they would then be immediately fenced on appends, and hence 
there would be no interleaved appends (see the sketch below). I think this may 
still require a KIP, since today one has to call initTransactions() in order to 
register and get the PID.
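A sketch of the idea only -- this does NOT work today and would need a KIP, 
since setting transactional.id currently obliges the application to call 
initTransactions() and wrap sends in begin/commit:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

class FencedAlosProducerSketch {
    static KafkaProducer<byte[], byte[]> create(final String taskId) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Hypothetical: the proposal is to obtain the PID (and hence zombie
        // fencing on append) from this config alone, without any txn API calls.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "app-" + taskId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(props);
    }
}
{code}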



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12683) Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"

2021-04-18 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12683:
-

 Summary: Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"
 Key: KAFKA-12683
 URL: https://issues.apache.org/jira/browse/KAFKA-12683
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12633) Remove deprecated "TopologyTestDriver#pipeInput / readOutput"

2021-04-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12633.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> Remove deprecated "TopologyTestDriver#pipeInput / readOutput"
> -
>
> Key: KAFKA-12633
> URL: https://issues.apache.org/jira/browse/KAFKA-12633
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12669) Add deleteRange to WindowStore / KeyValueStore interfaces

2021-04-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12669:
-

 Summary: Add deleteRange to WindowStore / KeyValueStore interfaces
 Key: KAFKA-12669
 URL: https://issues.apache.org/jira/browse/KAFKA-12669
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Guozhang Wang


We can consider adding such APIs for cases where the underlying implementation 
classes can do better than deleting the keys one by one via get-and-delete (the 
current workaround is sketched below).
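For reference, the get-and-delete workaround that a native deleteRange would 
replace, written against the existing KeyValueStore API (the wrapper class and 
method name are just for illustration):

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

class RangeDeletionSketch {
    static void deleteRange(final KeyValueStore<Bytes, byte[]> store,
                            final Bytes from, final Bytes to) {
        final List<Bytes> keys = new ArrayList<>();
        try (KeyValueIterator<Bytes, byte[]> iter = store.range(from, to)) {
            while (iter.hasNext()) {
                keys.add(iter.next().key);  // collect first, delete after,
            }                               // to avoid mutating mid-iteration
        }
        // Every key is a separate store operation -- this is the cost that a
        // store-level deleteRange could optimize away.
        for (final Bytes key : keys) {
            store.delete(key);
        }
    }
}
{code}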



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12643) Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in transform/process (this.context.schedule function)

2021-04-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12643.
---
Resolution: Duplicate

Thanks for confirming!

> Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in 
> transform/process (this.context.schedule function)
> -
>
> Key: KAFKA-12643
> URL: https://issues.apache.org/jira/browse/KAFKA-12643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: David EVANO
>Priority: Major
> Attachments: Capture d’écran 2021-04-09 à 17.50.05.png
>
>
> During a transform() or a process() method:
> Define a scheduled task:
> this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, 
> timestamp -> \{...}
> store.put(...) or context.forward(...) produces a record with an invalid 
> timestamp.
> For the forward, a workaround is to define the timestamp explicitly:
> context.forward(entry.key, entry.value.toString(), 
> To.all().withTimestamp(timestamp));
> But for the state.put(...) or state.delete(...) functions there is no 
> workaround.
> Is it mandatory to have the Kafka broker version aligned with the Kafka 
> Streams version?
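For reference, a minimal sketch of the reporter's forward workaround on the 
old (2.7-era) Processor API; the processor class and the forwarded key/value 
are placeholders:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.To;

class PunctuateForwardProcessor extends AbstractProcessor<String, String> {
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
            timestamp ->
                // Without To.all().withTimestamp(...), the forwarded record
                // could carry an invalid (e.g. zero) timestamp per the report.
                context().forward("key", "value", To.all().withTimestamp(timestamp)));
    }

    @Override
    public void process(final String key, final String value) { }
}
{code}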



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12633) Remove deprecated "TopologyTestDriver#pipeInput / readOutput"

2021-04-08 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12633:
-

 Summary: Remove deprecated "TopologyTestDriver#pipeInput / 
readOutput"
 Key: KAFKA-12633
 URL: https://issues.apache.org/jira/browse/KAFKA-12633
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12630) Remove deprecated KafkaClientSupplier#getAdminClient

2021-04-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12630.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> Remove deprecated KafkaClientSupplier#getAdminClient
> 
>
> Key: KAFKA-12630
> URL: https://issues.apache.org/jira/browse/KAFKA-12630
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12568) Remove deprecated "KStream#groupBy/join", "Joined#named" overloads

2021-04-07 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12568.
---
Fix Version/s: 3.0.0
 Assignee: Guozhang Wang
   Resolution: Fixed

> Remove deprecated "KStream#groupBy/join", "Joined#named" overloads
> --
>
> Key: KAFKA-12568
> URL: https://issues.apache.org/jira/browse/KAFKA-12568
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12630) Remove deprecated KafkaClientSupplier#getAdminClient

2021-04-07 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12630:
-

 Summary: Remove deprecated KafkaClientSupplier#getAdminClient
 Key: KAFKA-12630
 URL: https://issues.apache.org/jira/browse/KAFKA-12630
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7785) Remove PartitionGrouper interface and it's config and move DefaultPartitionGrouper to internal package

2021-04-07 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7785.
--
Resolution: Fixed

> Remove PartitionGrouper interface and it's config and move 
> DefaultPartitionGrouper to internal package
> --
>
> Key: KAFKA-7785
> URL: https://issues.apache.org/jira/browse/KAFKA-7785
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jacek Laskowski
>Assignee: highluck
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Since {{DefaultPartitionGrouper}} is only for the purpose of the internal 
> {{StreamsPartitionAssignor}} it would make sense to have it in the 
> {{org.apache.kafka.streams.processor.internals}} package.
> I would also vote to move {{PartitionGrouper.}}
> Via KAFKA-8927 we deprecated the `PartitionGrouper` interface in 2.4 release 
> – this allows us to remove the public interface and its corresponding config 
> in the next major release (ie, 3.0.0). `DefaultPartitionGrouper` was 
> implicitly deprecated via KAFKA-8927.
> Hence, we can move the interface as well as the default implementation into 
> an internal package (or maybe just remove the interface completely as there 
> are no plans to support multiple implementations atm).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9765) Could not add partitions to transaction due to errors

2021-04-02 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9765.
--
Resolution: Duplicate

> Could not add partitions to transaction due to errors
> -
>
> Key: KAFKA-9765
> URL: https://issues.apache.org/jira/browse/KAFKA-9765
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.3.1
>Reporter: Prashant Waykar
>Priority: Blocker
> Fix For: 2.4.2, 2.5.0
>
>
> I am following the producer with transactions example in 
> [https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html,]
>  and on KafkaException, I use abortTransaction and retry. 
> I am seeing these exceptions. Has anyone experienced this before? Please 
> suggest.
> {code:java}
> // code placeholder
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.KafkaException: Could not add partitions to 
> transaction due to errors: {nfvAlarmJob-0=UNKNOWN_SERVER_ERROR}
> at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
> at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
> at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> at 
> com.vmware.vchs.hybridity.messaging.kafka.KafkaProducerDelegate.publishMessageWithTransaction(KafkaProducerDelegate.java:197)
> at 
> com.vmware.vchs.hybridity.messaging.kafka.KafkaProducerDelegate.publish(KafkaProducerDelegate.java:164)
> at 
> com.vmware.vchs.hybridity.messaging.kafka.KafkaProducerDelegate.publish(KafkaProducerDelegate.java:158)
> at 
> com.vmware.vchs.hybridity.messaging.adapter.JobManagerJobPublisher.publish(JobManagerJobPublisher.java:140)
> at 
> com.vmware.vchs.hybridity.messaging.adapter.JobManager.queueJob(JobManager.java:1720)
> at 
> com.vmware.vchs.hybridity.messaging.adapter.JobManagementAdapter.queueJob(JobManagementAdapter.java:80)
> at 
> com.vmware.vchs.hybridity.messaging.adapter.JobManagementAdapter.queueJob(JobManagementAdapter.java:70)
> at 
> com.vmware.vchs.hybridity.messaging.adapter.JobManagedService.queueJob(JobManagedService.java:168)
> at 
> com.vmware.hybridity.nfvm.alarm.UpdateVcenterAlarmsJob.run(UpdateVcenterAlarmsJob.java:67)
> at 
> com.vmware.vchs.hybridity.messaging.LoggingJobWrapper.run(LoggingJobWrapper.java:41)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Could not add partitions 
> to transaction due to errors: {nfvAlarmJob-0=UNKNOWN_SERVER_ERROR}
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1230)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
> ... 1 common frames omitted
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12428) Add a last-heartbeat-seconds-ago metric to Kafka Consumer

2021-03-29 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12428.
---
Resolution: Duplicate

> Add a last-heartbeat-seconds-ago metric to Kafka Consumer
> -
>
> Key: KAFKA-12428
> URL: https://issues.apache.org/jira/browse/KAFKA-12428
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> I have encountered several issues in the past where heartbeat requests are 
> not sent [1,2] (either in time, or ever), and today it is a bit hard to get 
> to that from the logs. I think it is better to add a metric such as 
> "last-heartbeat-seconds-ago" so that when a rebalance is triggered we can 
> immediately find out whether this is the root cause.
> 1. https://issues.apache.org/jira/browse/KAFKA-10793
> 2. https://issues.apache.org/jira/browse/KAFKA-10827



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7106) Remove segment/segmentInterval from Window definition

2021-03-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7106.
--
Resolution: Fixed

> Remove segment/segmentInterval from Window definition
> -
>
> Key: KAFKA-7106
> URL: https://issues.apache.org/jira/browse/KAFKA-7106
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Guozhang Wang
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Currently, Window configures segment and segmentInterval properties, but 
> these aren't truly properties of a window in general.
> Rather, they are properties of the particular implementation that we 
> currently have: a segmented store. Therefore, these properties should be 
> moved to configure only that implementation.
>  
> This may be related to KAFKA-4730, since an in-memory window store wouldn't 
> necessarily need to be segmented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12562) Remove deprecated-overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store"

2021-03-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12562.
---
Resolution: Fixed

> Remove deprecated-overloaded "KafkaStreams#metadataForKey" and 
> "KafkaStreams#store"
> ---
>
> Key: KAFKA-12562
> URL: https://issues.apache.org/jira/browse/KAFKA-12562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12524) Remove deprecated WindowBytesStoreSupplier#segments

2021-03-27 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12524.
---
Resolution: Fixed

> Remove deprecated WindowBytesStoreSupplier#segments
> ---
>
> Key: KAFKA-12524
> URL: https://issues.apache.org/jira/browse/KAFKA-12524
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12568) Remove deprecated "KStream#groupBy/join", "Joined#named" overloads

2021-03-27 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12568:
-

 Summary: Remove deprecated "KStream#groupBy/join", "Joined#named" 
overloads
 Key: KAFKA-12568
 URL: https://issues.apache.org/jira/browse/KAFKA-12568
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12562) Remove deprecated-overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store"

2021-03-25 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12562:
-

 Summary: Remove deprecated-overloaded 
"KafkaStreams#metadataForKey" and "KafkaStreams#store"
 Key: KAFKA-12562
 URL: https://issues.apache.org/jira/browse/KAFKA-12562
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12527) Remove deprecated "PartitionGrouper" interface

2021-03-25 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12527.
---
Resolution: Duplicate

> Remove deprecated "PartitionGrouper" interface
> --
>
> Key: KAFKA-12527
> URL: https://issues.apache.org/jira/browse/KAFKA-12527
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12526) Remove deprecated long ms overloads

2021-03-25 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12526.
---
Resolution: Duplicate

> Remove deprecated long ms overloads
> ---
>
> Key: KAFKA-12526
> URL: https://issues.apache.org/jira/browse/KAFKA-12526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12549) Allow state stores to opt-in transactional support

2021-03-24 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12549:
-

 Summary: Allow state stores to opt-in transactional support
 Key: KAFKA-12549
 URL: https://issues.apache.org/jira/browse/KAFKA-12549
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Guozhang Wang


Right now Kafka Streams' EOS implementation does not make any assumptions about 
the state store's transactional support. Allowing state stores to optionally 
provide transactional support can have multiple benefits. E.g., if we add APIs 
like {{beginTxn}}, {{commitTxn}} and {{abortTxn}} to the {{StateStore}} 
interface, then these APIs can be used under both ALOS and EOS such that:

* store.beginTxn
* store.put // during processing
* streams commit // either through the eos protocol or not
* store.commitTxn

We would then have the following benefits:
* Fewer duplicated records upon crashes for ALOS (note this is still not EOS, 
but a middle ground where uncommitted data within a state store would not be 
retained if store.commitTxn failed).
* No need to wipe the state store and re-bootstrap from scratch upon crashes 
for EOS. E.g., if a crash failure happened between the streams commit 
completing and store.commitTxn, we could instead just roll the transaction 
forward by replaying the changelog from the second most recent streams 
committed offset up to the most recent committed offset.
* Remote stores that support txns then do not need to support wiping 
(https://issues.apache.org/jira/browse/KAFKA-12475).
* We can fix the known issues of emit-on-change 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
* We can support "query committed data only" for interactive queries (see below 
for reasons).

As for the implementation of these APIs (sketched after this list), there are 
several options:
* The state store itself has natural transaction features (e.g. RocksDB).
* Use an in-memory buffer for all puts within a transaction, and upon 
`commitTxn` write the whole buffer as a batch to the underlying state store, or 
just drop the whole buffer upon aborting. Then for interactive queries, one can 
optionally query the underlying store for committed data only.
* Use a separate store as a transient persistent buffer. Upon `beginTxn` create 
a new empty transient store, and upon `commitTxn` merge that store into the 
underlying store. The same applies for interactive queries on committed-only 
data.
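A hypothetical interface sketch using the method names from this ticket; this 
is not an existing Streams interface:

{code:java}
// Hypothetical opt-in extension; stores that do not implement it would keep
// today's behavior.
interface TransactionalStateStoreSketch {
    void beginTxn();

    void put(byte[] key, byte[] value); // during processing

    // Make all puts since beginTxn durable; under EOS this happens after the
    // streams-side commit, under ALOS after the plain offset commit.
    void commitTxn();

    // Drop all uncommitted puts since beginTxn.
    void abortTxn();
}
{code}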



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12527) Remove deprecated "PartitionAssignor" interface

2021-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12527:
-

 Summary: Remove deprecated "PartitionAssignor" interface
 Key: KAFKA-12527
 URL: https://issues.apache.org/jira/browse/KAFKA-12527
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12526) Remove deprecated long ms overloads

2021-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12526:
-

 Summary: Remove deprecated long ms overloads
 Key: KAFKA-12526
 URL: https://issues.apache.org/jira/browse/KAFKA-12526
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12524) Remove deprecated WindowBytesStoreSupplier#segments

2021-03-22 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12524:
-

 Summary: Remove deprecated WindowBytesStoreSupplier#segments
 Key: KAFKA-12524
 URL: https://issues.apache.org/jira/browse/KAFKA-12524
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12514) NPE in SubscriptionState

2021-03-22 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12514.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> NPE in SubscriptionState
> 
>
> Key: KAFKA-12514
> URL: https://issues.apache.org/jira/browse/KAFKA-12514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a soak test, we got this exception:
>  
> {code:java}
> java.lang.NullPointerExceptionat 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.partitionLag(SubscriptionState.java:545)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.currentLag(KafkaConsumer.java:2241)
>   at 
> org.apache.kafka.streams.processor.internals.PartitionGroup.readyToProcess(PartitionGroup.java:143)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.isProcessable(StreamTask.java:650)
>at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:661)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1114)
>  {code}
> This is related to the implementation of:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization]
> aka
> https://issues.apache.org/jira/browse/KAFKA-10091
>  
> Luckily, the stack trace is pretty unambiguous. I'll open a PR shortly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12472:
-

 Summary: Add a Consumer / Streams metric to indicate the current 
rebalance status
 Key: KAFKA-12472
 URL: https://issues.apache.org/jira/browse/KAFKA-12472
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, streams
Reporter: Guozhang Wang


Today, to troubleshoot a rebalance issue, operators need to take many manual 
steps: locating the problematic members, searching through log entries, and 
looking for related metrics. It would be great to add a single metric that 
covers all these manual steps, so that operators only need to check one signal 
to determine the root cause. A concrete idea is to expose two enum gauge 
metrics, on the consumer and on streams respectively:

* Consumer level (the order below is by design, see the Streams level for 
details):
  0. None => there is no rebalance ongoing.
  1. CoordinatorRequested => any coordinator response contains a 
RebalanceInProgress error code.
  2. NewMember => the join group response has a MemberIdRequired error code.
  3. UnknownMember => any coordinator response contains an UnknownMember error 
code, indicating this member has already been kicked out of the group.
  4. StaleMember => any coordinator response contains an IllegalGeneration 
error code.
  5. DroppedGroup => the heartbeat thread decides to call leaveGroup because 
the heartbeat expired.
  6. UserRequested => leaveGroup is sent upon shutdown / the unsubscribeAll 
API, as well as upon calling the enforceRebalance API.
  7. MetadataChanged => requestRejoin triggered since the metadata has changed.
  8. SubscriptionChanged => requestRejoin triggered since the subscription has 
changed.
  9. RetryOnError => the join/syncGroup response contains a retriable error 
which causes the consumer to back off and retry.
 10. RevocationNeeded => requestRejoin triggered since the set of revoked 
partitions is not empty.

The transition rule is that a non-zero status code can only transition to zero 
or to a higher code, but not to a lower code (same for streams, see the 
rationales below).

* Streams level: today a streams client can have multiple consumers, so we 
introduce some new enum states as well as aggregation rules across consumers: 
if there are no streams-layer events (as listed below) that transit its status 
(i.e. the streams layer considers itself at 0), then we aggregate across all 
the embedded consumers and take the largest status code value as the streams 
metric; if there are streams-layer events that determine its status to be 10+, 
then that overrides all embedded consumer-layer status codes. In addition, when 
creating an aggregated metric across streams instances within an app, we follow 
the same aggregation rule, e.g. if there are two streams instances where one 
instance's status code is 1) and the other's is 10), then the app's status is 
10) (see the sketch below).

 10. RevocationNeeded => the definition is extended: either the original 10) 
defined for the consumer above, OR the leader decides to revoke active/standby 
tasks and hence schedules follow-ups.
 11. AssignmentProbing => the leader decides to schedule follow-ups since the 
current assignment is unstable.
 12. VersionProbing => the leader decides to schedule follow-ups due to version 
probing.
 13. EndpointUpdate => anyone decides to schedule follow-ups due to endpoint 
updates.
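A minimal sketch of the proposed codes and the max-code aggregation rule; the 
enum and method names are assumed, not an existing Kafka metric type:

{code:java}
import java.util.Collection;

enum RebalanceStatusSketch {
    NONE(0), COORDINATOR_REQUESTED(1), NEW_MEMBER(2), UNKNOWN_MEMBER(3),
    STALE_MEMBER(4), DROPPED_GROUP(5), USER_REQUESTED(6), METADATA_CHANGED(7),
    SUBSCRIPTION_CHANGED(8), RETRY_ON_ERROR(9), REVOCATION_NEEDED(10),
    ASSIGNMENT_PROBING(11), VERSION_PROBING(12), ENDPOINT_UPDATE(13);

    final int code;

    RebalanceStatusSketch(final int code) { this.code = code; }

    // Aggregation across embedded consumers (and across instances of an app):
    // the largest code wins, so rarer / higher-precedence events are not
    // masked by CoordinatorRequested reported by the other members.
    static RebalanceStatusSketch aggregate(final Collection<RebalanceStatusSketch> statuses) {
        RebalanceStatusSketch max = NONE;
        for (final RebalanceStatusSketch s : statuses) {
            if (s.code > max.code) {
                max = s;
            }
        }
        return max;
    }
}
{code}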


The main motivations for the proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members only know 
about it via CoordinatorRequested from the coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. The revocation definition is extended in Streams, and hence it needs to take 
the highest precedence among all consumer-only statuses so that it is not 
overridden by any of them.
4. In general, rarer events get higher precedence.

Any comments on the precedence rules / categorization are more than welcome!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12428) Add a last-heartbeat-seconds-ago metric to Kafka Consumer

2021-03-04 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12428:
-

 Summary: Add a last-heartbeat-seconds-ago metric to Kafka Consumer
 Key: KAFKA-12428
 URL: https://issues.apache.org/jira/browse/KAFKA-12428
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


I have encountered several issues in the past where heartbeat requests are not 
sent [1,2] (either in time, or ever), and today it is a bit hard to get to that 
from the logs. I think it is better to add a metric such as 
"last-heartbeat-seconds-ago" so that when a rebalance is triggered we can 
immediately find out whether this is the root cause.

1. https://issues.apache.org/jira/browse/KAFKA-10793
2. https://issues.apache.org/jira/browse/KAFKA-10827



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0

2021-03-04 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12419:
-

 Summary: Remove Deprecated APIs of Kafka Streams in 3.0
 Key: KAFKA-12419
 URL: https://issues.apache.org/jira/browse/KAFKA-12419
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
 Fix For: 3.0.0


Here's a list of deprecated APIs that we have accumulated in the past; we can 
consider removing them in 3.0:

* KIP-198: "--zookeeper" flag from StreamsResetter (1.0)
* KIP-171: "–execute" flag from StreamsResetter (1.1)
* KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1)
* KIP-251: overloaded "ProcessorContext#forward" (2.0)
* KIP-276: "StreamsConfig#getConsumerConfig" (2.0)
* KIP-319: "WindowBytesStoreSupplier#segments" (2.1)
* KIP-321: "TopologyDescription.Source#topics" (2.1)
* KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1)
* KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1)
* KIP-365/366: Implicit Scala Apis (2.1)
* KIP-372: overloaded "KStream#groupBy" (2.1)
* KIP-307: "Joined#named" (2.3)
* KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3)
* KIP-429: "PartitionAssignor" interface (2.4)
* KIP-470: "TopologyTestDriver#pipeInput" (2.4)
* KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4)
* KIP-479: overloaded "KStream#join" (2.4)
* KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5)
* KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and 
"KafkaStreams#store" (2.5)

And here's a list of already filed JIRAs for removing deprecated APIs
* KAFKA-10434
* KAFKA-7785



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12323) Record timestamps not populated in event

2021-02-23 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12323.
---
Resolution: Fixed

> Record timestamps not populated in event
> 
>
> Key: KAFKA-12323
> URL: https://issues.apache.org/jira/browse/KAFKA-12323
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Adam Bellemare
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1
>
> Attachments: PunctuateTimestampZeroTest.java
>
>
> Upgraded a kafka streams application from 2.6.0 to 2.7.0. Noticed that the 
> events being produced had a "CreatedAt" timestamp = 0, causing downstream 
> failures as we depend on those timestamps. Reverting back to 2.6.0/2.6.1 
> fixed this issue. This was the only change to the Kafka Streams application.
> Consuming the event stream produced by 2.6.0 results in events that, when 
> consumed using the `kafka-avro-console-consumer` and `--property 
> print.timestamp=true` result in events prepended with the event times, such 
> as:
> {code:java}
> CreateTime:1613072202271  
> CreateTime:1613072203412  
> CreateTime:1613072205431  
> {code}
> etc.
> However, when those events are produced by the Kafka Streams app using 2.7.0, 
> we get:
> {code:java}
> CreateTime:0  
> CreateTime:0  
> CreateTime:0   
> {code}
> I don't know if these is a default value somewhere that changed, but this is 
> actually a blocker for our use-cases as we now need to circumnavigate this 
> limitation (or roll back to 2.6.1, though there are other issues we must deal 
> with then). I am not sure which unit tests in the code base to look at to 
> validate this, but I wanted to log this bug now in case someone else has 
> already seen this or an open one exists (I didn't see one though).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12370) Refactor KafkaStreams exposed metadata hierarchy

2021-02-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12370:
-

 Summary: Refactor KafkaStreams exposed metadata hierarchy
 Key: KAFKA-12370
 URL: https://issues.apache.org/jira/browse/KAFKA-12370
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Currently in KafkaStreams we have three groups of metadata getters:

1.
{code}
allMetadata
allMetadataForStore
{code}

Return a collection of {{StreamsMetadata}}, which only contains the partitions 
as active/standby plus the hostInfo, without exposing any task info.

2.
{code}
queryMetadataForKey
{code}

Returns {{KeyQueryMetadata}} that includes the hostInfos of active and 
standbys, plus the partition id.

3.
{code}
localThreadsMetadata
{code}

Returns {{ThreadMetadata}}, that includes a collection of {{TaskMetadata}} for 
active and standby tasks.

All the above functions are used for interactive queries, but their exposed 
metadata are very different, and some use cases need all of the client, thread, 
and task metadata to fulfill the feature development. At the same time, we may 
have a more dynamic "task -> thread" mapping in the future, and the embedded 
clients like consumers would then not be per thread, but per client.

---

Rethinking the metadata, I feel we can have a more consistent hierarchy, as 
follows:

* {{StreamsMetadata}} represent the metadata for the client, which includes the 
set of {{ThreadMetadata}} for its existing thread and the set of 
{{TaskMetadata}} for active and standby tasks assigned to this client, plus 
client metadata including hostInfo, embedded client ids.

* {{ThreadMetadata}} includes name, state, the set of {{TaskMetadata}} for 
currently assigned tasks.

* {{TaskMetadata}} includes the name (including the sub-topology id and the 
partition id), the state, the corresponding sub-topology description (including 
the state store names, source topic names).

* {{allMetadata}}, {{allMetadataForStore}}, {{allMetadataForKey}} (renamed from 
queryMetadataForKey) return the set of {{StreamsMetadata}}, and 
{{localMetadata}} (renamed from localThreadMetadata) returns a single 
{{StreamsMetadata}}.

To illustrate with an example: to find out the current active / standby hosts 
of a specific store, we would call {{allMetadataForStore}}, and for each 
returned {{StreamsMetadata}} we would loop over its contained {{TaskMetadata}} 
for active / standby tasks and filter by the store names contained in the 
corresponding sub-topology's description, as sketched below.
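A sketch of that lookup against the *proposed* (hypothetical) hierarchy; none 
of these accessors exist today under these names:

{code:java}
// Hypothetical: allMetadataForStore, activeTasks, subTopology,
// stateStoreNames and hostInfo are the proposed accessors, not today's API.
for (final StreamsMetadata client : streams.allMetadataForStore("my-store")) {
    for (final TaskMetadata task : client.activeTasks()) {
        if (task.subTopology().stateStoreNames().contains("my-store")) {
            System.out.println("active host: " + client.hostInfo());
        }
    }
}
{code}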



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12352) Improve debuggability with continuous consumer rebalances

2021-02-20 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12352:
-

 Summary: Improve debuggability with continuous consumer rebalances
 Key: KAFKA-12352
 URL: https://issues.apache.org/jira/browse/KAFKA-12352
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, streams
Reporter: Guozhang Wang
Assignee: Guozhang Wang


There are several scenarios where a consumer/streams client can fall into 
continuous rebalances and hence not make any progress. Today when this happens, 
developers usually need to do a lot of digging in order to get insights into 
what happened. Here's a short summary of the different scenarios where we 
(re-)trigger rebalances:

1. Group member kicked out of the group: when the coordinator kicks out the 
member, then later on, when the member issues a join / sync / heartbeat / 
offset-commit, it will fail and the member will try to re-join. When the member 
is constantly calling poll too late, it will continuously fall into this 
scenario and not make progress.

2. Group is rebalancing: if the group is rebalancing at the moment, the 
member's heartbeat / offset commit / sync-group will fail. In this case the 
member rejoining the group is not the root cause of the rebalancing anyway.

3. Caller enforces a rebalance via `enforceRebalance`. This is one-off and 
should not cause rebalance storms.

4. After a rebalance is completed, the member finds out that a) its 
subscription has changed, or b) its subscribed topics' number of partitions 
changed since the re-join request was sent. In this case it needs to re-trigger 
the rebalance in order to get the new assignment. Since a subscription change 
is one-off, it should not cause rebalance storms; topic metadata changes should 
also be infrequent, but there are some rare cases where topic metadata keeps 
"vibrating" due to broker-side issues.

5. After a rebalance is completed, the member needs to revoke some partitions 
as indicated by the assignment. After the revocation it needs to re-join the 
group. This may cause rebalance storms when the partition assignor is 
sub-optimal in determining the assignment, such that partitions keep migrating 
around and rebalances are triggered continuously.

As we can see, 1/5 above could potentially cause rebalance storms, while 2/3/4 
should not in normal cases. In all of these scenarios, we should expose exactly 
the reason why the member is re-joining the group, and whether this re-joining 
triggers the rebalance or whether the group is already in one (in which case 
the join-group itself is not causing it, but is a result of it). This would 
help operators quickly nail down which of the above may be the root cause of 
continuous rebalances.

I'd suggest we first go through the log4j hierarchy to make sure this is the 
right place, and maybe in the future we can expose a single state metric on top 
of the logging categorization for even more convenient troubleshooting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12242) Decouple state store materialization enforcement from name/serde provider

2021-01-26 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12242:
-

 Summary: Decouple state store materialization enforcement from 
name/serde provider
 Key: KAFKA-12242
 URL: https://issues.apache.org/jira/browse/KAFKA-12242
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Many users of Streams would want the following: let the Streams runtime decide 
whether or not to materialize a state store; AND, if it decides to do so, use 
the store name / serdes the user provided ahead of time; if not, then nothing 
happens (the provided store name and serdes can just be dropped).

However, Streams today takes `Materialized` as an indicator to enforce 
materialization (see the example below). We should think of a way for users to 
optionally decouple materialization enforcement from the name/serde provider.
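For illustration with today's (real) DSL: naming the store or supplying serdes 
requires passing Materialized, which also forces materialization -- that is the 
coupling this ticket wants to make optional.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class MaterializedCoupling {
    static KTable<String, Long> countWithName(final KStream<String, String> stream) {
        // Providing the name/serdes here also enforces materialization;
        // there is no way to say "use these only if you materialize anyway".
        return stream
            .groupByKey()
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
    }
}
{code}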



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12219) Potential race condition in InMemoryKeyValueStore

2021-01-19 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12219.
---
Fix Version/s: 2.8.0
   Resolution: Fixed

> Potential race condition in InMemoryKeyValueStore
> -
>
> Key: KAFKA-12219
> URL: https://issues.apache.org/jira/browse/KAFKA-12219
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Critical
> Fix For: 2.8.0
>
>
> With KAFKA-8802 (included in [2.4.0 
> release|https://downloads.apache.org/kafka/2.4.0/RELEASE_NOTES.html]), 
> {{ConcurrentSkipListMap}} in {{InMemoryKeyValueStore}} was reverted to 
> {{TreeMap}} for performance reasons. However, the {{synchronized}} keyword 
> for the {{reverseRange}} and {{reverseAll}} methods was omitted, leaving the 
> possibility of a race condition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10830) Kafka Producer API should throw unwrapped exceptions

2020-12-08 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10830:
-

 Summary: Kafka Producer API should throw unwrapped exceptions
 Key: KAFKA-10830
 URL: https://issues.apache.org/jira/browse/KAFKA-10830
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Guozhang Wang


Today, in various KafkaProducer APIs (especially send- and transaction-related 
ones), we wrap many of the underlying exceptions with a KafkaException. In some 
nested calls we may even wrap more than once. Although the initial goal is to 
not expose the root cause directly to users, it also brings confusion to 
advanced users' error handling, since a KafkaException-wrapped root cause may 
need to be handled differently.

Since all of those exceptions are public classes anyway (one can still get to 
them via the exception's cause) and they all inherit from KafkaException, I'd 
suggest we do not do any wrapping any more and throw the exceptions directly. 
For those users who just capture all KafkaException and handle them universally 
it is still compatible; for those users who want to handle exceptions 
differently it introduces an easier way (see the sketch below).
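A sketch of what unwrapped exceptions enable on the user side (the recovery 
actions in the catch blocks are illustrative assumptions): specific subclasses 
can be caught directly, while catch-all KafkaException handling keeps working 
because the concrete exceptions all extend KafkaException.

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

class TxnErrorHandlingSketch {
    static void commit(final KafkaProducer<byte[], byte[]> producer) {
        try {
            producer.commitTransaction();
        } catch (final ProducerFencedException fenced) {
            producer.close(); // e.g. this instance is a zombie; stop using it
        } catch (final KafkaException other) {
            producer.abortTransaction(); // universal handling still compiles
        }
    }
}
{code}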



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10829) Kafka Streams handle produce exception improvement

2020-12-08 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10829:
-

 Summary: Kafka Streams handle produce exception improvement
 Key: KAFKA-10829
 URL: https://issues.apache.org/jira/browse/KAFKA-10829
 Project: Kafka
  Issue Type: Improvement
  Components: producer , streams
Reporter: Guozhang Wang


A summary of some recent discussions on how we should improve embedded producer 
exception handling.

Note that the baseline logic below guarantees that our correctness semantics 
are not violated; the optimizations are built on top of the baseline to reduce 
the user's burden by letting the library auto-handle certain types of 
exceptions.

1) ``Producer.send()`` throws an exception directly: 

1.a) baseline (to ensure correctness): always wrap it as a StreamsException; 
this causes the thread to shut down and the exception handler to be triggered. 
The handler can look into the wrapped exception and decide whether the 
shut-down thread can be restarted.

1.b) optimization: look at the exception and decide whether it can be wrapped 
as a TaskMigratedException instead (e.g. ProducerFenced). This would then be 
auto-handled by losing all tasks and re-joining.

2) ``Producer.send()`` Callback has an exception:

2.a) baseline: first check whether the exception is an instanceof 
RetriableException.

If not retriable, pass it to the producer exception handler (the existing 
extension point; see the sketch after this list) to decide whether to throw or 
to continue with the record dropped. If the decision is to throw, always wrap 
it as a StreamsException and keep it locally; at the same time do not send more 
records from the caller. In the next send call, check the remembered exception 
and throw. This causes the thread to shut down and the exception handler to be 
triggered.

If the exception is retriable, always throw it as a fatal StreamsException.

2.b) optimization one: if the non-retriable exception can be translated into a 
TaskMigratedException, do not wrap it as a StreamsException, and let the 
library handle it internally.

2.c) optimization two: if the retriable exception is a timeout exception, do 
not pass it to the produce exception handler, and treat it as TaskMigrated.

3) ``Producer.XXXTxn`` APIs except ``AbortTxn`` throw exceptions directly:

3.a) baseline: capture all KafkaException except TimeoutException, and handle 
them as *TaskCorrupted* (which includes aborting the transaction, resetting the 
state, and re-joining the group). TimeoutException would be rethrown.

3.b) optimization: some exceptions can be handled as TaskMigrated, which is 
handled in a lighter way.

4) ``Producer.abortTxn`` throws an exception:

4.a) baseline: capture all KafkaException except TimeoutException as a fatal 
StreamsException. TimeoutException would be rethrown.

4.b) optimization: some exceptions can be ignored (e.g. invalidTxnTransition 
means the abort did not succeed).
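For reference, the producer exception handler mentioned in 2.a) is the existing 
ProductionExceptionHandler extension point; a minimal sketch that drops records 
on any handled send error:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class ContinueOnSendErrorHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // CONTINUE drops the record and keeps the thread alive; FAIL would
        // shut the thread down per the baseline described above.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}

Such a handler is wired in via the existing 
`default.production.exception.handler` Streams config.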



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10725) Merge StoreQueryIntegrationTest and QueryableStateIntegrationTest

2020-11-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10725:
-

 Summary: Merge StoreQueryIntegrationTest and 
QueryableStateIntegrationTest
 Key: KAFKA-10725
 URL: https://issues.apache.org/jira/browse/KAFKA-10725
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Guozhang Wang


These two integration tests cover different issues and have each had their own 
flakiness in the past. I think it's better to merge them into a single class 
(and some of the cases might better be reduced to unit tests) so that if 
there's still flakiness, we only need to fix it in one place.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

