[jira] [Resolved] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling
[ https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-7728. Resolution: Won't Fix This is no longer a critical fix for static membership, as a leader rejoin with no member info change won't trigger a rebalance in the original implementation.
> Add JoinReason to the join group request for better rebalance handling
> Key: KAFKA-7728
> URL: https://issues.apache.org/jira/browse/KAFKA-7728
> Project: Kafka
> Issue Type: Improvement
> Reporter: Boyang Chen
> Assignee: Mayuresh Gharat
> Priority: Major
> Labels: consumer, mirror-maker, needs-kip
>
> Recently [~mgharat] and I discussed the current rebalance logic for leader join group request handling. So far we blindly trigger a rebalance when the leader rejoins. The caveat is that KIP-345 does not cover this effort, and if a consumer group is not using sticky assignment but another strategy like round robin, the redundant rebalance could still shuffle topic partitions around consumers (for example, in a MirrorMaker application).
> I checked on the broker side, and here is what we currently do:
> {code:java}
> if (group.isLeader(memberId) || !member.matches(protocols))
>   // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>   // The latter allows the leader to trigger rebalances for changes affecting assignment
>   // which do not affect the member metadata (such as topic metadata changes for the consumer)
> {code}
> Based on the broker logic, we only need to trigger a rebalance on leader rejoin when a topic metadata change has happened. I also looked up the ConsumerCoordinator code on the client side, and found the metadata monitoring logic here:
> {code:java}
> public boolean rejoinNeededOrPending() {
>     ...
>     // we need to rejoin if we performed the assignment and metadata has changed
>     if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
>         return true;
> }
> {code}
> Instead of just returning true, we could introduce a new enum field called JoinReason to indicate the purpose of the rejoin; then we would not need a full rebalance when the leader is just in a rolling bounce. Concretely, add a JoinReason enum field to the join group request so that we know whether the leader is rejoining due to a topic metadata change: if yes, we trigger a rebalance; if not, we shouldn't.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
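The JoinReason proposal above could be sketched roughly as follows. All names here are hypothetical and illustrative only; they are not part of the actual JoinGroup protocol or the broker implementation:

```java
// Hypothetical sketch of the proposed JoinReason enum; names are illustrative,
// not part of the actual JoinGroup request schema.
public class JoinReasonSketch {

    public enum JoinReason {
        TOPIC_METADATA_CHANGED, // assignment-affecting change: rebalance is needed
        RESTART                 // e.g. a rolling bounce of the leader: rebalance can be skipped
    }

    // Broker-side decision sketch: only force a rebalance on a leader rejoin
    // when the reason indicates a topic metadata change.
    public static boolean shouldRebalanceOnLeaderRejoin(JoinReason reason) {
        return reason == JoinReason.TOPIC_METADATA_CHANGED;
    }
}
```

With such a field, a leader that rejoins purely because of a restart would no longer shuffle partitions for non-sticky assignors.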
[jira] [Created] (KAFKA-12499) Adjust transaction timeout according to commit interval on Streams EOS
Boyang Chen created KAFKA-12499: --- Summary: Adjust transaction timeout according to commit interval on Streams EOS Key: KAFKA-12499 URL: https://issues.apache.org/jira/browse/KAFKA-12499 Project: Kafka Issue Type: Improvement Components: streams Reporter: Boyang Chen Assignee: Boyang Chen Fix For: 3.0.0
The transaction timeout on the Producer defaults to 1 minute today, while the commit interval could be set to a very large value, which makes Streams always hit the transaction timeout and drop into rebalance. We should increase the transaction timeout correspondingly when the commit interval is large. On the other hand, the broker may enforce a limit on the maximum transaction timeout; if we scale the client transaction timeout beyond that limit, Streams will fail with INVALID_TRANSACTION_TIMEOUT. To alleviate this problem, users could define their own transaction timeout to avoid hitting the limit, so we should still respect what the user configures in the override. The new rule for configuring the transaction timeout should be:
1. If a transaction timeout is set in the Streams config, use it.
2. If not, transaction_timeout = max(default_transaction_timeout, 10 * commit_interval).
Additionally, if INVALID_TRANSACTION_TIMEOUT is thrown on Streams when calling initTransactions(), we should wrap the exception to inform the user that their commit interval setting could be too high and should be adjusted. -- This message was sent by Atlassian Jira (v8.3.4#803005)
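The two-step rule above could be sketched as follows. The class, method, and constant names are hypothetical, not the actual Streams implementation:

```java
// Hypothetical sketch of the proposed timeout-resolution rule from KAFKA-12499;
// names are illustrative, not the real Streams config code.
public class TxnTimeoutRule {
    // Producer transaction timeout defaults to 1 minute today.
    static final long DEFAULT_TRANSACTION_TIMEOUT_MS = 60_000L;

    // Rule: 1. if the user set a timeout in the Streams config, respect it;
    //       2. otherwise use max(default, 10 * commit interval).
    public static long resolveTransactionTimeoutMs(Long userOverrideMs, long commitIntervalMs) {
        if (userOverrideMs != null) {
            return userOverrideMs; // user override always wins
        }
        return Math.max(DEFAULT_TRANSACTION_TIMEOUT_MS, 10 * commitIntervalMs);
    }
}
```

For example, a 5-minute commit interval would scale the timeout to 50 minutes, while a small commit interval keeps the 1-minute default.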
[jira] [Resolved] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8
[ https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-12381. - Resolution: Fixed > Incompatible change in verifiable_producer.log in 2.8 > - > > Key: KAFKA-12381 > URL: https://issues.apache.org/jira/browse/KAFKA-12381 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Colin McCabe >Assignee: Boyang Chen >Priority: Blocker > Labels: kip-500 > > In test_verifiable_producer.py , we used to see this error message in > verifiable_producer.log when a topic couldn't be created: > WARN [Producer clientId=producer-1] Error while fetching metadata with > correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} > (org.apache.kafka.clients.NetworkClient) > The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it > used to pass. > Now we are instead seeing this in the log file: > WARN [Producer clientId=producer-1] Error while fetching metadata with > correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} > (org.apache.kafka.clients.NetworkClient) > And of course now the test fails. > The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation > manager. > It is a simple matter to make the test pass -- I have confirmed that it > passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of > LEADER_NOT_AVAILABLE. > I think we just need to decide if this change in behavior is acceptable or > not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12307) Wrap non-fatal exception as CommitFailedException when thrown from commitTxn
Boyang Chen created KAFKA-12307: --- Summary: Wrap non-fatal exception as CommitFailedException when thrown from commitTxn Key: KAFKA-12307 URL: https://issues.apache.org/jira/browse/KAFKA-12307 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen As stated in the KIP, we need to ensure that non-fatal exceptions get wrapped inside CommitFailedException to be properly handled by the new template. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12304) Improve topic validation in auto topic creation
Boyang Chen created KAFKA-12304: --- Summary: Improve topic validation in auto topic creation Key: KAFKA-12304 URL: https://issues.apache.org/jira/browse/KAFKA-12304 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen The topic validation path should have a higher priority than other follow-up processes. Basically we should move it right after the topic authorization is done in metadata request handling. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation
Boyang Chen created KAFKA-12294: --- Summary: Consider using the forwarding mechanism for metadata auto topic creation Key: KAFKA-12294 URL: https://issues.apache.org/jira/browse/KAFKA-12294 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to improve the topic creation auditing by forwarding the CreateTopicsRequest inside Envelope for the given client. Details in [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12260) PartitionsFor should not return null value
Boyang Chen created KAFKA-12260: --- Summary: PartitionsFor should not return null value Key: KAFKA-12260 URL: https://issues.apache.org/jira/browse/KAFKA-12260 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Boyang Chen consumer.partitionsFor() could return a null value when the topic is not found. This was not properly documented and is error-prone given that the return type is a list. We should fix the logic internally to prevent partitionsFor from returning a null result. -- This message was sent by Atlassian Jira (v8.3.4#803005)
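The fix amounts to normalizing a missing-topic lookup to an empty list. A minimal sketch, assuming a simple map-backed metadata lookup (this is not the actual KafkaConsumer code):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustrative sketch: return an empty list instead of null when the topic is unknown.
public class PartitionsForSketch {
    public static <T> List<T> partitionsFor(Map<String, List<T>> metadata, String topic) {
        List<T> partitions = metadata.get(topic);
        // An empty list keeps callers' for-loops and size() checks safe.
        return partitions == null ? Collections.emptyList() : partitions;
    }
}
```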
[jira] [Created] (KAFKA-12215) Broker could cache its overlapping ApiVersions with active controller
Boyang Chen created KAFKA-12215: --- Summary: Broker could cache its overlapping ApiVersions with active controller Key: KAFKA-12215 URL: https://issues.apache.org/jira/browse/KAFKA-12215 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Right now, each ApiVersionsRequest needs to recompute the overlapping API versions with the controller every time: https://issues.apache.org/jira/browse/KAFKA-10674 Unless the active controller changes, the computation result is always the same. We may consider caching the result and only updating it when a controller change happens. -- This message was sent by Atlassian Jira (v8.3.4#803005)
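The caching idea could look roughly like the sketch below: invalidate only when the active controller changes. The class, keying, and map type are hypothetical, not the actual broker code:

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical cache sketch: recompute the broker/controller ApiVersions
// intersection only when the active controller changes.
public class ApiVersionsCache {
    private int cachedControllerId = -1;
    private Map<Short, Short> cachedIntersection = null;

    // 'compute' stands in for the expensive intersection computation
    // (api key -> max commonly supported version).
    public Map<Short, Short> intersection(int activeControllerId,
                                          Supplier<Map<Short, Short>> compute) {
        if (cachedIntersection == null || activeControllerId != cachedControllerId) {
            cachedControllerId = activeControllerId;
            cachedIntersection = compute.get(); // recompute only on controller change
        }
        return cachedIntersection;
    }
}
```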
[jira] [Created] (KAFKA-10868) Avoid double wrapping KafkaException
Boyang Chen created KAFKA-10868: --- Summary: Avoid double wrapping KafkaException Key: KAFKA-10868 URL: https://issues.apache.org/jira/browse/KAFKA-10868 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9552) Stream should handle OutOfSequence exception thrown from Producer
[ https://issues.apache.org/jira/browse/KAFKA-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9552. Resolution: Not A Problem > Stream should handle OutOfSequence exception thrown from Producer > - > > Key: KAFKA-9552 > URL: https://issues.apache.org/jira/browse/KAFKA-9552 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Boyang Chen >Priority: Major > > As of today the stream thread could die from OutOfSequence error: > {code:java} > [2020-02-12T07:14:35-08:00] > (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > [2020-02-12T07:14:35-08:00] > (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 > 15:14:35,185] ERROR > [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] > stream-thread > [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed > to commit stream task 3_2 due to the following error: > (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) > [2020-02-12T07:14:35-08:00] > (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) > org.apache.kafka.streams.errors.StreamsException: task [3_2] Abort sending > since an error caught with a previous record (timestamp 1581484094825) to > topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-49-changelog due > to org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1353) > {code} > Although this is a fatal exception for the Producer, Streams should treat it as an > opportunity to reinitialize by doing a rebalance, instead of killing the > computation resource. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10813) StreamsProducer should catch InvalidProducerEpoch and throw TaskMigrated in all cases
[ https://issues.apache.org/jira/browse/KAFKA-10813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10813. - Resolution: Fixed > StreamsProducer should catch InvalidProducerEpoch and throw TaskMigrated in > all cases > - > > Key: KAFKA-10813 > URL: https://issues.apache.org/jira/browse/KAFKA-10813 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Blocker > Fix For: 2.7.0 > > > We fixed the error code handling on the producer in > https://issues.apache.org/jira/browse/KAFKA-10687; however, the newly thrown > `InvalidProducerEpoch` exception was not properly handled on the Streams side in > all cases. We should catch it and rethrow it as TaskMigrated, similar to > ProducerFenced. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10813) StreamsProducer should catch InvalidProducerEpoch and throw TaskMigrated in all cases
Boyang Chen created KAFKA-10813: --- Summary: StreamsProducer should catch InvalidProducerEpoch and throw TaskMigrated in all cases Key: KAFKA-10813 URL: https://issues.apache.org/jira/browse/KAFKA-10813 Project: Kafka Issue Type: Bug Reporter: Boyang Chen Fix For: 2.7.0 We fixed the error code handling on the producer in https://issues.apache.org/jira/browse/KAFKA-10687; however, the newly thrown `InvalidProducerEpoch` exception was not properly handled on the Streams side in all cases. We should catch it and rethrow it as TaskMigrated, similar to ProducerFenced. -- This message was sent by Atlassian Jira (v8.3.4#803005)
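The catch-and-rethrow pattern described above could be sketched like this. The exception classes below are stand-ins for the real Kafka ones, and the method is illustrative, not the actual StreamsProducer code:

```java
// Illustrative sketch: translate InvalidProducerEpoch into TaskMigrated,
// the same treatment ProducerFenced already receives.
public class EpochHandlingSketch {
    // Stand-ins for org.apache.kafka.common.errors.InvalidProducerEpochException
    // and org.apache.kafka.streams.errors.TaskMigratedException.
    public static class InvalidProducerEpochException extends RuntimeException {}
    public static class TaskMigratedException extends RuntimeException {}

    public static void commit(Runnable producerCommit) {
        try {
            producerCommit.run();
        } catch (InvalidProducerEpochException e) {
            // Trigger task migration / rebalance handling instead of
            // killing the stream thread.
            throw new TaskMigratedException();
        }
    }
}
```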
[jira] [Created] (KAFKA-10807) AlterConfig should be validated by the target broker
Boyang Chen created KAFKA-10807: --- Summary: AlterConfig should be validated by the target broker Key: KAFKA-10807 URL: https://issues.apache.org/jira/browse/KAFKA-10807 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen After forwarding is enabled, AlterConfigs will no longer be sent to the target broker. This behavior bypasses some important config change validations, such as path existence and static config conflicts; even worse, when the target broker is offline, the propagated result does not reflect the truly applied result. We should gather the necessary cases and decide whether to handle the AlterConfig request on the target broker first and then forward it, in a validate-forward-apply path. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs
Boyang Chen created KAFKA-10733: --- Summary: Enforce exception thrown for KafkaProducer txn APIs Key: KAFKA-10733 URL: https://issues.apache.org/jira/browse/KAFKA-10733 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen In general, KafkaProducer could throw both fatal and non-fatal errors as KafkaException, which makes exception catching hard. Furthermore, as of 2.7 not every fatal (checked) exception is marked on the function signatures. In the long term we need a general supporting strategy for this: either declare all non-fatal exceptions as wrapped KafkaException while extracting all fatal ones, or just add a flag to KafkaException indicating whether it is fatal, to maintain binary compatibility. -- This message was sent by Atlassian Jira (v8.3.4#803005)
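The "fatal flag" option mentioned above could be sketched as follows. This is a hypothetical API, not the real org.apache.kafka.common.KafkaException:

```java
// Hypothetical sketch of the flag-on-KafkaException option: callers branch
// on one flag instead of enumerating exception subclasses.
public class FatalFlagSketch {
    public static class KafkaExceptionSketch extends RuntimeException {
        private final boolean fatal;
        public KafkaExceptionSketch(String message, boolean fatal) {
            super(message);
            this.fatal = fatal;
        }
        public boolean isFatal() { return fatal; }
    }

    // Example handling policy: fatal errors close the producer,
    // non-fatal ones can abort the transaction and retry.
    public static String handle(KafkaExceptionSketch e) {
        return e.isFatal() ? "close-producer" : "abort-and-retry";
    }
}
```

This keeps binary compatibility, since no new exception types appear on the public signatures.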
[jira] [Created] (KAFKA-10714) Save unnecessary end txn call when the transaction is confirmed to be done
Boyang Chen created KAFKA-10714: --- Summary: Save unnecessary end txn call when the transaction is confirmed to be done Key: KAFKA-10714 URL: https://issues.apache.org/jira/browse/KAFKA-10714 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In certain error cases after KIP-588, we may skip the `EndTxn` call to the transaction coordinator, such as in the transaction timeout case, where we know the transaction is already terminated on the broker side. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10699) Add system test coverage for group coordinator emigration
Boyang Chen created KAFKA-10699: --- Summary: Add system test coverage for group coordinator emigration Key: KAFKA-10699 URL: https://issues.apache.org/jira/browse/KAFKA-10699 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen After merging the fix https://issues.apache.org/jira/browse/KAFKA-10284, we believe that it is important to add system test coverage for the group coordinator migration to verify consumer behaviors are correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10687) Produce request should be bumped for new error code PRODUCE_FENCED
Boyang Chen created KAFKA-10687: --- Summary: Produce request should be bumped for new error code PRODUCE_FENCED Key: KAFKA-10687 URL: https://issues.apache.org/jira/browse/KAFKA-10687 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen Fix For: 2.7.0 In https://issues.apache.org/jira/browse/KAFKA-9910, we missed a case where the ProduceRequest needs to be bumped to return the new error code PRODUCE_FENCED. This gap needs to be addressed as a blocker since it is shipping in 2.7. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10181) Create Envelope RPC and redirection template for configuration change RPCs
[ https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10181. - Resolution: Fixed > Create Envelope RPC and redirection template for configuration change RPCs > -- > > Key: KAFKA-10181 > URL: https://issues.apache.org/jira/browse/KAFKA-10181 > Project: Kafka > Issue Type: Sub-task > Components: admin >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.8.0 > > > In the bridge release broker, > AlterConfig/IncrementalAlterConfig/CreateTopics/AlterClientQuota should be > redirected to the active controller. This ticket will ensure those RPCs get > redirected. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10343) Add IBP based ApiVersion constraint tests
[ https://issues.apache.org/jira/browse/KAFKA-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10343. - Resolution: Won't Fix We are planning to work on a more long-term fix for this issue, see > Add IBP based ApiVersion constraint tests > - > > Key: KAFKA-10343 > URL: https://issues.apache.org/jira/browse/KAFKA-10343 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.8.0 > > > We need to add IBP-based ApiVersion constraint tests to remind future > developers to bump it when a new RPC version is developed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10674) Brokers should know the active controller ApiVersion after enabling KIP-590 forwarding
Boyang Chen created KAFKA-10674: --- Summary: Brokers should know the active controller ApiVersion after enabling KIP-590 forwarding Key: KAFKA-10674 URL: https://issues.apache.org/jira/browse/KAFKA-10674 Project: Kafka Issue Type: Improvement Components: clients, core Reporter: Boyang Chen Admin clients send ApiVersions to the broker when the first connection is established. The tricky part after forwarding is enabled is that, for forwardable APIs, the admin client needs to know a commonly agreed range of ApiVersions among the handling broker, the active controller, and itself. Right now inter-broker APIs are guaranteed by IBP constraints, but forwardable APIs are not. A compromise solution would be to put all forwardable APIs under IBP, which is brittle and hard to keep consistent. Instead, any broker connecting to the active controller should send an ApiVersions request from the beginning, so it is easy to compute that information and send it back to admin clients upon an ApiVersions request. Any roll of the active controller will trigger a reconnection between broker and controller, which guarantees refreshed ApiVersions between the two. This approach avoids the tight bond with IBP, and the broker could just close the connection to the admin client to trigger the retry logic and a refresh of the ApiVersions. Since this failure should be rare, the two round-trips and timeout delays are well compensated by the reduced engineering work. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10668) Avoid deserialization on second hop for request forwarding
Boyang Chen created KAFKA-10668: --- Summary: Avoid deserialization on second hop for request forwarding Key: KAFKA-10668 URL: https://issues.apache.org/jira/browse/KAFKA-10668 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Right now the forwarding broker deserializes the response and serializes it again in order to respond to the client. We should be able to keep the response data sealed and send it back to the client to save some CPU cost. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10667) Add timeout for forwarding requests
Boyang Chen created KAFKA-10667: --- Summary: Add timeout for forwarding requests Key: KAFKA-10667 URL: https://issues.apache.org/jira/browse/KAFKA-10667 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen It makes sense to handle timeouts for forwarded requests coming from the client instead of retrying indefinitely. We could either use the API timeout or a customized timeout hook that could be defined per request type. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10663) Flakey test ConsumerBounceTest#testSeekAndCommitWithBrokerFailures
Boyang Chen created KAFKA-10663: --- Summary: Flakey test ConsumerBounceTest#testSeekAndCommitWithBrokerFailures Key: KAFKA-10663 URL: https://issues.apache.org/jira/browse/KAFKA-10663 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.7.0 Reporter: Boyang Chen org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:40823: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:671) at kafka.network.Acceptor.(SocketServer.scala:539) at kafka.network.SocketServer.createAcceptor(SocketServer.scala:280) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:253) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.startup(SocketServer.scala:125) at kafka.server.KafkaServer.startup(KafkaServer.scala:303) at kafka.utils.TestUtils$.createServer(TestUtils.scala:160) at kafka.utils.TestUtils$.createServer(TestUtils.scala:151) at kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:102) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scal >From AK 2.7 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10657) Incorporate Envelope into auto-generated JSON schema
Boyang Chen created KAFKA-10657: --- Summary: Incorporate Envelope into auto-generated JSON schema Key: KAFKA-10657 URL: https://issues.apache.org/jira/browse/KAFKA-10657 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen We need to add support to output JSON format for the embedded request inside Envelope to do better request logging. https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9999) Internal topic creation failure should be non-fatal and trigger explicit rebalance
[ https://issues.apache.org/jira/browse/KAFKA-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9999. Resolution: Won't Fix > Internal topic creation failure should be non-fatal and trigger explicit > rebalance > --- > > Key: KAFKA-9999 > URL: https://issues.apache.org/jira/browse/KAFKA-9999 > Project: Kafka > Issue Type: Bug > Components: admin, streams >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > We spotted a case in a system test failure where the topic already exists but > the admin client still attempts to recreate it: > > {code:java} > [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic > SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably > marked for deletion (number of partitions is unknown). > Will retry to create this topic in 100 ms (to let broker finish async delete > operation first). > Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic > 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic > SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number > of partitions is unknown). > Will retry to create this topic in 100 ms (to let broker finish async delete > operation first). > Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic > 'SmokeTest-uwin-cnt-changelog' already exists. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic > SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number > of partitions is unknown). > Will retry to create this topic in 100 ms (to let broker finish async delete > operation first). 
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic > 'SmokeTest-cntByCnt-changelog' already exists. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics > [SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, > SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made > ready with 5 retries left > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics > after 5 retries. This can happen if the Kafka cluster is temporary not > available. You can increase admin client config `retries` to be resilient > against this error. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,221] ERROR stream-thread > [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered > the following unexpected Kafka exception during processing, this usually > indicate Streams internal errors: > (org.apache.kafka.streams.processor.internals.StreamThread) > org.apache.kafka.streams.errors.StreamsException: Could not create topics > after 5 retries. This can happen if the Kafka cluster is temporary not > available. You can increase admin client config `retries` to be resilient > against this error. 
> at > org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588) > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.
[jira] [Created] (KAFKA-10607) Ensure the error counts contains the NONE
Boyang Chen created KAFKA-10607: --- Summary: Ensure the error counts contains the NONE Key: KAFKA-10607 URL: https://issues.apache.org/jira/browse/KAFKA-10607 Project: Kafka Issue Type: Improvement Components: clients Reporter: Boyang Chen The RPC errorCounts() implementations behave inconsistently; for example, certain RPCs filter out Errors.NONE during map generation. We should make this consistent by counting Errors.NONE in errorCounts() for all RPCs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
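A consistent errorCounts() would count every error, including NONE. A minimal sketch, where the nested enum stands in for org.apache.kafka.common.protocol.Errors:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a consistent errorCounts() that does not filter NONE;
// the enum is a stand-in for the real Errors enum.
public class ErrorCountsSketch {
    public enum Errors { NONE, UNKNOWN_SERVER_ERROR }

    public static Map<Errors, Integer> errorCounts(Iterable<Errors> perPartitionErrors) {
        Map<Errors, Integer> counts = new HashMap<>();
        for (Errors e : perPartitionErrors) {
            counts.merge(e, 1, Integer::sum); // NONE is counted, not filtered out
        }
        return counts;
    }
}
```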
[jira] [Created] (KAFKA-10594) Enhance Raft exception handling
Boyang Chen created KAFKA-10594: --- Summary: Enhance Raft exception handling Key: KAFKA-10594 URL: https://issues.apache.org/jira/browse/KAFKA-10594 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen The current exception handling in the Raft implementation is superficial; for example, we don't treat file-system exceptions and request-handling exceptions differently. We need to decide which kinds of exceptions should be fatal, which should be returned to the client, and which could be retried. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10508) Consider moving ForwardRequestHandler to a separate class
Boyang Chen created KAFKA-10508: --- Summary: Consider moving ForwardRequestHandler to a separate class Key: KAFKA-10508 URL: https://issues.apache.org/jira/browse/KAFKA-10508 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen With the new redirection template merged in [https://github.com/apache/kafka/pull/9103], the KafkaApis file has grown to 3500+ lines, which is fairly large. We should consider moving the redirection template out into a separate file to reduce the main class size. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10400) Add a customized Kafka Streams logo
Boyang Chen created KAFKA-10400: --- Summary: Add a customized Kafka Streams logo Key: KAFKA-10400 URL: https://issues.apache.org/jira/browse/KAFKA-10400 Project: Kafka Issue Type: Improvement Components: streams Reporter: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10042) Make INVALID_PRODUCER_EPOCH abortable from Produce response
[ https://issues.apache.org/jira/browse/KAFKA-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10042. - Resolution: Fixed > Make INVALID_PRODUCER_EPOCH abortable from Produce response > --- > > Key: KAFKA-10042 > URL: https://issues.apache.org/jira/browse/KAFKA-10042 > Project: Kafka > Issue Type: Sub-task > Components: producer >Reporter: Boyang Chen >Priority: Major > Fix For: 2.7.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9911) Implement new producer fenced error
[ https://issues.apache.org/jira/browse/KAFKA-9911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9911. Resolution: Fixed > Implement new producer fenced error > --- > > Key: KAFKA-9911 > URL: https://issues.apache.org/jira/browse/KAFKA-9911 > Project: Kafka > Issue Type: Sub-task > Components: producer >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.7.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10345) Redirect AlterPartitionReassignment to the controller
Boyang Chen created KAFKA-10345: --- Summary: Redirect AlterPartitionReassignment to the controller Key: KAFKA-10345 URL: https://issues.apache.org/jira/browse/KAFKA-10345 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the bridge release broker, the AlterPartitionReassignment should be redirected to the active controller instead of relying on admin client discovery. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10344) Redirect Create/Renew/ExpireDelegationToken to the controller
Boyang Chen created KAFKA-10344: --- Summary: Redirect Create/Renew/ExpireDelegationToken to the controller Key: KAFKA-10344 URL: https://issues.apache.org/jira/browse/KAFKA-10344 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the bridge release broker, Create/Renew/ExpireDelegationToken should be redirected to the active controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10343) Redirect AlterClientQuotas to the controller
Boyang Chen created KAFKA-10343: --- Summary: Redirect AlterClientQuotas to the controller Key: KAFKA-10343 URL: https://issues.apache.org/jira/browse/KAFKA-10343 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the new Admin client, the AlterClientQuotas request should be redirected to the active controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10342) Redirect CreateAcls/DeleteAcls to the controller
Boyang Chen created KAFKA-10342: --- Summary: Redirect CreateAcls/DeleteAcls to the controller Key: KAFKA-10342 URL: https://issues.apache.org/jira/browse/KAFKA-10342 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the new Admin client, the CreateAcls/DeleteAcls request should be redirected to the active controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10350) Add redirect request monitoring metrics
Boyang Chen created KAFKA-10350: --- Summary: Add redirect request monitoring metrics Key: KAFKA-10350 URL: https://issues.apache.org/jira/browse/KAFKA-10350 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen We need to add the metric for monitoring redirection progress as stated in the KIP: MBean:kafka.server:type=RequestMetrics,name=NumRequestsForwardingToControllerPerSec,clientId=([-.\w]+) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10349) Deprecate client side controller access
Boyang Chen created KAFKA-10349: --- Summary: Deprecate client side controller access Key: KAFKA-10349 URL: https://issues.apache.org/jira/browse/KAFKA-10349 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen As stated in KIP-590, we will disallow new admin clients from discovering the controller location, for encapsulation. For communication with older brokers, the metadata response will still contain the controller location. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10348) Redirect UpdateFeatures to the controller
Boyang Chen created KAFKA-10348: --- Summary: Redirect UpdateFeatures to the controller Key: KAFKA-10348 URL: https://issues.apache.org/jira/browse/KAFKA-10348 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the bridge release broker, the UpdateFeatures should be redirected to the active controller instead of relying on admin client discovery. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10347) Redirect Create/DeleteTopics to the controller
Boyang Chen created KAFKA-10347: --- Summary: Redirect Create/DeleteTopics to the controller Key: KAFKA-10347 URL: https://issues.apache.org/jira/browse/KAFKA-10347 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the bridge release broker, the Create/DeleteTopics should be redirected to the active controller instead of relying on admin client discovery. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10346) Redirect CreatePartition to the controller
Boyang Chen created KAFKA-10346: --- Summary: Redirect CreatePartition to the controller Key: KAFKA-10346 URL: https://issues.apache.org/jira/browse/KAFKA-10346 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the bridge release broker, the CreatePartition should be redirected to the active controller instead of relying on admin client discovery. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig
[ https://issues.apache.org/jira/browse/KAFKA-10270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10270. - Resolution: Fixed > Add a broker to controller channel manager to redirect AlterConfig > -- > > Key: KAFKA-10270 > URL: https://issues.apache.org/jira/browse/KAFKA-10270 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Per KIP-590 requirement, we need to have a dedicate communication channel > from broker to the controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10321) shouldDieOnInvalidOffsetExceptionWhileRunning would block forever on JDK11
Boyang Chen created KAFKA-10321: --- Summary: shouldDieOnInvalidOffsetExceptionWhileRunning would block forever on JDK11 Key: KAFKA-10321 URL: https://issues.apache.org/jira/browse/KAFKA-10321 Project: Kafka Issue Type: Bug Affects Versions: 2.7.0 Reporter: Boyang Chen We have spotted two definite cases where the test shouldDieOnInvalidOffsetExceptionWhileRunning fails to stop during the whole test suite run: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7604/consoleFull] [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10311) Flaky test KafkaAdminClientTest#testMetadataRetries
Boyang Chen created KAFKA-10311: --- Summary: Flaky test KafkaAdminClientTest#testMetadataRetries Key: KAFKA-10311 URL: https://issues.apache.org/jira/browse/KAFKA-10311 Project: Kafka Issue Type: Bug Reporter: Boyang Chen [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3545/testReport/junit/org.apache.kafka.clients.admin/KafkaAdminClientTest/testMetadataRetries/] h3. Error Message java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1595694629113, tries=1, nextAllowedTryMs=1595694629217) timed out at 1595694629117 after 1 attempt(s) h3. Stacktrace java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1595694629113, tries=1, nextAllowedTryMs=1595694629217) timed out at 1595694629117 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries(KafkaAdminClientTest.java:995) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1595694629113, tries=1, nextAllowedTryMs=1595694629217) timed out at 1595694629117 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
Boyang Chen created KAFKA-10307: --- Summary: Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable Key: KAFKA-10307 URL: https://issues.apache.org/jira/browse/KAFKA-10307 Project: Kafka Issue Type: Bug Reporter: Boyang Chen We have spotted a cycled topology for the foreign-key join test *shouldInnerJoinMultiPartitionQueryable*, not sure yet whether this is a bug in the algorithm or the test only. Used [https://zz85.github.io/kafka-streams-viz/] to visualize: {code:java} Sub-topology: 0 Source: KTABLE-SOURCE-19 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 Source: KTABLE-SOURCE-32 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 Source: KSTREAM-SOURCE-01 (topics: [table1]) --> KTABLE-SOURCE-02 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: [table1-STATE-STORE-00]) --> KTABLE-FK-JOIN-OUTPUT-21 <-- KTABLE-SOURCE-19 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: [INNER-store1]) --> KTABLE-FK-JOIN-OUTPUT-34 <-- KTABLE-SOURCE-32 Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1]) --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2]) --> KTABLE-TOSTREAM-35 <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 Processor: KTABLE-SOURCE-02 (stores: [table1-STATE-STORE-00]) --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 <-- KSTREAM-SOURCE-01 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: []) --> KTABLE-SINK-11 <-- KTABLE-SOURCE-02 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: []) --> KTABLE-SINK-24 <-- KTABLE-FK-JOIN-OUTPUT-21 Processor: KTABLE-TOSTREAM-35 (stores: []) --> KSTREAM-SINK-36 <-- KTABLE-FK-JOIN-OUTPUT-34 Sink: KSTREAM-SINK-36 
(topic: output-) <-- KTABLE-TOSTREAM-35 Sink: KTABLE-SINK-11 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic) <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 Sink: KTABLE-SINK-24 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic) <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 Sub-topology: 1 Source: KSTREAM-SOURCE-04 (topics: [table2]) --> KTABLE-SOURCE-05 Source: KTABLE-SOURCE-12 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 <-- KTABLE-SOURCE-12 Processor: KTABLE-SOURCE-05 (stores: [table2-STATE-STORE-03]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 <-- KSTREAM-SOURCE-04 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: [table2-STATE-STORE-03]) --> KTABLE-SINK-18 <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13]) --> KTABLE-SINK-18 <-- KTABLE-SOURCE-05 Sink: KTABLE-SINK-18 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic) <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 Sub-topology: 2 Source: KSTREAM-SOURCE-07 (topics: [table3]) --> KTABLE-SOURCE-08 Source: KTABLE-SOURCE-25 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-27 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-27 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-26]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-28 <-- KTABLE-SOURCE-25 Processor: KTABLE-SOURCE-08 (stores: [table3-STATE-STORE-06]) --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-29 <-- KSTREAM-SOURCE-07 Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-28 (stores: [table3-STATE-STORE-0
[jira] [Created] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
Boyang Chen created KAFKA-10284: --- Summary: Group membership update due to static member rejoin should be persisted Key: KAFKA-10284 URL: https://issues.apache.org/jira/browse/KAFKA-10284 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.5.0, 2.4.0, 2.3.0, 2.6.0 Reporter: Boyang Chen Assignee: Boyang Chen Fix For: 2.6.1 When a known static member rejoins, we update its corresponding member.id without triggering a new rebalance. This avoids unnecessary rebalances for static membership, and also serves a fencing purpose if some client still uses the old member.id. The bug is that we don't actually persist the membership update, so if no subsequent rebalance is triggered, the new member.id information is lost during group coordinator immigration, thus resurrecting the zombie member identity. -- This message was sent by Atlassian Jira (v8.3.4#803005)
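The failure mode above can be sketched with a minimal, hypothetical model (this is not the real GroupCoordinator code; the class, map, and method names are all illustrative): the coordinator swaps in the new member.id in memory, but unless the group metadata is also written back to the log, a coordinator failover restores the stale member.id.

```java
import java.util.HashMap;
import java.util.Map;

public class StaticMemberRejoinSketch {
    // group.instance.id -> member.id as seen by the live coordinator
    static final Map<String, String> inMemory = new HashMap<>();
    // group.instance.id -> member.id as recoverable from persisted group metadata
    static final Map<String, String> persisted = new HashMap<>();

    static void staticMemberRejoin(String instanceId, String newMemberId, boolean persist) {
        inMemory.put(instanceId, newMemberId); // updated without triggering a rebalance
        if (persist) {
            persisted.put(instanceId, newMemberId); // the proposed fix: persist the update too
        }
    }

    // simulates coordinator immigration: state is rebuilt from persisted metadata only
    static String afterFailover(String instanceId) {
        return persisted.get(instanceId);
    }

    public static void main(String[] args) {
        persisted.put("worker-1", "member-old");
        inMemory.put("worker-1", "member-old");

        staticMemberRejoin("worker-1", "member-new", false); // current (buggy) path
        if (!"member-old".equals(afterFailover("worker-1")))
            throw new AssertionError("expected stale (zombie) member.id after failover");

        staticMemberRejoin("worker-1", "member-new", true);  // fixed path
        if (!"member-new".equals(afterFailover("worker-1")))
            throw new AssertionError("persisted update should survive failover");
        System.out.println("ok");
    }
}
```

The point of the sketch is only that the in-memory and persisted views diverge when the update is not written down; the real fix would append an updated group metadata record to __consumer_offsets.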
[jira] [Created] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig
Boyang Chen created KAFKA-10270: --- Summary: Add a broker to controller channel manager to redirect AlterConfig Key: KAFKA-10270 URL: https://issues.apache.org/jira/browse/KAFKA-10270 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10239) The groupInstanceId field in DescribeGroup response should be ignorable
[ https://issues.apache.org/jira/browse/KAFKA-10239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10239. - Resolution: Fixed > The groupInstanceId field in DescribeGroup response should be ignorable > --- > > Key: KAFKA-10239 > URL: https://issues.apache.org/jira/browse/KAFKA-10239 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Critical > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > We noticed the following error in the logs in the handling of a DescribeGroup > request: > ``` > org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to > write a non-default groupInstanceId at version 3 > ``` > The problem is that the field is not marked as ignorable. So if the user is > relying on static membership and uses an older AdminClient, they will see > this error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10241) Pursue a better way to cover ignorable RPC fields
Boyang Chen created KAFKA-10241: --- Summary: Pursue a better way to cover ignorable RPC fields Key: KAFKA-10241 URL: https://issues.apache.org/jira/browse/KAFKA-10241 Project: Kafka Issue Type: Improvement Components: clients, core Reporter: Boyang Chen We have hit cases such as https://issues.apache.org/jira/browse/KAFKA-10239 where we accidentally included a non-ignorable field in a returned response, eventually crashing older clients that don't support the field. It would be good to add a generic test suite covering all existing and new RPC changes, to ensure we never expose a non-ignorable field to older client versions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
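The property such a test suite would check can be illustrated with a tiny hypothetical model (this is not Kafka's actual message framework API; the class and method names are invented for illustration): writing a non-default value at a version where the field does not exist should only be an error when the field is non-ignorable, which is exactly the KAFKA-10239 failure mode.

```java
public class IgnorableFieldCheck {
    // Minimal model of a versioned RPC field (illustrative names, not Kafka's API).
    static class Field {
        final short addedInVersion;
        final boolean ignorable;
        Field(short addedInVersion, boolean ignorable) {
            this.addedInVersion = addedInVersion;
            this.ignorable = ignorable;
        }
    }

    // True when serializing at 'version' would throw: the field doesn't exist at
    // that version, carries a non-default value, and is not marked ignorable.
    static boolean serializationWouldThrow(Field f, short version, boolean nonDefault) {
        return version < f.addedInVersion && nonDefault && !f.ignorable;
    }

    public static void main(String[] args) {
        // KAFKA-10239: non-default groupInstanceId written at version 3 -> error
        Field groupInstanceId = new Field((short) 4, false);
        if (!serializationWouldThrow(groupInstanceId, (short) 3, true))
            throw new AssertionError("non-ignorable new field must fail at old versions");

        // marking the field ignorable makes older versions safe (value is dropped)
        Field fixed = new Field((short) 4, true);
        if (serializationWouldThrow(fixed, (short) 3, true))
            throw new AssertionError("ignorable field must serialize at old versions");
        System.out.println("ok");
    }
}
```

A generic suite would iterate every message type and every supported version with non-default values in all fields, asserting this invariant instead of modeling a single field by hand.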
[jira] [Created] (KAFKA-10237) Properly handle in-memory stores OOM
Boyang Chen created KAFKA-10237: --- Summary: Properly handle in-memory stores OOM Key: KAFKA-10237 URL: https://issues.apache.org/jira/browse/KAFKA-10237 Project: Kafka Issue Type: Improvement Components: streams Reporter: Boyang Chen We have seen the in-memory store buffer too much data and eventually hit an OOM. Generally speaking, an OOM gives no real indication of the underlying problem and makes user debugging harder, since the failing thread may not be the actual culprit that caused the explosion. If we could add protection against hitting the memory limit, or at least emit a clear error message, end-user debugging would be much simpler. To make it work, we need to enforce a certain memory limit below the heap size and take action when hitting it. The first question is whether to set a numeric limit, such as 100MB or 500MB, or a percentage limit, such as 60% or 80% of total memory. The second question is about the action itself. One approach would be to crash the store immediately and ask the user to increase their application capacity. The second approach would be to spontaneously open an on-disk store and offload the data to it. Personally I'm in favor of approach #2 because it has minimal impact on the ongoing application. However, it is more complex and potentially requires significant work to define the proper behavior, such as the default store configuration, how to manage its lifecycle, etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
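The percentage-limit option can be sketched with plain JVM heap accounting (the threshold, class name, and fail-fast action below are illustrative choices, not Streams API or a decided design):

```java
public class HeapGuard {
    static final double THRESHOLD = 0.8; // e.g. 80% of max heap, an assumed limit

    // Fraction of the maximum heap currently in use, from standard Runtime counters.
    static double heapUsageRatio() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return (double) used / rt.maxMemory();
    }

    static boolean overLimit() {
        return heapUsageRatio() > THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.printf("heap usage: %.2f of max%n", heapUsageRatio());
        if (overLimit()) {
            // approach #1 from the discussion: fail fast with a clear message,
            // instead of letting an unrelated thread die with an opaque OOM later
            throw new IllegalStateException(
                "in-memory store exceeds " + (int) (THRESHOLD * 100) + "% of heap");
        }
    }
}
```

A real implementation would also need to attribute usage to the stores themselves (heap counters are process-wide), which is part of why approach #2 (spilling to an on-disk store) is harder to define.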
[jira] [Resolved] (KAFKA-10135) Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager
[ https://issues.apache.org/jira/browse/KAFKA-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10135. - Resolution: Fixed > Extract Task#executeAndMaybeSwallow to be a general utility function into > TaskManager > - > > Key: KAFKA-10135 > URL: https://issues.apache.org/jira/browse/KAFKA-10135 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > We have a couple of cases where we need to swallow the exception during > operations in both Task class and TaskManager class. This utility method > should be generalized at least onto TaskManager level. See discussion comment > [here|https://github.com/apache/kafka/pull/8833#discussion_r437697665]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop
Boyang Chen created KAFKA-10192: --- Summary: Flaky test BlockingConnectorTest#testBlockInConnectorStop Key: KAFKA-10192 URL: https://issues.apache.org/jira/browse/KAFKA-10192 Project: Kafka Issue Type: Bug Reporter: Boyang Chen h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/] h3. Error Message org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not execute PUT request. Error response: \{"error_code":500,"message":"Request timed out"} h3. Stacktrace org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not execute PUT request. Error response: \{"error_code":500,"message":"Request timed out"} at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300) at org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185) at org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch
[jira] [Created] (KAFKA-10181) AlterConfig/IncrementalAlterConfig should go to controller
Boyang Chen created KAFKA-10181: --- Summary: AlterConfig/IncrementalAlterConfig should go to controller Key: KAFKA-10181 URL: https://issues.apache.org/jira/browse/KAFKA-10181 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen In the new Admin client, the request should always be routed towards the controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10087) Properly throw LogTruncation exception from OffsetForLeaderEpoch future
[ https://issues.apache.org/jira/browse/KAFKA-10087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10087. - Resolution: Fixed > Properly throw LogTruncation exception from OffsetForLeaderEpoch future > --- > > Key: KAFKA-10087 > URL: https://issues.apache.org/jira/browse/KAFKA-10087 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > For OffsetForLeaderEpoch#onSuccess, we could throw either OffsetOutOfRange or > LogTruncation exceptions, which are swallowed by the AsyncClient logic: > > {code:java} > try { > future.complete(handleResponse(node, requestData, resp)); > } catch (RuntimeException e) { > if (!future.isDone()) { > future.raise(e); > } > } > {code} > We should fix the exception case to throw it to the upstream. In the > meantime, we should ensure that any discarded exception case eventually gets > retried for the LeaderOffset call. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10139) Add operational guide for failure recovery
Boyang Chen created KAFKA-10139: --- Summary: Add operational guide for failure recovery Key: KAFKA-10139 URL: https://issues.apache.org/jira/browse/KAFKA-10139 Project: Kafka Issue Type: Sub-task Components: core Reporter: Boyang Chen In the first released version, we should include an operational guide covering the feature versioning failure cases, such as: 1. broker crash due to violation of feature versioning 2. ZK data corruption (rare) We need to ensure this work is reflected in the AK documentation after the implementation and thorough testing are done. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10135) Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager
Boyang Chen created KAFKA-10135: --- Summary: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager Key: KAFKA-10135 URL: https://issues.apache.org/jira/browse/KAFKA-10135 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen We have a couple of cases where we need to swallow the exception during operations in both Task class and TaskManager class. This utility method should be generalized at least onto TaskManager level. See discussion comment [here|https://github.com/apache/kafka/pull/8833#discussion_r437697665]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10130) Rewrite ZkData struct with auto-generated protocol
Boyang Chen created KAFKA-10130: --- Summary: Rewrite ZkData struct with auto-generated protocol Key: KAFKA-10130 URL: https://issues.apache.org/jira/browse/KAFKA-10130 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen ZkData.scala includes a couple of versioned data structs, such as BrokerIdZNode. Evolving a JSON struct by hand is error-prone compared with our automated RPC framework. IMHO the benefit of this rewrite outweighs the trouble. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10097) Avoid passing task checkpoint file externally
[ https://issues.apache.org/jira/browse/KAFKA-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10097. - Resolution: Fixed > Avoid passing task checkpoint file externally > - > > Key: KAFKA-10097 > URL: https://issues.apache.org/jira/browse/KAFKA-10097 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In StreamTask, we have the logic to generate a checkpoint offset map to be > materialized through StateManager#checkpoint. This map can be either an empty > map or null: the former indicates we should only pull down existing state > store checkpoint data, while the latter indicates no checkpoint is needed at > all, for example when suspending a task. > Having two similar special cases for checkpointing could lead to unexpected > bugs; we should also consider separating the empty-checkpoint case from the > passed-in-checkpoint case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9576) Topic creation failure causing Stream thread death
[ https://issues.apache.org/jira/browse/KAFKA-9576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9576. Resolution: Duplicate > Topic creation failure causing Stream thread death > -- > > Key: KAFKA-9576 > URL: https://issues.apache.org/jira/browse/KAFKA-9576 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > > The failure to create an internal topic could lead to the stream thread death > due to timeout: > {code:java} > [2020-02-14T03:03:00-08:00] > (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 > 11:03:00,083] ERROR > [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] > stream-thread [main] Unexpected error during topic description for > stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog. > [2020-02-14T03:03:00-08:00] > (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 > 11:03:00,083] ERROR > [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] > stream-thread > [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] > Encountered the following unexpected Kafka exception during processing, this > usually indicate Streams internal errors: > (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-02-14T03:03:00-08:00] > (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) > org.apache.kafka.streams.errors.StreamsException: Could not create topic > stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog. 
> at > org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:209) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:223) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:106) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > at > org.apache.kafka.clients.consumer.KafkaConsumer.p
[jira] [Created] (KAFKA-10097) Avoid getting null map for task checkpoint
Boyang Chen created KAFKA-10097: --- Summary: Avoid getting null map for task checkpoint Key: KAFKA-10097 URL: https://issues.apache.org/jira/browse/KAFKA-10097 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen In StreamTask, we have the logic to generate a checkpoint offset map to be materialized through StateManager#checkpoint. This map can be either an empty map or null: the former indicates that only the existing state store checkpoint data should be pulled down, while the latter indicates that no checkpoint is needed at all, for example when we are suspending a task. Having two similar special cases for checkpointing could lead to unexpected bugs; we should also consider separating the empty-checkpoint case from the passed-in-checkpoint case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
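The null-vs-empty ambiguity described above could be removed by making the intent explicit in the type. A minimal hypothetical sketch (the class and method names are illustrative, not the actual Kafka Streams internals):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: replace the ambiguous "null vs empty map" contract with
// an explicit Optional, so "skip checkpointing entirely" can no longer be
// confused with "checkpoint only the existing state store offsets".
public class CheckpointSketch {

    // Optional.empty()         -> no checkpoint should be written (e.g. task suspension)
    // Optional.of(emptyMap)    -> write a checkpoint from existing store offsets only
    // Optional.of(nonEmptyMap) -> write a checkpoint merging the passed-in offsets
    public static String describe(Optional<Map<String, Long>> checkpoint) {
        if (!checkpoint.isPresent()) {
            return "skip-checkpoint";
        }
        return checkpoint.get().isEmpty() ? "existing-offsets-only" : "merge-passed-in-offsets";
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.empty()));
        System.out.println(describe(Optional.of(Collections.emptyMap())));
        System.out.println(describe(Optional.of(Collections.singletonMap("store-changelog-0", 42L))));
    }
}
```

With this shape, each caller must state which of the three cases it means, rather than relying on the null/empty convention.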
[jira] [Created] (KAFKA-10087) Properly throw LogTruncation exception from OffsetForLeaderEpoch future
Boyang Chen created KAFKA-10087: --- Summary: Properly throw LogTruncation exception from OffsetForLeaderEpoch future Key: KAFKA-10087 URL: https://issues.apache.org/jira/browse/KAFKA-10087 Project: Kafka Issue Type: Bug Reporter: Boyang Chen Assignee: Boyang Chen In OffsetForLeaderEpoch#onSuccess, we could throw either an OffsetOutOfRange or a LogTruncation exception, which are swallowed by the AsyncClient logic:
{code:java}
try {
    future.complete(handleResponse(node, requestData, resp));
} catch (RuntimeException e) {
    if (!future.isDone()) {
        future.raise(e);
    }
}
{code}
We should fix this so the exception is propagated upstream. In the meantime, we should ensure that any discarded exception eventually leads to a retry of the OffsetForLeaderEpoch call. -- This message was sent by Atlassian Jira (v8.3.4#803005)
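The general pattern the ticket asks for, completing the future exceptionally instead of dropping the handler's exception, can be sketched with a plain CompletableFuture. This is a hypothetical illustration, not the actual Kafka client code:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: instead of silently dropping the RuntimeException thrown
// by the response handler, complete the future exceptionally so the caller can
// observe OffsetOutOfRange / LogTruncation and decide to retry.
public class PropagateSketch {

    interface Handler<T> {
        T handle();
    }

    public static <T> CompletableFuture<T> complete(Handler<T> handler) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            future.complete(handler.handle());
        } catch (RuntimeException e) {
            // propagate instead of swallowing: upstream sees the failure
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = complete(() -> "offsets");
        CompletableFuture<String> bad = complete(() -> {
            throw new IllegalStateException("log truncated");
        });
        System.out.println(ok.join());
        System.out.println(bad.isCompletedExceptionally());
    }
}
```

The key point is that the failure path reaches the future's consumer rather than being lost inside the completion handler.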
[jira] [Resolved] (KAFKA-10011) lockedTaskDirectories should be cleared when task gets closed dirty in HandleLostAll
[ https://issues.apache.org/jira/browse/KAFKA-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10011. - Resolution: Fixed > lockedTaskDirectories should be cleared when task gets closed dirty in > HandleLostAll > > > Key: KAFKA-10011 > URL: https://issues.apache.org/jira/browse/KAFKA-10011 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Tasks who get closed in handleLostAll don't clear out their position inside > lockedTaskDirectories, which causes an illegal state afterwards: > {code:java} > [2020-05-17T06:21:54-07:00] > (streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) [2020-05-17 > 13:21:54,127] ERROR > [stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] > stream-thread > [stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] > Encountered the following exception during processing and the thread is going > to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-05-17T06:21:54-07:00] > (streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) > org.apache.kafka.streams.errors.ProcessorStateException: task directory > [/mnt/run/streams/state/stream-soak-test/3_1] doesn't exist and couldn't be > created > at > org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112) > at > org.apache.kafka.streams.processor.internals.StateDirectory.checkpointFileFor(StateDirectory.java:121) > at > org.apache.kafka.streams.processor.internals.TaskManager.getTaskOffsetSums(TaskManager.java:498) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscriptionUserData(StreamsPartitionAssignor.java:239) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:222) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:560) 
> at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:495) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:417) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1265) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:770) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:630) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10010) Should make state store registration idempotent
[ https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10010. - Resolution: Fixed > Should make state store registration idempotent > --- > > Key: KAFKA-10010 > URL: https://issues.apache.org/jira/browse/KAFKA-10010 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > The current lost all logic doesn't close standby task, which could > potentially lead to a tricky condition like below: > 1. The standby task was initializing as `CREATED` state, and task corrupted > exception was thrown from registerStateStores > 2. The task corrupted exception was caught, and do a non-affected task commit > 3. The task commit failed due to task migrated exception > 4. The handleLostAll didn't close the standby task, leaving it as CREATED > state > 5. Next rebalance complete, the same task was assigned back as standby task. > 6. Illegal Argument exception caught : > {code:java} > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 > 18:56:18,050] ERROR > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > Encountered the following exception during processing and the thread is going > to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-05-16T11:56:18-07:00] > (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) > java.lang.IllegalArgumentException: stream-thread > [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] > standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already > been registered. 
> at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112) > at > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82) > at > org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89) > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10064) Add documentation for KIP-571
Boyang Chen created KAFKA-10064: --- Summary: Add documentation for KIP-571 Key: KAFKA-10064 URL: https://issues.apache.org/jira/browse/KAFKA-10064 Project: Kafka Issue Type: Task Components: docs, streams Reporter: Boyang Chen Assignee: feyman Fix For: 2.6.0 We need to add documentation for KIP-571, similar to the other KIPs going out in 2.6: [https://github.com/apache/kafka/pull/8621] [~feyman] I'm assigning this to you for now; let me know if there is anything missing for context. Here are the instructions for setting up the Apache wiki locally: [https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10042) Make INVALID_PRODUCER_EPOCH abortable from Produce response
Boyang Chen created KAFKA-10042: --- Summary: Make INVALID_PRODUCER_EPOCH abortable from Produce response Key: KAFKA-10042 URL: https://issues.apache.org/jira/browse/KAFKA-10042 Project: Kafka Issue Type: Sub-task Components: producer Reporter: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10011) lockedTaskDirectories should be cleared when task gets closed dirty in HandleLostAll
Boyang Chen created KAFKA-10011: --- Summary: lockedTaskDirectories should be cleared when task gets closed dirty in HandleLostAll Key: KAFKA-10011 URL: https://issues.apache.org/jira/browse/KAFKA-10011 Project: Kafka Issue Type: Bug Reporter: Boyang Chen Assignee: Boyang Chen Tasks that get closed in handleLostAll don't clear out their entries in lockedTaskDirectories, which causes an illegal state afterwards: {code:java} [2020-05-17T06:21:54-07:00] (streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) [2020-05-17 13:21:54,127] ERROR [stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] stream-thread [stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) [2020-05-17T06:21:54-07:00] (streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/3_1] doesn't exist and couldn't be created at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112) at org.apache.kafka.streams.processor.internals.StateDirectory.checkpointFileFor(StateDirectory.java:121) at org.apache.kafka.streams.processor.internals.TaskManager.getTaskOffsetSums(TaskManager.java:498) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscriptionUserData(StreamsPartitionAssignor.java:239) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:222) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:560) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:495) at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:417) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1265) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:770) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:630) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
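The fix proposed in the ticket, releasing each lost task's entry as it is closed dirty, can be sketched with a simple set-backed model. This is a hypothetical illustration; the names mirror the ticket but not the actual TaskManager code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed fix: when handleLostAll closes tasks
// dirty, also remove their ids from lockedTaskDirectories so a later
// getTaskOffsetSums does not try to read a state directory that is gone.
public class HandleLostAllSketch {

    private final Set<String> lockedTaskDirectories = new HashSet<>();

    public void lock(String taskId) {
        lockedTaskDirectories.add(taskId);
    }

    public void handleLostAll(Set<String> lostTasks) {
        for (String taskId : lostTasks) {
            // ... close the task dirty, then release its directory entry
            lockedTaskDirectories.remove(taskId);
        }
    }

    public boolean isLocked(String taskId) {
        return lockedTaskDirectories.contains(taskId);
    }

    public static void main(String[] args) {
        HandleLostAllSketch manager = new HandleLostAllSketch();
        manager.lock("3_1");
        manager.handleLostAll(java.util.Collections.singleton("3_1"));
        System.out.println(manager.isLocked("3_1"));
    }
}
```

Keeping the lock bookkeeping inside the same code path that closes the task avoids the stale-entry state shown in the stack trace above.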
[jira] [Created] (KAFKA-10010) Should close standby task for safety during HandleLostAll
Boyang Chen created KAFKA-10010: --- Summary: Should close standby task for safety during HandleLostAll Key: KAFKA-10010 URL: https://issues.apache.org/jira/browse/KAFKA-10010 Project: Kafka Issue Type: Bug Reporter: Boyang Chen Assignee: Boyang Chen The current lost-all logic doesn't close standby tasks, which could potentially lead to a tricky condition like the one below:
1. A standby task was initializing in `CREATED` state, and a task corrupted exception was thrown from registerStateStores.
2. The task corrupted exception was caught, and a commit of the non-affected tasks was performed.
3. The task commit failed due to a task migrated exception.
4. handleLostAll didn't close the standby task, leaving it in CREATED state.
5. After the next rebalance completed, the same task was assigned back as a standby task.
6. An IllegalArgumentException was caught:
{code:java} [2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 18:56:18,050] ERROR [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) [2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) java.lang.IllegalArgumentException: stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already been registered. 
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112) at org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804) at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85) at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82) at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89) at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509) {code} -- This message was sent by 
Atlassian Jira (v8.3.4#803005)
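The resolution title for this ticket (KAFKA-10010 above, "Should make state store registration idempotent") points at the other way to break the cycle: let a re-register of the same store be a no-op. A hypothetical sketch, with illustrative names:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of idempotent registration: re-registering the same
// store for a task becomes a no-op instead of throwing
// IllegalArgumentException, so a task left in CREATED after a dirty cycle
// can be safely re-initialized.
public class StoreRegistrySketch {

    private final Map<String, Object> stores = new HashMap<>();

    public void registerStore(String name, Object store) {
        // previously: if (stores.containsKey(name)) throw new IllegalArgumentException(...)
        stores.putIfAbsent(name, store);
    }

    public int size() {
        return stores.size();
    }

    public static void main(String[] args) {
        StoreRegistrySketch registry = new StoreRegistrySketch();
        registry.registerStore("KSTREAM-AGGREGATE-STATE-STORE", new Object());
        registry.registerStore("KSTREAM-AGGREGATE-STATE-STORE", new Object()); // no exception
        System.out.println(registry.size());
    }
}
```

Either closing the standby task in handleLostAll or making registration idempotent would prevent the "has already been registered" failure in the log above; the sketch shows the latter.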
[jira] [Resolved] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only
[ https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-7566. Resolution: Won't Fix > Add sidecar job to leader (or a random single follower) only > > > Key: KAFKA-7566 > URL: https://issues.apache.org/jira/browse/KAFKA-7566 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Minor > > Hey there, > recently we needed to add an archive job to a streaming application. The caveat > is that we need to make sure only one instance is doing this task, to avoid a > potential race condition, and we also don't want to schedule it as a regular > stream task so that it won't block normal streaming operation. > Although we could do this with a ZooKeeper lease, I'm raising the case here since > this could be a potential use case for streaming jobs as well. For example, > there are some `leader specific` operations we could schedule in the DSL instead > of in an ad-hoc manner. > Let me know if this makes sense to you, thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9999) Topic description should be triggered after each failed topic creation iteration
Boyang Chen created KAFKA-9999: -- Summary: Topic description should be triggered after each failed topic creation iteration Key: KAFKA-9999 URL: https://issues.apache.org/jira/browse/KAFKA-9999 Project: Kafka Issue Type: Bug Components: admin, streams Affects Versions: 2.4.0 Reporter: Boyang Chen We spotted a system test failure where the topic already exists but the admin client still attempts to recreate it: {code:java} [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably marked for deletion (number of partitions is unknown). Will retry to create this topic in 100 ms (to let broker finish async delete operation first). Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager) [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown). Will retry to create this topic in 100 ms (to let broker finish async delete operation first). Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager) [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown). Will retry to create this topic in 100 ms (to let broker finish async delete operation first). Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager) [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager) [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager) [2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread) org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. 
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) at org.apache.kafka.clients.con
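The retry strategy the ticket title suggests, describing the topics after every failed creation attempt instead of blindly recreating, can be sketched abstractly. This is a hypothetical model; the `Admin` interface and method names here are illustrative, not Kafka's AdminClient API:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed retry loop: after a create attempt
// fails (e.g. with TopicExistsException), describe the topics to confirm
// they now exist instead of retrying creation until the retry budget burns out.
public class TopicSetupSketch {

    interface Admin {
        Set<String> create(Set<String> topics);   // returns topics that failed to create
        Set<String> describe(Set<String> topics); // returns topics that already exist
    }

    public static Set<String> makeReady(Admin admin, Set<String> topics, int retries) {
        Set<String> remaining = new HashSet<>(topics);
        while (!remaining.isEmpty() && retries-- > 0) {
            Set<String> failed = admin.create(remaining);
            // key change: re-describe after every failed iteration, so topics
            // that already exist are treated as ready rather than retried
            failed.removeAll(admin.describe(failed));
            remaining = failed;
        }
        return remaining; // topics still not ready after exhausting retries
    }

    public static void main(String[] args) {
        Admin admin = new Admin() {
            public Set<String> create(Set<String> topics) { return new HashSet<>(topics); }
            public Set<String> describe(Set<String> topics) { return new HashSet<>(topics); }
        };
        Set<String> topics = new HashSet<>(Arrays.asList("SmokeTest-uwin-cnt-changelog"));
        System.out.println(makeReady(admin, topics, 5).isEmpty());
    }
}
```

In the failure above, the pre-existing changelog topics would be recognized on the first describe pass instead of consuming all five retries.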
[jira] [Created] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path
Boyang Chen created KAFKA-9994: -- Summary: Catch TaskMigrated exception in task corruption code path Key: KAFKA-9994 URL: https://issues.apache.org/jira/browse/KAFKA-9994 Project: Kafka Issue Type: Bug Components: streams Reporter: Boyang Chen We have seen a case where a TaskMigrated exception gets thrown from taskManager.commit() in the task corruption code path. This should be prevented by catching it properly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
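The catch the ticket asks for can be sketched as follows. This is a hypothetical model of the control flow, with illustrative names rather than the actual StreamThread/TaskManager code:

```java
// Hypothetical sketch: wrap the commit performed while handling task
// corruption so a TaskMigratedException raised mid-handling is converted into
// the normal "lost all tasks, rejoin" path instead of killing the thread.
public class CorruptionHandlingSketch {

    static class TaskMigratedException extends RuntimeException {}

    interface TaskManager {
        void commit();
        void handleLostAll();
    }

    // returns true if the commit succeeded, false if tasks migrated away
    public static boolean handleCorruption(TaskManager taskManager) {
        try {
            taskManager.commit();
            return true;
        } catch (TaskMigratedException e) {
            // tasks were reassigned while we were handling corruption:
            // fall back to the lost-all path and let the thread rejoin
            taskManager.handleLostAll();
            return false;
        }
    }

    public static void main(String[] args) {
        TaskManager healthy = new TaskManager() {
            public void commit() {}
            public void handleLostAll() {}
        };
        System.out.println(handleCorruption(healthy));
    }
}
```

The point is that the migration case is an expected rebalance outcome, not a fatal error, so it must be absorbed inside the corruption handler.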
[jira] [Created] (KAFKA-9993) Think about inheritance in the protocol generation framework
Boyang Chen created KAFKA-9993: -- Summary: Think about inheritance in the protocol generation framework Key: KAFKA-9993 URL: https://issues.apache.org/jira/browse/KAFKA-9993 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen We have seen that there are many common fields inside the request/response templates that could be extracted into a superclass for the auto-generated classes. For example, most responses contain a top-level error code. Currently, to build a service receiving multiple RPCs, the code template produces a lot of redundant error code extraction logic, which is far from ideal. What we want to discuss is whether to enable a general inheritance mechanism in this framework, what the trade-offs and added complexity would be, and whether there is any workaround that would reduce the boilerplate. -- This message was sent by Atlassian Jira (v8.3.4#803005)
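The inheritance idea above can be sketched in miniature: a shared base class carries the common error code, so cross-RPC handling is written once. The class names here are hypothetical, not the generated classes the framework actually emits:

```java
// Hypothetical sketch of the proposed inheritance: a base class for generated
// response messages carrying the common top-level error code.
abstract class AbstractResponseData {
    private short errorCode;

    public short errorCode() {
        return errorCode;
    }

    public void setErrorCode(short errorCode) {
        this.errorCode = errorCode;
    }
}

// A generated class would then only add its RPC-specific fields.
class HeartbeatResponseData extends AbstractResponseData {}

public class ProtocolInheritanceSketch {

    // generic error handling, written once instead of per-RPC
    public static boolean hasError(AbstractResponseData response) {
        return response.errorCode() != 0;
    }

    public static void main(String[] args) {
        HeartbeatResponseData response = new HeartbeatResponseData();
        response.setErrorCode((short) 15);
        System.out.println(hasError(response));
    }
}
```

The trade-off the ticket raises still applies: the generator must decide which fields are truly universal, since any field promoted to the base class constrains every message schema that inherits it.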
[jira] [Resolved] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
[ https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9676. Resolution: Fixed The current unit test coverage is pretty good now, closing the ticket. > Add test coverage for new ActiveTaskCreator and StandbyTaskCreator > -- > > Key: KAFKA-9676 > URL: https://issues.apache.org/jira/browse/KAFKA-9676 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Boyang Chen >Priority: Major > Labels: help-wanted, newbie > > The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit > test coverage. We should add corresponding tests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
Boyang Chen created KAFKA-9989: -- Summary: StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task Key: KAFKA-9989 URL: https://issues.apache.org/jira/browse/KAFKA-9989 Project: Kafka Issue Type: Bug Reporter: Boyang Chen Assignee: Boyang Chen The system test StreamsUpgradeTest.test_metadata_upgrade could fail with: "Never saw output 'processed [0-9]* records' on ubuntu@worker6". On closer inspection, the rebalance happens but produces no task assignment. We should fix this by making the rebalance result part of the check, and skipping the record-processing validation when the assignment is empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8989) Embedded broker could not be reached in unit test
[ https://issues.apache.org/jira/browse/KAFKA-8989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8989. Resolution: Won't Fix > Embedded broker could not be reached in unit test > - > > Key: KAFKA-8989 > URL: https://issues.apache.org/jira/browse/KAFKA-8989 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9984) Should fail the subscription when pattern is empty
Boyang Chen created KAFKA-9984: -- Summary: Should fail the subscription when pattern is empty Key: KAFKA-9984 URL: https://issues.apache.org/jira/browse/KAFKA-9984 Project: Kafka Issue Type: Bug Components: consumer, streams Reporter: Boyang Chen We have seen a case where the consumer subscribes to an empty string pattern: ``` [Consumer ... ] Subscribed to pattern: '' ``` which doesn't make any sense and usually indicates a configuration error. The `consumer.subscribe(pattern)` call should fail with an IllegalArgumentException in this case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
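The validation the ticket asks for is a simple up-front guard. A hypothetical sketch of the check, not the actual KafkaConsumer code:

```java
import java.util.regex.Pattern;

// Hypothetical sketch of the proposed validation: reject a null or
// empty-string pattern up front with IllegalArgumentException, before any
// subscription state is touched.
public class SubscribeValidationSketch {

    public static void subscribe(Pattern pattern) {
        if (pattern == null || pattern.pattern().isEmpty()) {
            throw new IllegalArgumentException(
                "Topic pattern to subscribe to cannot be "
                    + (pattern == null ? "null" : "empty"));
        }
        // ... proceed with the normal pattern subscription
    }

    public static void main(String[] args) {
        subscribe(Pattern.compile("SmokeTest-.*")); // valid pattern passes
        System.out.println("valid pattern accepted");
    }
}
```

Failing fast here surfaces the misconfiguration at the `subscribe` call site, instead of a consumer that silently matches nothing.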
[jira] [Created] (KAFKA-9972) Corrupted standby task could be committed
Boyang Chen created KAFKA-9972: -- Summary: Corrupted standby task could be committed Key: KAFKA-9972 URL: https://issues.apache.org/jira/browse/KAFKA-9972 Project: Kafka Issue Type: Bug Components: streams Reporter: Boyang Chen Assignee: Boyang Chen A corrupted standby task could be revived and transition to the CREATED state, and then be picked up by `taskManager.commitAll` in the next runOnce, causing an illegal state: ``` [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,646] WARN [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,646] WARN [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Detected the states of tasks \{1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. 
(org.apache.kafka.streams.processor.internals.StreamThread) [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs \{1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1]} are corrupted and hence needs to be re-initialized at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517) [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,652] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] [Consumer clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions (org.apache.kafka.clients.consumer.KafkaConsumer) [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,652] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] standby-task [1_1] Prepared dirty close (org.apache.kafka.streams.processor.internals.StandbyTask) [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,679] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] standby-task [1_1] Closed dirty (org.apache.kafka.streams.processor.internals.StandbyTask) [2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,751] ERROR [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) java.lang.IllegalStateException: Illegal state CREATED while preparing standby task 1_1 for committing at org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134) at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752) at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725) at o
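The IllegalStateException above comes from committing a task that is back in CREATED. One way to model the guard, filter commitAll down to tasks that have actually been initialized, is sketched below. This is a hypothetical illustration, not the actual TaskManager code:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: commitAll should skip tasks that are back in CREATED
// (e.g. a corrupted standby that was closed dirty and re-created), committing
// only tasks that have been initialized.
public class CommitAllSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    public static List<State> committable(List<State> taskStates) {
        return taskStates.stream()
            .filter(state -> state == State.RESTORING || state == State.RUNNING)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<State> states = java.util.Arrays.asList(
            State.CREATED, State.RUNNING, State.RESTORING);
        System.out.println(committable(states).size());
    }
}
```

With such a filter, the revived standby in CREATED state would simply be skipped by the next commit cycle instead of tripping `prepareCommit`.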
[jira] [Resolved] (KAFKA-8587) One producer per thread for Kafka Streams EOS
[ https://issues.apache.org/jira/browse/KAFKA-8587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8587. Resolution: Fixed > One producer per thread for Kafka Streams EOS > - > > Key: KAFKA-8587 > URL: https://issues.apache.org/jira/browse/KAFKA-8587 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently the exactly-once producer is coupled with individual input partitions. > This does not scale well for client applications such as Kafka Streams, and the > root cause is that the EOS producer doesn't understand topic partition moves > throughout consumer group rebalances. By closing this semantic gap, we could > achieve much better EOS scalability. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9910) Implement new transaction timed out error
Boyang Chen created KAFKA-9910: -- Summary: Implement new transaction timed out error Key: KAFKA-9910 URL: https://issues.apache.org/jira/browse/KAFKA-9910 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9911) Implement new producer fenced error
Boyang Chen created KAFKA-9911: -- Summary: Implement new producer fenced error Key: KAFKA-9911 URL: https://issues.apache.org/jira/browse/KAFKA-9911 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8993) Add an EOS performance test suite similar to ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-8993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8993. Resolution: Won't Fix > Add an EOS performance test suite similar to ProducerPerformance > > > Key: KAFKA-8993 > URL: https://issues.apache.org/jira/browse/KAFKA-8993 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9878) Block EndTxn call until the txn markers are committed
Boyang Chen created KAFKA-9878: -- Summary: Block EndTxn call until the txn markers are committed Key: KAFKA-9878 URL: https://issues.apache.org/jira/browse/KAFKA-9878 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Currently the EndTxn call from the producer returns immediately once the control record is written to the txn coordinator log. The ongoing transaction then moves to a pending state while all txn markers are propagated. In the meantime, the producer client may start another transaction, only to be rejected repeatedly until the pending state is resolved, which wastes round trips and burdens the broker with repetitive requests. To avoid this situation, we should make the producer client wait for txn marker completion instead. This yields better performance overall, as no back-off needs to be triggered before a subsequent transaction can begin. -- This message was sent by Atlassian Jira (v8.3.4#803005)
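The proposed behavior can be sketched as follows. All names here are hypothetical (this is not the real producer's `TransactionManager`): instead of returning as soon as the control record is written and letting the next transaction be rejected repeatedly, the end-transaction call keeps polling until the coordinator reports that all txn markers have completed.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class EndTxnBlockingSketch {
    // Simulated coordinator view of a committing transaction: it stays pending
    // until every txn marker has been written to the data partitions.
    static class Coordinator {
        private final AtomicInteger outstandingMarkers;
        Coordinator(int markers) { outstandingMarkers = new AtomicInteger(markers); }
        // Each check, one more marker write completes; done when none remain.
        boolean txnComplete() { return outstandingMarkers.getAndUpdate(n -> Math.max(0, n - 1)) == 0; }
    }

    // Proposed behavior: EndTxn does not return while markers are outstanding,
    // so the next beginTransaction() is never rejected with a retriable error.
    static int endTransactionBlocking(Coordinator coordinator, int maxPolls) {
        for (int poll = 1; poll <= maxPolls; poll++) {
            if (coordinator.txnComplete()) {
                return poll;            // safe to begin the next transaction
            }
            // a real client would back off between polls here
        }
        throw new IllegalStateException("txn markers not completed in time");
    }

    public static void main(String[] args) {
        // three outstanding markers resolve over the first three polls
        int polls = endTransactionBlocking(new Coordinator(3), 10);
        if (polls != 4) throw new AssertionError();
        System.out.println("EndTxn returned after " + polls + " polls");
    }
}
```

The point of the sketch is that the waiting moves inside the commit call, so the caller never sees the pending state at all.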
[jira] [Resolved] (KAFKA-8870) Prevent dirty reads of Streams state store from Interactive queries
[ https://issues.apache.org/jira/browse/KAFKA-8870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8870. Resolution: Duplicate > Prevent dirty reads of Streams state store from Interactive queries > --- > > Key: KAFKA-8870 > URL: https://issues.apache.org/jira/browse/KAFKA-8870 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Vinoth Chandar >Assignee: Boyang Chen >Priority: Major > > Today, Interactive Queries (IQ) against a Streams state store can see > uncommitted data, even with EOS processing guarantees (these are actually > orthogonal, but worth clarifying since EOS may give the impression that everything > is dandy). This is caused primarily because state updates in RocksDB are > visible even before the Kafka transaction is committed. Thus, if the instance > fails, the failed-over instance will redo the uncommitted transaction, and the > following can happen during recovery. > The value for key K can go from *V0 → V1 → V2* on active instance A; IQ reads V1, > instance A fails, and any failure/rebalancing will leave the standby instance B > rewinding offsets and reprocessing, during which time IQ can again see V0 or > V1 or any number of previous values for the same key. > In this issue, we plan to work towards providing consistency for IQ for a > single row in a single state store, i.e. once a query sees V1, it can only see > either V1 or V2. > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
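The single-row consistency goal above ("once a query sees V1, it can only see V1 or V2") can be sketched minimally as follows. All names are hypothetical and this is not the Streams IQ API: the reader remembers the highest version it has served per key and refuses to serve anything older, so a query that has seen V1 is never answered with V0 after a failover to a standby that is still replaying.

```java
import java.util.HashMap;
import java.util.Map;

public class MonotonicReadSketch {
    // A versioned value; the version could be the committed changelog offset.
    static class Versioned {
        final String value;
        final long version;
        Versioned(String value, long version) { this.value = value; this.version = version; }
    }

    private final Map<String, Versioned> store = new HashMap<>();  // current store contents
    private final Map<String, Long> lastServed = new HashMap<>();  // highest version served per key

    void put(String key, String value, long version) { store.put(key, new Versioned(value, version)); }

    // Serve a read only if it would not travel back in time for this key,
    // e.g. while a standby is still replaying the changelog after failover.
    Versioned read(String key) {
        Versioned v = store.get(key);
        long floor = lastServed.getOrDefault(key, Long.MIN_VALUE);
        if (v == null || v.version < floor) {
            throw new IllegalStateException("store not caught up for key " + key);
        }
        lastServed.put(key, v.version);
        return v;
    }

    public static void main(String[] args) {
        MonotonicReadSketch iq = new MonotonicReadSketch();
        iq.put("K", "V1", 1);
        System.out.println(iq.read("K").value);        // serves V1
        iq.put("K", "V0", 0);                          // standby rewound to an older value
        boolean rejected = false;
        try { iq.read("K"); } catch (IllegalStateException e) { rejected = true; }
        if (!rejected) throw new AssertionError();     // V0 must not be visible after V1
        iq.put("K", "V2", 2);
        if (!iq.read("K").value.equals("V2")) throw new AssertionError();
    }
}
```

A real solution would additionally need to track the version floor across instances; the sketch only shows the per-reader monotonicity check.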
[jira] [Resolved] (KAFKA-9764) Deprecate Stream Simple benchmark
[ https://issues.apache.org/jira/browse/KAFKA-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9764. Resolution: Fixed > Deprecate Stream Simple benchmark > - > > Key: KAFKA-9764 > URL: https://issues.apache.org/jira/browse/KAFKA-9764 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Over the years, this simple benchmark suite has become less and less valuable. > It is built on Jenkins infra, which cannot guarantee consistent results across > runs, and most of the time it yields no insight. To avoid wasting developers' > time testing performance against this poor setup, we will remove the test > suite as it is no longer valuable to the community. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9793) Stream HandleAssignment should guarantee task close
[ https://issues.apache.org/jira/browse/KAFKA-9793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9793. Resolution: Fixed > Stream HandleAssignment should guarantee task close > --- > > Key: KAFKA-9793 > URL: https://issues.apache.org/jira/browse/KAFKA-9793 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > When the `handleAssignment` call is triggered, if a task's preCommit throws, the > doomed task is never closed, which then causes a RocksDB metrics > recorder re-addition, which is fatal: > > > [2020-03-31T16:50:43-07:00] > (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 > 23:50:42,668] INFO > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] > stream-thread > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] Handle > new assignment with: > New active tasks: [1_0, 0_1, 2_0] > New standby tasks: [] > Existing active tasks: [0_1, 1_0, 2_0, 3_1] > Existing standby tasks: [] > (org.apache.kafka.streams.processor.internals.TaskManager) > > [2020-03-31T16:50:43-07:00] > (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 > 23:50:42,671] INFO > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] > stream-thread > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task > [3_1] Prepared clean close > (org.apache.kafka.streams.processor.internals.StreamTask) > [2020-03-31T16:50:43-07:00] > (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 > 23:50:42,671] INFO > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] > stream-thread > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task > [0_1] Prepared task for committing > (org.apache.kafka.streams.processor.internals.StreamTask) > [2020-03-31T16:50:43-07:00] > 
(streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 > 23:50:42,682] ERROR > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] > stream-thread > [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task > [1_0] Failed to flush state store logData10MinuteFinalCount-store: > (org.apache.kafka.streams.processor.internals.ProcessorStateManager) > [2020-03-31T16:50:43-07:00] > (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) > org.apache.kafka.streams.errors.TaskMigratedException: Error encountered > sending record to topic windowed-node-counts for task 1_0 due to: > [2020-03-31T16:50:43-07:00] > (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) > org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an > operation with an old epoch. Either there is a newer producer with the same > transactionalId, or the producer's transaction has been expired by the broker. > [2020-03-31T16:50:43-07:00] > (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) Written offsets > would not be recorded and no more records would be sent since the producer is > fenced, indicating the task may be migrated out; it means all tasks belonging > to this thread should be migrated. 
> at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:202) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1352) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:768) > at > org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:485) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:304) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.lang.Thread.run(Thread.java:748) > > The correct solution is to wrap the whole code block in a try-catch so that an > unexpected failure cannot skip the task close. -- This message was sent by Atlassian Jira (v8.3.4#803005)
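The fix described above can be sketched as follows, with hypothetical interfaces (this is not the actual `TaskManager` code): `prepareCommit` may throw, but the close call sits in a `finally` block, so every task is closed regardless, and its RocksDB metrics recorder can be unregistered before the task is ever re-created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class GuaranteedCloseSketch {
    // Hypothetical stand-in for a stream task; only the calls relevant here.
    interface Task {
        String id();
        void prepareCommit();   // may throw, e.g. when the producer is fenced
        void close();           // must always run so state (metrics recorders) is released
    }

    // Close every revoked task even when its preCommit throws; collect the
    // errors instead of letting the first one abort the loop.
    static List<String> closeAll(List<Task> revoked) {
        List<String> errors = new ArrayList<>();
        for (Task task : revoked) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                errors.add(task.id() + ": " + e.getMessage());
            } finally {
                task.close();   // guaranteed, commit success or not
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        AtomicInteger closed = new AtomicInteger();
        Task failing = new Task() {
            public String id() { return "1_0"; }
            public void prepareCommit() { throw new RuntimeException("producer fenced"); }
            public void close() { closed.incrementAndGet(); }
        };
        Task healthy = new Task() {
            public String id() { return "0_1"; }
            public void prepareCommit() { }
            public void close() { closed.incrementAndGet(); }
        };
        List<String> errors = closeAll(List.of(failing, healthy));
        // both tasks were closed despite the preCommit failure of task 1_0
        if (closed.get() != 2 || errors.size() != 1) throw new AssertionError();
        System.out.println("closed=" + closed.get() + " errors=" + errors);
    }
}
```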
[jira] [Created] (KAFKA-9860) Transactional Producer could add partitions by batch at the end
Boyang Chen created KAFKA-9860: -- Summary: Transactional Producer could add partitions by batch at the end Key: KAFKA-9860 URL: https://issues.apache.org/jira/browse/KAFKA-9860 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen As of today, the producer's transaction manager keeps track of the partitions involved in the current transaction. Each time it sees a new partition, it sends a request to add all the involved partitions to the broker, which results in multiple requests. If we batched this work at the end of the transaction, we would save unnecessary round trips. -- This message was sent by Atlassian Jira (v8.3.4#803005)
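A sketch of the batching idea, with hypothetical names (the real client sends an AddPartitionsToTxn request; here the round trip is just counted): partitions are only recorded locally as records are sent, and a single add-partitions request covering all of them is issued at commit time.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class BatchedAddPartitionsSketch {
    // Partitions touched by the current transaction, recorded locally only.
    private final Set<String> pending = new LinkedHashSet<>();
    private int addPartitionsRequests = 0;   // stand-in for real broker round trips

    // Today each newly seen partition triggers an immediate add-partitions
    // request; in this sketch, sending a record just remembers the partition.
    void send(String topicPartition, byte[] record) {
        pending.add(topicPartition);
    }

    // Proposed: one batched add-partitions request right before EndTxn.
    Set<String> commitTransaction() {
        addPartitionsRequests++;             // a single round trip per transaction
        Set<String> batch = new HashSet<>(pending);
        pending.clear();
        return batch;
    }

    int requestCount() { return addPartitionsRequests; }

    public static void main(String[] args) {
        BatchedAddPartitionsSketch txn = new BatchedAddPartitionsSketch();
        txn.send("events-0", new byte[0]);
        txn.send("events-1", new byte[0]);
        txn.send("events-0", new byte[0]);   // repeat partition, no extra work
        Set<String> batch = txn.commitTransaction();
        if (batch.size() != 2 || txn.requestCount() != 1) throw new AssertionError();
        System.out.println("one batched request for partitions " + batch);
    }
}
```

With the per-partition scheme the example above would have cost two requests; the batch costs one regardless of how many partitions the transaction touches.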
[jira] [Resolved] (KAFKA-9784) Add OffsetFetch to the concurrent coordinator test
[ https://issues.apache.org/jira/browse/KAFKA-9784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9784. Resolution: Fixed > Add OffsetFetch to the concurrent coordinator test > -- > > Key: KAFKA-9784 > URL: https://issues.apache.org/jira/browse/KAFKA-9784 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Normally consumers would first do an OffsetFetch before starting the normal > processing. It makes sense to add it to the concurrent test suite. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9809) Shrink transaction timeout for Streams
[ https://issues.apache.org/jira/browse/KAFKA-9809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9809. Resolution: Fixed > Shrink transaction timeout for Streams > -- > > Key: KAFKA-9809 > URL: https://issues.apache.org/jira/browse/KAFKA-9809 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Per KIP-447, we need to shrink the transaction timeout for Streams to 10 > seconds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9809) Shrink transaction timeout for Streams
Boyang Chen created KAFKA-9809: -- Summary: Shrink transaction timeout for Streams Key: KAFKA-9809 URL: https://issues.apache.org/jira/browse/KAFKA-9809 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Per KIP-447, we need to shrink the transaction timeout for Streams to 10 seconds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9793) Stream HandleAssignment should guarantee task close
Boyang Chen created KAFKA-9793: -- Summary: Stream HandleAssignment should guarantee task close Key: KAFKA-9793 URL: https://issues.apache.org/jira/browse/KAFKA-9793 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen When the `handleAssignment` call is triggered, if a task's preCommit throws, the doomed task is never closed, which then causes a RocksDB metrics recorder re-addition, which is fatal: [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,668] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] Handle new assignment with: New active tasks: [1_0, 0_1, 2_0] New standby tasks: [] Existing active tasks: [0_1, 1_0, 2_0, 3_1] Existing standby tasks: [] (org.apache.kafka.streams.processor.internals.TaskManager) [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [3_1] Prepared clean close (org.apache.kafka.streams.processor.internals.StreamTask) [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [0_1] Prepared task for committing (org.apache.kafka.streams.processor.internals.StreamTask) [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,682] ERROR [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [1_0] Failed to flush state store 
logData10MinuteFinalCount-store: (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic windowed-node-counts for task 1_0 due to: [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:202) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185) at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1352) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:768) at org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:485) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:304) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) The correct solution is to wrap the whole code block in a try-catch so that an unexpected failure cannot skip the task close. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9784) Add OffsetFetch to the concurrent coordinator test
Boyang Chen created KAFKA-9784: -- Summary: Add OffsetFetch to the concurrent coordinator test Key: KAFKA-9784 URL: https://issues.apache.org/jira/browse/KAFKA-9784 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen Normally consumers would first do an OffsetFetch before starting the normal processing. It makes sense to add it to the concurrent test suite. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9779) Add version 2.5 to streams system tests
Boyang Chen created KAFKA-9779: -- Summary: Add version 2.5 to streams system tests Key: KAFKA-9779 URL: https://issues.apache.org/jira/browse/KAFKA-9779 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9764) Deprecate Stream Simple benchmark
Boyang Chen created KAFKA-9764: -- Summary: Deprecate Stream Simple benchmark Key: KAFKA-9764 URL: https://issues.apache.org/jira/browse/KAFKA-9764 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen Assignee: Boyang Chen Over the years, this simple benchmark suite has become less and less valuable. It is built on Jenkins infra, which cannot guarantee consistent results across runs, and most of the time it yields no insight. To avoid wasting developers' time testing performance against this poor setup, we will remove the test suite as it is no longer valuable to the community. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9760) Add EOS protocol changes to documentation
Boyang Chen created KAFKA-9760: -- Summary: Add EOS protocol changes to documentation Key: KAFKA-9760 URL: https://issues.apache.org/jira/browse/KAFKA-9760 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Assignee: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9759) Add documentation change for KIP-562
Boyang Chen created KAFKA-9759: -- Summary: Add documentation change for KIP-562 Key: KAFKA-9759 URL: https://issues.apache.org/jira/browse/KAFKA-9759 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9758) Add documentations for Streams release
Boyang Chen created KAFKA-9758: -- Summary: Add documentations for Streams release Key: KAFKA-9758 URL: https://issues.apache.org/jira/browse/KAFKA-9758 Project: Kafka Issue Type: Improvement Affects Versions: 2.5.0 Reporter: Boyang Chen This ticket tracks the doc updates for a couple of KIPs launched in Streams 2.5, including: KIP-523 KIP-527 KIP-530 KIP-562 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9757) Add documentation change for KIP-535
Boyang Chen created KAFKA-9757: -- Summary: Add documentation change for KIP-535 Key: KAFKA-9757 URL: https://issues.apache.org/jira/browse/KAFKA-9757 Project: Kafka Issue Type: Sub-task Affects Versions: 2.5.0 Reporter: Boyang Chen Assignee: Vinoth Chandar Just a reminder to add documentation for KIP-535 in both the release notes and the streams documentation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue
[ https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9475. Resolution: Won't Fix This seems not worth the effort, as we could just shrink the scheduler interval > Replace transaction abortion scheduler with a delayed queue > --- > > Key: KAFKA-9475 > URL: https://issues.apache.org/jira/browse/KAFKA-9475 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Priority: Major > > Although we could set the txn timeout to 10 seconds, the purging > scheduler only runs at a one-minute interval, so in the worst case we would > still wait for 1 minute. We are considering several potential fixes: > # Change the interval to 10 seconds: this means 6x more frequent checking and > more read contention on txn metadata. The benefit here is an easy one-line > fix without correctness concerns. > # Use an existing delayed queue, a.k.a. the purgatory. From what I heard, the > purgatory needs at least 2 extra threads to work properly, with some add-on > overhead in memory and complexity. The benefit here is more precise timeout > reaction, without a redundant full metadata read lock. > # Create a new delayed queue. This could be done with a Scala delayed > queue; the concern here is whether that approach is production ready. > The benefits are the same as #2, potentially with less code complexity. > This ticket is to track #2 progress if we decide to go down this path > eventually. -- This message was sent by Atlassian Jira (v8.3.4#803005)
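The delayed-queue idea can be illustrated with the JDK's own `java.util.concurrent.DelayQueue` (the ticket mentions a Scala delayed queue and the purgatory; this Java sketch only shows the mechanics): each transaction is enqueued with its own deadline, and `take()` wakes up exactly when the earliest transaction times out, instead of a scheduler scanning all txn metadata once a minute.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class TxnAbortDelayQueueSketch {
    // An entry becomes available exactly when its transaction times out.
    static class ExpiringTxn implements Delayed {
        final String transactionalId;
        final long deadlineNanos;
        ExpiringTxn(String transactionalId, long timeoutMs) {
            this.transactionalId = transactionalId;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        }
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ExpiringTxn> queue = new DelayQueue<>();
        queue.put(new ExpiringTxn("txn-b", 50));  // times out after 50 ms
        queue.put(new ExpiringTxn("txn-a", 10));  // times out after 10 ms
        // take() blocks until the earliest deadline passes: precise abort timing,
        // no periodic full scan of transaction metadata.
        if (!queue.take().transactionalId.equals("txn-a")) throw new AssertionError();
        if (!queue.take().transactionalId.equals("txn-b")) throw new AssertionError();
        System.out.println("transactions aborted in deadline order");
    }
}
```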
[jira] [Resolved] (KAFKA-9743) StreamTask could fail to close during HandleNewAssignment
[ https://issues.apache.org/jira/browse/KAFKA-9743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-9743. Resolution: Fixed > StreamTask could fail to close during HandleNewAssignment > - > > Key: KAFKA-9743 > URL: https://issues.apache.org/jira/browse/KAFKA-9743 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > We found this particular bug happening in soak: > [2020-03-20T16:12:02-07:00] > (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) [2020-03-20 > 23:12:01,534] ERROR > [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] > stream-thread > [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] > Encountered the following exception during processing and the thread is going > to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) > [2020-03-20T16:12:02-07:00] > (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) > java.lang.IllegalStateException: RocksDB metrics recorder for store > "KSTREAM-AGGREGATE-STATE-STORE-40" of task 2_2 has already been > added. This is a bug in Kafka Streams. 
> at > org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30) > at > org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:98) > at > org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:207) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:193) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:231) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:81) > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:191) > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:329) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:587) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:501) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:475) > > This could bring the entire instance down. The bug is that if the commit fails > during the task-close path, the subsequent `closeClean` call is never > triggered. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8230) Add static membership support in librd consumer client
[ https://issues.apache.org/jira/browse/KAFKA-8230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8230. Resolution: Fixed This will be tracked on the librdkafka side, where it appears to be done already > Add static membership support in librd consumer client > --- > > Key: KAFKA-8230 > URL: https://issues.apache.org/jira/browse/KAFKA-8230 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Minor > Labels: consumer > > Once the effort in https://issues.apache.org/jira/browse/KAFKA-7018 is done, > one of the low-hanging fruits is to add this support to Kafka consumers in other > languages, such as the C consumer in librdkafka. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9751) Admin `FindCoordinator` call should go to controller instead of ZK
Boyang Chen created KAFKA-9751: -- Summary: Admin `FindCoordinator` call should go to controller instead of ZK Key: KAFKA-9751 URL: https://issues.apache.org/jira/browse/KAFKA-9751 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen In the current trunk, we still go through ZK for topic creation when routing a FindCoordinatorRequest:
{code:java}
val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
  case CoordinatorType.GROUP =>
    val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
    val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
    (partition, metadata)
{code}
This should be migrated to controller handling instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9743) StreamTask could fail to close during HandleNewAssignment
Boyang Chen created KAFKA-9743: -- Summary: StreamTask could fail to close during HandleNewAssignment Key: KAFKA-9743 URL: https://issues.apache.org/jira/browse/KAFKA-9743 Project: Kafka Issue Type: Bug Reporter: Boyang Chen Assignee: Boyang Chen We found this particular bug happening in soak: [2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) [2020-03-20 23:12:01,534] ERROR [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] stream-thread [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread) [2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) java.lang.IllegalStateException: RocksDB metrics recorder for store "KSTREAM-AGGREGATE-STATE-STORE-40" of task 2_2 has already been added. This is a bug in Kafka Streams. at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30) at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:98) at org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:207) at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:193) at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:231) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58) at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101) at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:81) at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:191) at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:329) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:587) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:501) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:475) This could bring the entire instance down. The bug is that if the commit fails during the task-close path, the subsequent `closeClean` call is never triggered. -- This message was sent by Atlassian Jira (v8.3.4#803005)