[jira] [Resolved] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling

2021-04-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7728.

Resolution: Won't Fix

This is no longer a critical fix for static membership, as a leader rejoin 
with no member info change won't trigger a rebalance in the original implementation.

> Add JoinReason to the join group request for better rebalance handling
> --
>
> Key: KAFKA-7728
> URL: https://issues.apache.org/jira/browse/KAFKA-7728
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: consumer, mirror-maker, needs-kip
>
> Recently [~mgharat] and I discussed the current rebalance logic for leader 
> join group request handling. So far we blindly trigger a rebalance whenever 
> the leader rejoins. The caveat is that KIP-345 does not cover this case, so if 
> a consumer group is not using sticky assignment but another strategy such as 
> round robin, the redundant rebalance could still shuffle topic partitions 
> around consumers (for example, in a mirror maker application).
> I checked on broker side and here is what we currently do:
>  
> {code:java}
> if (group.isLeader(memberId) || !member.matches(protocols))
>   // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>   // The latter allows the leader to trigger rebalances for changes affecting assignment
>   // which do not affect the member metadata (such as topic metadata changes for the consumer)
> {code}
> Based on the broker logic, we only need to trigger a rebalance on leader 
> rejoin when a topic metadata change has happened. I also looked at the 
> ConsumerCoordinator code on the client side and found the metadata monitoring 
> logic here:
> {code:java}
> public boolean rejoinNeededOrPending() {
>     ...
>     // we need to rejoin if we performed the assignment and metadata has changed
>     if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
>         return true;
> }
> {code}
> Instead of just returning true, I guess we could introduce a new enum field 
> called JoinReason to indicate the purpose of the rejoin, so that we don't need 
> a full rebalance when the leader is merely in a rolling bounce. Concretely, 
> add a JoinReason enum field to the join group request so the broker knows 
> whether the leader is rejoining due to a topic metadata change: if yes, 
> trigger the rebalance; if no, don't.
>  
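As an illustration, a minimal sketch of what the proposed field and the broker-side check could look like (names are illustrative, not part of any released protocol):

{code:java}
// Illustrative sketch only: a possible JoinReason carried in the JoinGroup request.
public enum JoinReason {
    NEW_MEMBER,        // first-time join
    METADATA_CHANGED,  // topic metadata changed, assignment must be recomputed
    RESTART            // e.g. rolling bounce, no assignment change needed
}

// Broker-side sketch: instead of blindly rebalancing on any leader rejoin,
// only do so when the reported reason actually affects the assignment.
static boolean shouldRebalanceOnLeaderRejoin(JoinReason reason) {
    return reason == JoinReason.METADATA_CHANGED;
}
{code}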



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12499) Adjust transaction timeout according to commit interval on Streams EOS

2021-03-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12499:
---

 Summary: Adjust transaction timeout according to commit interval 
on Streams EOS
 Key: KAFKA-12499
 URL: https://issues.apache.org/jira/browse/KAFKA-12499
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen
 Fix For: 3.0.0


The transaction timeout defaults to 1 minute on the Producer today, while the 
commit interval on the other hand can be set to a very large value, which makes 
the stream constantly hit the transaction timeout and drop into rebalance. We 
should increase the transaction timeout correspondingly when the commit 
interval is large.

On the other hand, the broker can put a limit on the max transaction timeout 
that may be set. If we scale the client transaction timeout beyond that limit, 
the stream will fail with INVALID_TRANSACTION_TIMEOUT. To alleviate this 
problem, users can define their own customized transaction timeout to avoid 
hitting the limit, so we should still respect what the user configures in the 
override.

The new rule for configuring the transaction timeout should be:
1. If a transaction timeout is set in the Streams config, use it.

2. If not, transaction_timeout = max(default_transaction_timeout, 10 * 
commit_interval), as sketched below.
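A minimal sketch of this rule, assuming illustrative names (not the final implementation):

{code:java}
// Sketch of the proposed rule. DEFAULT_TRANSACTION_TIMEOUT_MS mirrors the
// producer default of 1 minute; the parameter names are illustrative.
static long effectiveTransactionTimeoutMs(Long userTransactionTimeoutMs,
                                          long commitIntervalMs) {
    final long DEFAULT_TRANSACTION_TIMEOUT_MS = 60_000L;
    if (userTransactionTimeoutMs != null) {
        // Rule 1: always respect an explicit override from the Streams config.
        return userTransactionTimeoutMs;
    }
    // Rule 2: scale with the commit interval to avoid spurious timeouts.
    return Math.max(DEFAULT_TRANSACTION_TIMEOUT_MS, 10 * commitIntervalMs);
}
{code}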

Additionally, if INVALID_TRANSACTION_TIMEOUT is thrown from Streams when 
calling initTransactions(), we should wrap the exception to inform the user 
that their commit interval setting is potentially too high and should be 
adjusted.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-05 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-12381.
-
Resolution: Fixed

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>  Labels: kip-500
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> The test greps the log for LEADER_NOT_AVAILABLE in this case, and it used to 
> pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
> (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass -- I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide if this change in behavior is acceptable or 
> not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12307) Wrap non-fatal exception as CommitFailedException when thrown from commitTxn

2021-02-06 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12307:
---

 Summary: Wrap non-fatal exception as CommitFailedException when 
thrown from commitTxn
 Key: KAFKA-12307
 URL: https://issues.apache.org/jira/browse/KAFKA-12307
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


As stated in the KIP, we need to ensure the non-fatal exceptions get wrapped 
inside CommitFailed so they are properly handled by the new template.
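For illustration, a minimal sketch of the wrapping (the fatal/non-fatal classification is a placeholder for what the KIP defines):

{code:java}
// Sketch only: wrap non-fatal transactional errors from commitTransaction()
// into CommitFailedException so the new template can handle them uniformly.
try {
    producer.commitTransaction();
} catch (ProducerFencedException fatal) {
    throw fatal; // fatal errors propagate as-is
} catch (KafkaException e) {
    if (isFatal(e)) // placeholder for the KIP's fatal/non-fatal classification
        throw e;
    throw new CommitFailedException(
        "Transaction commit failed with a non-fatal error: " + e.getMessage());
}
{code}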



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12304) Improve topic validation in auto topic creation

2021-02-05 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12304:
---

 Summary: Improve topic validation in auto topic creation
 Key: KAFKA-12304
 URL: https://issues.apache.org/jira/browse/KAFKA-12304
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


The topic validation path should have a higher priority than the other 
follow-up processes. Basically, we should move it to right after topic 
authorization is done in metadata request handling, as sketched below.
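A rough sketch of that ordering (the helpers such as authorizedToCreate, isValidTopicName, and maybeAutoCreate are hypothetical, for illustration only):

{code:java}
// Sketch of the proposed flow in metadata request handling.
for (String topic : requestedTopics) {
    if (!authorizedToCreate(session, topic)) {          // hypothetical helper
        respondWith(topic, Errors.TOPIC_AUTHORIZATION_FAILED);
        continue;
    }
    // Validate right after authorization, before any creation work runs.
    if (!isValidTopicName(topic)) {                     // hypothetical helper
        respondWith(topic, Errors.INVALID_TOPIC_EXCEPTION);
        continue;
    }
    maybeAutoCreate(topic);                             // hypothetical helper
}
{code}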



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation

2021-02-04 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12294:
---

 Summary: Consider using the forwarding mechanism for metadata auto 
topic creation
 Key: KAFKA-12294
 URL: https://issues.apache.org/jira/browse/KAFKA-12294
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to 
improve topic creation auditing by forwarding the CreateTopicsRequest inside an 
Envelope for the given client. Details 
[here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12260) PartitionsFor should not return null value

2021-01-30 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12260:
---

 Summary: PartitionsFor should not return null value
 Key: KAFKA-12260
 URL: https://issues.apache.org/jira/browse/KAFKA-12260
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Boyang Chen


consumer.partitionsFor() can return null when the topic is not found. This is 
not properly documented and is error-prone given that the return type is a 
list. We should fix the logic internally to prevent partitionsFor from 
returning a null result, as sketched below.
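A minimal sketch of the intended behavior (the metadata lookup is a placeholder):

{code:java}
// Sketch: never surface null to the caller.
public List<PartitionInfo> partitionsFor(String topic) {
    // fetchPartitionMetadata() is a placeholder for the internal lookup.
    List<PartitionInfo> partitions = fetchPartitionMetadata(topic);
    // Return an empty list for an unknown topic instead of null,
    // so callers can iterate without a null check.
    return partitions == null ? Collections.emptyList() : partitions;
}
{code}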



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12215) Broker could cache its overlapping ApiVersions with active controller

2021-01-15 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12215:
---

 Summary: Broker could cache its overlapping ApiVersions with 
active controller
 Key: KAFKA-12215
 URL: https://issues.apache.org/jira/browse/KAFKA-12215
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


Right now, each ApiVersionsRequest needs to recompute the overlapping API 
versions with the controller every time: 
https://issues.apache.org/jira/browse/KAFKA-10674

Unless the active controller changes, the computation result should always be 
the same. We may consider caching the result and only updating it when a 
controller change happens, as sketched below.
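A minimal sketch of such a cache, keyed by the active controller id (names illustrative; synchronization omitted for brevity):

{code:java}
// Sketch: recompute the overlapping versions only on controller change.
private volatile int cachedControllerId = -1;
private volatile NodeApiVersions cachedOverlap;

NodeApiVersions overlappingVersions(int activeControllerId) {
    if (cachedOverlap == null || activeControllerId != cachedControllerId) {
        // computeOverlapWithController() is a placeholder for the existing computation.
        cachedOverlap = computeOverlapWithController(activeControllerId);
        cachedControllerId = activeControllerId;
    }
    return cachedOverlap;
}
{code}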



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10868) Avoid double wrapping KafkaException

2020-12-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10868:
---

 Summary: Avoid double wrapping KafkaException
 Key: KAFKA-10868
 URL: https://issues.apache.org/jira/browse/KAFKA-10868
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9552) Stream should handle OutOfSequence exception thrown from Producer

2020-12-10 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9552.

Resolution: Not A Problem

> Stream should handle OutOfSequence exception thrown from Producer
> -
>
> Key: KAFKA-9552
> URL: https://issues.apache.org/jira/browse/KAFKA-9552
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Priority: Major
>
> As of today, the stream thread can die from an OutOfOrderSequence error:
> {code:java}
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 
> 15:14:35,185] ERROR 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] 
> stream-thread 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed 
> to commit stream task 3_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: task [3_2] Abort sending 
> since an error caught with a previous record (timestamp 1581484094825) to 
> topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-49-changelog due 
> to org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1353)
> {code}
>  Although this is a fatal exception for the Producer, Streams should treat it 
> as an opportunity to reinitialize by doing a rebalance, instead of killing the 
> computation resource.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10813) StreamsProducer should catch InvalidProducerEpoch and throw TaskMigrated in all cases

2020-12-10 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10813.
-
Resolution: Fixed

> StreamsProducer should catch InvalidProducerEpoch and throw TaskMigrated in 
> all cases
> -
>
> Key: KAFKA-10813
> URL: https://issues.apache.org/jira/browse/KAFKA-10813
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.7.0
>
>
> We fixed the error code handling on the producer in 
> https://issues.apache.org/jira/browse/KAFKA-10687, however the newly thrown 
> `InvalidProducerEpoch` exception was not properly handled on the Streams side 
> in all cases. We should catch it and rethrow it as TaskMigrated to trigger the 
> proper handling, similar to ProducerFenced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10813) StreamsProducer should catch InvalidProducerEpoch and throw TaskMigrated in all cases

2020-12-04 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10813:
---

 Summary: StreamsProducer should catch InvalidProducerEpoch and 
throw TaskMigrated in all cases
 Key: KAFKA-10813
 URL: https://issues.apache.org/jira/browse/KAFKA-10813
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
 Fix For: 2.7.0


We fixed the error code handling on the producer in 
https://issues.apache.org/jira/browse/KAFKA-10687, however the newly thrown 
`InvalidProducerEpoch` exception was not properly handled on the Streams side in 
all cases. We should catch it and rethrow it as TaskMigrated to trigger the 
proper handling, similar to ProducerFenced, as sketched below.
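A minimal sketch of the catch-and-rethrow (not the exact patch):

{code:java}
// Sketch: treat InvalidProducerEpoch like ProducerFenced inside StreamsProducer
// and surface it as TaskMigratedException so the task gets handed off cleanly.
try {
    producer.commitTransaction();
} catch (ProducerFencedException | InvalidProducerEpochException e) {
    throw new TaskMigratedException(
        "Producer got fenced or its epoch expired while committing a transaction", e);
}
{code}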



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10807) AlterConfig should be validated by the target broker

2020-12-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10807:
---

 Summary: AlterConfig should be validated by the target broker
 Key: KAFKA-10807
 URL: https://issues.apache.org/jira/browse/KAFKA-10807
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


After forwarding is enabled, AlterConfigs will no longer be sent to the target 
broker. This bypasses some important config change validations, such as path 
existence and static config conflicts; even worse, when the target broker is 
offline, the propagated result does not reflect a truly applied result. We 
should gather the necessary cases and decide whether to handle the AlterConfig 
request on the target broker first and then forward it, in a 
validate-forward-apply path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs

2020-11-17 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10733:
---

 Summary: Enforce exception thrown for KafkaProducer txn APIs
 Key: KAFKA-10733
 URL: https://issues.apache.org/jira/browse/KAFKA-10733
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


In general, KafkaProducer can throw both fatal and non-fatal errors as 
KafkaException, which makes exception catching hard. Furthermore, as of 2.7 not 
every fatal (checked) exception is marked on the function signatures yet.

In the long term we need a general supporting strategy for this matter: either 
declare all non-fatal exceptions as wrapped KafkaException while extracting all 
the fatal ones, or just add a flag to KafkaException indicating whether it is 
fatal, to maintain binary compatibility.
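For illustration only, the second option could look roughly like this from the caller's point of view (isFatal() is hypothetical; KafkaException has no such flag today):

{code:java}
// Hypothetical sketch of the fatal/non-fatal flag idea.
try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
} catch (KafkaException e) {
    if (e.isFatal()) {           // hypothetical accessor, not an existing API
        producer.close();        // unrecoverable: give up this producer
        throw e;
    }
    producer.abortTransaction(); // non-fatal: abort and retry the batch
}
{code}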



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10714) Save unnecessary end txn call when the transaction is confirmed to be done

2020-11-12 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10714:
---

 Summary: Save unnecessary end txn call when the transaction is 
confirmed to be done
 Key: KAFKA-10714
 URL: https://issues.apache.org/jira/browse/KAFKA-10714
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In certain error cases after KIP-588, we may skip the `EndTxn` call to the txn 
coordinator, such as in the transaction timeout case, where we know the 
transaction is already terminated on the broker side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10699) Add system test coverage for group coordinator emigration

2020-11-08 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10699:
---

 Summary: Add system test coverage for group coordinator emigration
 Key: KAFKA-10699
 URL: https://issues.apache.org/jira/browse/KAFKA-10699
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


After merging the fix for https://issues.apache.org/jira/browse/KAFKA-10284, we 
believe it is important to add system test coverage for group coordinator 
emigration to verify that consumer behaviors are correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10687) Produce request should be bumped for new error code PRODUCE_FENCED

2020-11-05 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10687:
---

 Summary: Produce request should be bumped for new error code 
PRODUCE_FENCED
 Key: KAFKA-10687
 URL: https://issues.apache.org/jira/browse/KAFKA-10687
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen
 Fix For: 2.7.0


In https://issues.apache.org/jira/browse/KAFKA-9910, we missed a case where the 
ProduceRequest needs to be bumped to return the new error code PRODUCE_FENCED. 
This gap needs to be addressed as a blocker since it is shipping in 2.7.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10181) Create Envelope RPC and redirection template for configuration change RPCs

2020-11-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10181.
-
Resolution: Fixed

> Create Envelope RPC and redirection template for configuration change RPCs
> --
>
> Key: KAFKA-10181
> URL: https://issues.apache.org/jira/browse/KAFKA-10181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.8.0
>
>
> In the bridge release broker, 
> AlterConfig/IncrementalAlterConfig/CreateTopics/AlterClientQuota should be 
> redirected to the active controller. This ticket will ensure those RPCs get 
> redirected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10343) Add IBP based ApiVersion constraint tests

2020-11-02 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10343.
-
Resolution: Won't Fix

We are planning to work on a more long-term fix for this issue, see 

> Add IBP based ApiVersion constraint tests
> -
>
> Key: KAFKA-10343
> URL: https://issues.apache.org/jira/browse/KAFKA-10343
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.8.0
>
>
> We need to add ApiVersion constraint tests based on IBP to remind future 
> developers to bump the IBP when a new RPC version is developed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10674) Brokers should know the active controller ApiVersion after enabling KIP-590 forwarding

2020-11-02 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10674:
---

 Summary: Brokers should know the active controller ApiVersion 
after enabling KIP-590 forwarding
 Key: KAFKA-10674
 URL: https://issues.apache.org/jira/browse/KAFKA-10674
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Reporter: Boyang Chen


Admin clients send ApiVersions to the broker when the first connection is 
established. The tricky thing after forwarding is enabled is that for 
forwardable APIs, the admin client needs to know a commonly-agreed range of 
ApiVersions among the handling broker, the active controller, and itself.

Right now the inter-broker APIs are guaranteed by IBP constraints, but the 
forwardable APIs are not. A compromise solution would be to put all forwardable 
APIs under IBP, which is brittle and makes consistency hard to maintain.

Instead, any broker connecting to the active controller should send an 
ApiVersions request from the beginning, so it is easy to compute that 
information and send it back to admin clients upon their ApiVersions requests. 
Any rolling of the active controller will trigger a reconnection between broker 
and controller, which guarantees refreshed ApiVersions between the two. This 
approach avoids the tight bond with IBP, and the broker can just close the 
connection to the admin client to trigger its retry logic and a refresh of the 
ApiVersions. Since this failure should be rare, the two round trips and timeout 
delays are well compensated by the reduced engineering work.
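The overlap computation itself is straightforward; a sketch over per-API version ranges (VersionRange is an illustrative type, not an existing class):

{code:java}
// Illustrative holder for an inclusive min/max version range.
static final class VersionRange {
    final short min, max;
    VersionRange(short min, short max) { this.min = min; this.max = max; }
}

// Sketch: intersect the broker's supported range with the controller's
// advertised range for each API key.
Map<ApiKeys, VersionRange> intersect(Map<ApiKeys, VersionRange> broker,
                                     Map<ApiKeys, VersionRange> controller) {
    Map<ApiKeys, VersionRange> common = new EnumMap<>(ApiKeys.class);
    for (Map.Entry<ApiKeys, VersionRange> entry : broker.entrySet()) {
        VersionRange c = controller.get(entry.getKey());
        if (c == null)
            continue; // the controller does not support this API at all
        short min = (short) Math.max(entry.getValue().min, c.min);
        short max = (short) Math.min(entry.getValue().max, c.max);
        if (min <= max)
            common.put(entry.getKey(), new VersionRange(min, max));
    }
    return common;
}
{code}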



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10668) Avoid deserialization on second hop for request forwarding

2020-10-30 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10668:
---

 Summary: Avoid deserialization on second hop for request forwarding
 Key: KAFKA-10668
 URL: https://issues.apache.org/jira/browse/KAFKA-10668
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


Right now the forwarding broker deserializes the response and serializes it 
again to respond to the client. It should be possible to keep the response data 
sealed and send it back to the client to save some CPU cost, as sketched below.
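Conceptually, the forwarding hop would just hand the embedded bytes back; a sketch (the Request type and sendRawResponse helper are illustrative):

{code:java}
// Sketch: skip the parse -> rebuild round trip on the forwarding broker.
void completeForwardedRequest(Request clientRequest, ByteBuffer envelopeResponseData) {
    // The embedded response bytes were produced by the controller;
    // return them to the client as-is, with no intermediate deserialization.
    sendRawResponse(clientRequest, envelopeResponseData.duplicate()); // hypothetical helper
}
{code}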



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10667) Add timeout for forwarding requests

2020-10-30 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10667:
---

 Summary: Add timeout for forwarding requests
 Key: KAFKA-10667
 URL: https://issues.apache.org/jira/browse/KAFKA-10667
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


It makes sense to handle a timeout for forwarded requests coming from the 
client, instead of retrying indefinitely. We could either use the API timeout or 
a customized timeout hook that could be defined per request type.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10663) Flakey test ConsumerBounceTest#testSeekAndCommitWithBrokerFailures

2020-10-29 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10663:
---

 Summary: Flakey test 
ConsumerBounceTest#testSeekAndCommitWithBrokerFailures
 Key: KAFKA-10663
 URL: https://issues.apache.org/jira/browse/KAFKA-10663
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.7.0
Reporter: Boyang Chen


org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:40823: Address already in use.
  at kafka.network.Acceptor.openServerSocket(SocketServer.scala:671)
  at kafka.network.Acceptor.(SocketServer.scala:539)
  at kafka.network.SocketServer.createAcceptor(SocketServer.scala:280)
  at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:253)
  at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
  at kafka.network.SocketServer.startup(SocketServer.scala:125)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:303)
  at kafka.utils.TestUtils$.createServer(TestUtils.scala:160)
  at kafka.utils.TestUtils$.createServer(TestUtils.scala:151)
  at kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:102)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scal

 

From AK 2.7



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10657) Incorporate Envelope into auto-generated JSON schema

2020-10-28 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10657:
---

 Summary: Incorporate Envelope into auto-generated JSON schema
 Key: KAFKA-10657
 URL: https://issues.apache.org/jira/browse/KAFKA-10657
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


We need to add support for outputting the JSON format of the embedded request 
inside an Envelope, to do better request logging.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9999) Internal topic creation failure should be non-fatal and trigger explicit rebalance

2020-10-22 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9999.

Resolution: Won't Fix

> Internal topic creation failure should be non-fatal and trigger explicit 
> rebalance 
> ---
>
> Key: KAFKA-9999
> URL: https://issues.apache.org/jira/browse/KAFKA-9999
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, streams
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We spotted a case in a system test failure where the topic already exists but 
> the admin client still attempts to recreate it:
>  
> {code:java}
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably 
> marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number 
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-uwin-cnt-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager) 
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number 
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-cntByCnt-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics 
> [SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, 
> SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made 
> ready with 5 retries left 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics 
> after 5 retries. This can happen if the Kafka cluster is temporary not 
> available. You can increase admin client config `retries` to be resilient 
> against this error. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,221] ERROR stream-thread 
> [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered 
> the following unexpected Kafka exception during processing, this usually 
> indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Could not create topics 
> after 5 retries. This can happen if the Kafka cluster is temporary not 
> available. You can increase admin client config `retries` to be resilient 
> against this error.
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>         at 
> 

[jira] [Created] (KAFKA-10607) Ensure the error counts contains the NONE

2020-10-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10607:
---

 Summary: Ensure the error counts contains the NONE
 Key: KAFKA-10607
 URL: https://issues.apache.org/jira/browse/KAFKA-10607
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Boyang Chen


In the RPC errorCounts() implementations there are behaviors inconsistent with 
the default implementation; for example, certain RPCs filter out Errors.NONE 
during map generation. We should make it consistent by counting Errors.NONE in 
errorCounts() for all RPCs, as sketched below.
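A minimal sketch of a consistent default (using the real Errors enum; the surrounding logic is illustrative):

{code:java}
// Sketch: count every error code, including Errors.NONE, for all RPCs.
Map<Errors, Integer> errorCounts(Collection<Errors> errors) {
    Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
    for (Errors error : errors)
        counts.merge(error, 1, Integer::sum); // NONE is counted like any other code
    return counts;
}
{code}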



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10594) Enhance Raft exception handling

2020-10-09 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10594:
---

 Summary: Enhance Raft exception handling
 Key: KAFKA-10594
 URL: https://issues.apache.org/jira/browse/KAFKA-10594
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


The current exception handling in the Raft implementation is superficial; for 
example, we don't treat file system exceptions and request handling exceptions 
differently. It's necessary to decide what kind of exception should be fatal, 
what kind should be returned to the client, and what kind could be retried, as 
sketched below.
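For illustration, the classification being asked for could be sketched like this (categories only, not final code):

{code:java}
// Sketch: decide how the Raft layer should react to a failure.
enum FailureKind { FATAL, RETURN_TO_CLIENT, RETRIABLE }

static FailureKind classify(Throwable t) {
    if (t instanceof IOException)      // e.g. file system / log write failure
        return FailureKind.FATAL;      // cannot safely keep acting as a voter
    if (t instanceof ApiException)     // request-level validation problem
        return FailureKind.RETURN_TO_CLIENT;
    return FailureKind.RETRIABLE;      // transient issues can be retried
}
{code}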



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10508) Consider moving ForwardRequestHandler to a separate class

2020-09-21 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10508:
---

 Summary: Consider moving ForwardRequestHandler to a separate class
 Key: KAFKA-10508
 URL: https://issues.apache.org/jira/browse/KAFKA-10508
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


With the new redirection template merged in 
[https://github.com/apache/kafka/pull/9103], the KafkaApis file has grown to 
3,500+ lines, which is fairly large. We should consider moving the redirection 
template out into a separate file to reduce the main class size.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10400) Add a customized Kafka Streams logo

2020-08-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10400:
---

 Summary: Add a customized Kafka Streams logo
 Key: KAFKA-10400
 URL: https://issues.apache.org/jira/browse/KAFKA-10400
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10042) Make INVALID_PRODUCER_EPOCH abortable from Produce response

2020-08-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10042.
-
Resolution: Fixed

> Make INVALID_PRODUCER_EPOCH abortable from Produce response
> ---
>
> Key: KAFKA-10042
> URL: https://issues.apache.org/jira/browse/KAFKA-10042
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9911) Implement new producer fenced error

2020-08-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9911.

Resolution: Fixed

> Implement new producer fenced error
> ---
>
> Key: KAFKA-9911
> URL: https://issues.apache.org/jira/browse/KAFKA-9911
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10345) Redirect AlterPartitionReassignment to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10345:
---

 Summary: Redirect AlterPartitionReassignment to the controller 
 Key: KAFKA-10345
 URL: https://issues.apache.org/jira/browse/KAFKA-10345
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the AlterPartitionReassignment should be 
redirected to the active controller instead of relying on admin client 
discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10344) Redirect Create/Renew/ExpireDelegationToken to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10344:
---

 Summary: Redirect Create/Renew/ExpireDelegationToken to the 
controller
 Key: KAFKA-10344
 URL: https://issues.apache.org/jira/browse/KAFKA-10344
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, Create/Renew/ExpireDelegationToken should be 
redirected to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10343) Redirect AlterClientQuotas to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10343:
---

 Summary: Redirect AlterClientQuotas to the controller
 Key: KAFKA-10343
 URL: https://issues.apache.org/jira/browse/KAFKA-10343
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the new Admin client, the AlterClientQuotas request should be redirected to 
the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10342) Redirect CreateAcls/DeleteAcls to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10342:
---

 Summary: Redirect CreateAcls/DeleteAcls to the controller
 Key: KAFKA-10342
 URL: https://issues.apache.org/jira/browse/KAFKA-10342
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the new Admin client, the CreateAcls/DeleteAcls request should be redirected 
to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10350) Add redirect request monitoring metrics

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10350:
---

 Summary: Add redirect request monitoring metrics
 Key: KAFKA-10350
 URL: https://issues.apache.org/jira/browse/KAFKA-10350
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


We need to add the metric for monitoring redirection progress as stated in the 
KIP:

MBean:kafka.server:type=RequestMetrics,name=NumRequestsForwardingToControllerPerSec,clientId=([-.\w]+)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10349) Deprecate client side controller access

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10349:
---

 Summary: Deprecate client side controller access
 Key: KAFKA-10349
 URL: https://issues.apache.org/jira/browse/KAFKA-10349
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


As stated in KIP-590, we would disallow new admin clients from discovering the 
controller location, for encapsulation. For older broker communication, the 
metadata response will still contain the controller location.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10348) Redirect UpdateFeatures to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10348:
---

 Summary: Redirect UpdateFeatures to the controller
 Key: KAFKA-10348
 URL: https://issues.apache.org/jira/browse/KAFKA-10348
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the UpdateFeatures should be redirected to the 
active controller instead of relying on admin client discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10347) Redirect Create/DeleteTopics to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10347:
---

 Summary: Redirect Create/DeleteTopics to the controller
 Key: KAFKA-10347
 URL: https://issues.apache.org/jira/browse/KAFKA-10347
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the Create/DeleteTopics should be redirected to 
the active controller instead of relying on admin client discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10346) Redirect CreatePartition to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10346:
---

 Summary: Redirect CreatePartition to the controller
 Key: KAFKA-10346
 URL: https://issues.apache.org/jira/browse/KAFKA-10346
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the CreatePartition should be redirected to the 
active controller instead of relying on admin client discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig

2020-07-29 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10270.
-
Resolution: Fixed

> Add a broker to controller channel manager to redirect AlterConfig
> --
>
> Key: KAFKA-10270
> URL: https://issues.apache.org/jira/browse/KAFKA-10270
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Per the KIP-590 requirement, we need a dedicated communication channel from 
> the broker to the controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10321) shouldDieOnInvalidOffsetExceptionWhileRunning would block forever on JDK11

2020-07-29 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10321:
---

 Summary: shouldDieOnInvalidOffsetExceptionWhileRunning would block 
forever on JDK11
 Key: KAFKA-10321
 URL: https://issues.apache.org/jira/browse/KAFKA-10321
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Boyang Chen


We have spotted two definite cases where the test 
shouldDieOnInvalidOffsetExceptionWhileRunning fails to stop during the whole 
test suite:
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7604/consoleFull]

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10311) Flaky test KafkaAdminClientTest#testMetadataRetries

2020-07-26 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10311:
---

 Summary: Flaky test KafkaAdminClientTest#testMetadataRetries
 Key: KAFKA-10311
 URL: https://issues.apache.org/jira/browse/KAFKA-10311
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3545/testReport/junit/org.apache.kafka.clients.admin/KafkaAdminClientTest/testMetadataRetries/]

 
h3. Error Message

java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, 
deadlineMs=1595694629113, tries=1, nextAllowedTryMs=1595694629217) timed out at 
1595694629117 after 1 attempt(s)
h3. Stacktrace

java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, 
deadlineMs=1595694629113, tries=1, nextAllowedTryMs=1595694629217) timed out at 
1595694629117 after 1 attempt(s) at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) 
at 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries(KafkaAdminClientTest.java:995)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, 
deadlineMs=1595694629113, tries=1, nextAllowedTryMs=1595694629217) timed out at 
1595694629117 after 1 attempt(s) Caused by: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-07-25 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10307:
---

 Summary: Topology cycles in 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
 Key: KAFKA-10307
 URL: https://issues.apache.org/jira/browse/KAFKA-10307
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen


We have spotted a cycle in the topology for the foreign-key join test 
*shouldInnerJoinMultiPartitionQueryable*; we are not sure yet whether this is a 
bug in the algorithm or only in the test. Used 
[https://zz85.github.io/kafka-streams-viz/] to visualize:


{code:java}
Sub-topology: 0
  Source: KTABLE-SOURCE-19 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
  Source: KTABLE-SOURCE-32 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
  Source: KSTREAM-SOURCE-01 (topics: [table1])
    --> KTABLE-SOURCE-02
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: [table1-STATE-STORE-00])
    --> KTABLE-FK-JOIN-OUTPUT-21
    <-- KTABLE-SOURCE-19
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: [INNER-store1])
    --> KTABLE-FK-JOIN-OUTPUT-34
    <-- KTABLE-SOURCE-32
  Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
    <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
  Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
    --> KTABLE-TOSTREAM-35
    <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
  Processor: KTABLE-SOURCE-02 (stores: [table1-STATE-STORE-00])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
    <-- KSTREAM-SOURCE-01
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: [])
    --> KTABLE-SINK-11
    <-- KTABLE-SOURCE-02
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: [])
    --> KTABLE-SINK-24
    <-- KTABLE-FK-JOIN-OUTPUT-21
  Processor: KTABLE-TOSTREAM-35 (stores: [])
    --> KSTREAM-SINK-36
    <-- KTABLE-FK-JOIN-OUTPUT-34
  Sink: KSTREAM-SINK-36 (topic: output-)
    <-- KTABLE-TOSTREAM-35
  Sink: KTABLE-SINK-11 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
    <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
  Sink: KTABLE-SINK-24 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
    <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23

Sub-topology: 1
  Source: KSTREAM-SOURCE-04 (topics: [table2])
    --> KTABLE-SOURCE-05
  Source: KTABLE-SOURCE-12 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
    <-- KTABLE-SOURCE-12
  Processor: KTABLE-SOURCE-05 (stores: [table2-STATE-STORE-03])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
    <-- KSTREAM-SOURCE-04
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: [table2-STATE-STORE-03])
    --> KTABLE-SINK-18
    <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
    --> KTABLE-SINK-18
    <-- KTABLE-SOURCE-05
  Sink: KTABLE-SINK-18 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic)
    <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16

Sub-topology: 2
  Source: KSTREAM-SOURCE-07 (topics: [table3])
    --> KTABLE-SOURCE-08
  Source: KTABLE-SOURCE-25 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-27
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-27 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-26])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-28
    <-- KTABLE-SOURCE-25
  Processor: KTABLE-SOURCE-08 (stores: [table3-STATE-STORE-06])
    --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-29
    <-- KSTREAM-SOURCE-07
  Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-28 (stores: 

[jira] [Created] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-16 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10284:
---

 Summary: Group membership update due to static member rejoin 
should be persisted
 Key: KAFKA-10284
 URL: https://issues.apache.org/jira/browse/KAFKA-10284
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.5.0, 2.4.0, 2.3.0, 2.6.0
Reporter: Boyang Chen
Assignee: Boyang Chen
 Fix For: 2.6.1


For a known static member rejoin, we would update its corresponding member.id 
without triggering a new rebalance. This serves to avoid unnecessary rebalances 
for static membership, as well as for fencing anyone who still uses the old 
member.id.

The bug is that we don't actually persist the membership update, so if no 
subsequent rebalance gets triggered, the new member.id information will be lost 
during group coordinator immigration, thus bringing back a zombie member 
identity.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig

2020-07-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10270:
---

 Summary: Add a broker to controller channel manager to redirect 
AlterConfig
 Key: KAFKA-10270
 URL: https://issues.apache.org/jira/browse/KAFKA-10270
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10239) The groupInstanceId field in DescribeGroup response should be ignorable

2020-07-06 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10239.
-
Resolution: Fixed

> The groupInstanceId field in DescribeGroup response should be ignorable
> ---
>
> Key: KAFKA-10239
> URL: https://issues.apache.org/jira/browse/KAFKA-10239
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Critical
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> We noticed the following error in the logs in the handling of a DescribeGroup 
> request:
> ```
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
> write a non-default groupInstanceId at version 3
> ```
> The problem is that the field is not marked as ignorable. So if the user is 
> relying on static membership and uses an older AdminClient, they will see 
> this error.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10241) Pursue a better way to cover ignorable RPC fields

2020-07-06 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10241:
---

 Summary: Pursue a better way to cover ignorable RPC fields 
 Key: KAFKA-10241
 URL: https://issues.apache.org/jira/browse/KAFKA-10241
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Reporter: Boyang Chen


We have hit cases such as https://issues.apache.org/jira/browse/KAFKA-10239 
where we accidentally included a non-ignorable field in the returned response 
and eventually crashed older clients that don't support the field. It would be 
good to add a generic test suite covering all existing and new RPC changes, to 
ensure that we never ship a non-ignorable field to older versions of clients.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10237) Properly handle in-memory stores OOM

2020-07-06 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10237:
---

 Summary: Properly handle in-memory stores OOM
 Key: KAFKA-10237
 URL: https://issues.apache.org/jira/browse/KAFKA-10237
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen


We have seen the in-memory store buffer too much data and eventually hit OOM. 
Generally speaking, an OOM gives no real indication of the underlying problem 
and makes user debugging difficult, since the failed thread may not be the 
actual culprit that caused the explosion. If we could add protection to avoid 
hitting the memory limit, or at least give out a clear guide, end user 
debugging would be much simpler.

To make it work, we need to enforce a certain memory limit below the heap size 
and take action when hitting it. The first question is whether we set a numeric 
limit, such as 100MB or 500MB, or a percentage limit, such as 60% or 80% of 
total memory.

The second question is about the action itself. One approach would be crashing 
the store immediately and informing the user to increase their application 
capacity. The second approach would be opening up an on-disk store 
spontaneously and offloading the data to it.

Personally I'm in favor of approach #2 because it has minimal impact on the 
ongoing application. However, it is more complex and potentially requires 
significant work to define the proper behavior, such as the default store 
configuration and how to manage its lifecycle.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10135) Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager

2020-06-24 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10135.
-
Resolution: Fixed

> Extract Task#executeAndMaybeSwallow to be a general utility function into 
> TaskManager
> -
>
> Key: KAFKA-10135
> URL: https://issues.apache.org/jira/browse/KAFKA-10135
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> We have a couple of cases where we need to swallow the exception during 
> operations in both Task class and TaskManager class. This utility method 
> should be generalized at least onto TaskManager level. See discussion comment 
> [here|https://github.com/apache/kafka/pull/8833#discussion_r437697665].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop

2020-06-22 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10192:
---

 Summary: Flaky test BlockingConnectorTest#testBlockInConnectorStop
 Key: KAFKA-10192
 URL: https://issues.apache.org/jira/browse/KAFKA-10192
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen


h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/]
h3. Error Message

org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
execute PUT request. Error response: \{"error_code":500,"message":"Request 
timed out"}
h3. Stacktrace

org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
execute PUT request. Error response: \{"error_code":500,"message":"Request 
timed out"} at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346)
 at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300)
 at 
org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185)
 at 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 

[jira] [Created] (KAFKA-10181) AlterConfig/IncrementalAlterConfig should go to controller

2020-06-17 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10181:
---

 Summary: AlterConfig/IncrementalAlterConfig should go to controller
 Key: KAFKA-10181
 URL: https://issues.apache.org/jira/browse/KAFKA-10181
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


In the new Admin client, the request should always be routed to the 
controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10087) Properly throw LogTruncation exception from OffsetForLeaderEpoch future

2020-06-17 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10087.
-
Resolution: Fixed

> Properly throw LogTruncation exception from OffsetForLeaderEpoch future
> ---
>
> Key: KAFKA-10087
> URL: https://issues.apache.org/jira/browse/KAFKA-10087
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> For OffsetForLeaderEpoch#onSuccess, we could throw either OffsetOutOfRange or 
> LogTruncation exceptions, which are swallowed by the AsyncClient logic:
>  
> {code:java}
> try {
>     future.complete(handleResponse(node, requestData, resp));
> } catch (RuntimeException e) {
>     if (!future.isDone()) {
>         future.raise(e);
>     }
> }
> {code}
> We should fix this case so the exception is thrown to the upstream caller. In 
> the meantime, we should ensure that any discarded exception eventually gets 
> retried for the OffsetForLeaderEpoch call.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10139) Add operational guide for failure recovery

2020-06-10 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10139:
---

 Summary: Add operational guide for failure recovery
 Key: KAFKA-10139
 URL: https://issues.apache.org/jira/browse/KAFKA-10139
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Boyang Chen


In the first released version, we should include an operational manual for the 
feature versioning failure cases, such as:

1. broker crash due to violation of feature versioning

2. ZK data corruption (rare)

We need to ensure this work gets reflected in the AK documentation after the 
implementation and thorough testing are done.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10135) Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager

2020-06-09 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10135:
---

 Summary: Extract Task#executeAndMaybeSwallow to be a general 
utility function into TaskManager
 Key: KAFKA-10135
 URL: https://issues.apache.org/jira/browse/KAFKA-10135
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


We have a couple of cases where we need to swallow exceptions during operations 
in both the Task class and the TaskManager class. This utility method should be 
generalized, at least up to the TaskManager level. See the discussion comment 
[here|https://github.com/apache/kafka/pull/8833#discussion_r437697665].
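
For context, here is a minimal sketch of what such a generalized utility could 
look like (the method name and signature are assumptions for illustration, not 
the final API):
{code:java}
import org.slf4j.Logger;

// Hypothetical sketch: run an operation, rethrow on clean close,
// log-and-swallow on unclean close.
static void executeAndMaybeSwallow(final boolean clean,
                                   final Runnable runnable,
                                   final String name,
                                   final Logger log) {
    try {
        runnable.run();
    } catch (final RuntimeException e) {
        if (clean) {
            throw e;
        }
        log.debug("Ignoring error in unclean {}", name, e);
    }
}
{code}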



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10130) Rewrite ZkData struct with auto-generated protocol

2020-06-09 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10130:
---

 Summary: Rewrite ZkData struct with auto-generated protocol
 Key: KAFKA-10130
 URL: https://issues.apache.org/jira/browse/KAFKA-10130
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


ZkData.scala includes a couple of versioned data structs, such as 
BrokerIdZNode. Evolving a JSON struct by hand is error-prone compared with our 
automated RPC framework. The benefit of this rewrite outweighs the trouble IMHO.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10097) Avoid passing task checkpoint file externally

2020-06-06 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10097.
-
Resolution: Fixed

> Avoid passing task checkpoint file externally
> -
>
> Key: KAFKA-10097
> URL: https://issues.apache.org/jira/browse/KAFKA-10097
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In StreamTask, we have the logic to generate a checkpoint offset map to be 
> materialized through StateManager#checkpoint. This map can be either an empty 
> map or null: the former indicates that only the existing state store 
> checkpoint data should be pulled down, while the latter indicates that no 
> checkpoint is needed, for example when we are suspending a task.
> Having two similar special cases for checkpointing could lead to unexpected 
> bugs; we should also think about separating the empty-checkpoint case from 
> the passed-in-checkpoint case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9576) Topic creation failure causing Stream thread death

2020-06-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9576.

Resolution: Duplicate

> Topic creation failure causing Stream thread death
> --
>
> Key: KAFKA-9576
> URL: https://issues.apache.org/jira/browse/KAFKA-9576
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Priority: Major
>
> The failure to create an internal topic could lead to stream thread death due 
> to a timeout:
> {code:java}
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 
> 11:03:00,083] ERROR 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> stream-thread [main] Unexpected error during topic description for 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 
> 11:03:00,083] ERROR 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> stream-thread 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: Could not create topic 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:209)
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:223)
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:106)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>         at 
> 

[jira] [Created] (KAFKA-10097) Avoid getting null map for task checkpoint

2020-06-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10097:
---

 Summary: Avoid getting null map for task checkpoint
 Key: KAFKA-10097
 URL: https://issues.apache.org/jira/browse/KAFKA-10097
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


In StreamTask, we have the logic to generate a checkpoint offset map to be 
materialized through StateManager#checkpoint. This map can be either an empty 
map or null: the former indicates that only the existing state store checkpoint 
data should be pulled down, while the latter indicates that no checkpoint is 
needed, for example when we are suspending a task.

Having two similar special cases for checkpointing could lead to unexpected 
bugs; we should also think about separating the empty-checkpoint case from the 
passed-in-checkpoint case.
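
For illustration, here is a minimal sketch of that branching (the method and 
helper names are made up for this example):
{code:java}
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Sketch of the null/empty ambiguity described above.
void maybeCheckpoint(final Map<TopicPartition, Long> checkpointableOffsets) {
    if (checkpointableOffsets == null) {
        return;                                 // e.g. suspending a task: skip checkpointing
    }
    if (checkpointableOffsets.isEmpty()) {
        writeExistingStoreCheckpoints();        // only pull down existing checkpoint data
    } else {
        writeCheckpoint(checkpointableOffsets); // materialize the passed-in offsets
    }
}

void writeExistingStoreCheckpoints() { /* stub for illustration */ }
void writeCheckpoint(final Map<TopicPartition, Long> offsets) { /* stub for illustration */ }
{code}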



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10087) Properly throw LogTruncation exception from OffsetForLeaderEpoch future

2020-06-02 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10087:
---

 Summary: Properly throw LogTruncation exception from 
OffsetForLeaderEpoch future
 Key: KAFKA-10087
 URL: https://issues.apache.org/jira/browse/KAFKA-10087
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


For OffsetForLeaderEpoch#onSuccess, we could throw either OffsetOutOfRange or 
LogTruncation exceptions, which are swallowed by the AsyncClient logic:
{code:java}
try {
    future.complete(handleResponse(node, requestData, resp));
} catch (RuntimeException e) {
    if (!future.isDone()) {
        future.raise(e);
    }
}
{code}
We should fix this case so the exception is thrown to the upstream caller. In 
the meantime, we should ensure that any discarded exception eventually gets 
retried for the OffsetForLeaderEpoch call.
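
One possible shape of the fix, sketched under the assumption that the 
already-completed-future case should surface the error rather than swallow it:
{code:java}
try {
    future.complete(handleResponse(node, requestData, resp));
} catch (RuntimeException e) {
    if (!future.isDone()) {
        future.raise(e);
    } else {
        // Sketch: rethrow instead of silently dropping the exception, so that
        // OffsetOutOfRange / LogTruncation reach the upstream caller.
        throw e;
    }
}
{code}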



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10011) lockedTaskDirectories should be cleared when task gets closed dirty in HandleLostAll

2020-06-01 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10011.
-
Resolution: Fixed

> lockedTaskDirectories should be cleared when task gets closed dirty in 
> HandleLostAll
> 
>
> Key: KAFKA-10011
> URL: https://issues.apache.org/jira/browse/KAFKA-10011
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Tasks that get closed in handleLostAll don't clear out their entries in 
> lockedTaskDirectories, which causes an illegal state afterwards:
> {code:java}
> [2020-05-17T06:21:54-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) [2020-05-17 
> 13:21:54,127] ERROR 
> [stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] 
> stream-thread 
> [stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-17T06:21:54-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) 
> org.apache.kafka.streams.errors.ProcessorStateException: task directory 
> [/mnt/run/streams/state/stream-soak-test/3_1] doesn't exist and couldn't be 
> created
>         at 
> org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
>         at 
> org.apache.kafka.streams.processor.internals.StateDirectory.checkpointFileFor(StateDirectory.java:121)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.getTaskOffsetSums(TaskManager.java:498)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscriptionUserData(StreamsPartitionAssignor.java:239)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:222)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:560)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:495)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:417)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1265)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:770)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:630)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10010) Should make state store registration idempotent

2020-06-01 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10010.
-
Resolution: Fixed

> Should make state store registration idempotent
> ---
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost-all logic doesn't close standby tasks, which could 
> potentially lead to a tricky condition like the one below:
> 1. The standby task was initializing in the `CREATED` state, and a task 
> corrupted exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and a commit was performed for 
> the non-affected tasks
> 3. The task commit failed due to a task migrated exception
> 4. handleLostAll didn't close the standby task, leaving it in the CREATED 
> state
> 5. The next rebalance completed, and the same task was assigned back as a 
> standby task.
> 6. IllegalArgumentException caught:
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10064) Add documentation for KIP-571

2020-05-28 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10064:
---

 Summary: Add documentation for KIP-571
 Key: KAFKA-10064
 URL: https://issues.apache.org/jira/browse/KAFKA-10064
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: Boyang Chen
Assignee: feyman
 Fix For: 2.6.0


We need to add documentation for KIP-571, similar to what other KIPs going out 
in 2.6 have done: [https://github.com/apache/kafka/pull/8621]

 

[~feyman] I'm assigning this to you for now; let me know if there is anything 
missing for context. Here are the instructions to set up the Kafka website on a 
local Apache server: 
[https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10042) Make INVALID_PRODUCER_EPOCH abortable from Produce response

2020-05-25 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10042:
---

 Summary: Make INVALID_PRODUCER_EPOCH abortable from Produce 
response
 Key: KAFKA-10042
 URL: https://issues.apache.org/jira/browse/KAFKA-10042
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Reporter: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10011) lockedTaskDirectories should be cleared when task gets closed dirty in HandleLostAll

2020-05-17 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10011:
---

 Summary: lockedTaskDirectories should be cleared when task gets 
closed dirty in HandleLostAll
 Key: KAFKA-10011
 URL: https://issues.apache.org/jira/browse/KAFKA-10011
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


Tasks that get closed in handleLostAll don't clear out their entries in 
lockedTaskDirectories, which causes an illegal state afterwards:
{code:java}
[2020-05-17T06:21:54-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) [2020-05-17 
13:21:54,127] ERROR 
[stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] 
stream-thread 
[stream-soak-test-150cf9ae-793b-4aac-bea0-0fb61d228b39-StreamThread-3] 
Encountered the following exception during processing and the thread is going 
to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-17T06:21:54-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0b021dbf00474b6aa_streamslog) 
org.apache.kafka.streams.errors.ProcessorStateException: task directory 
[/mnt/run/streams/state/stream-soak-test/3_1] doesn't exist and couldn't be 
created
        at 
org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
        at 
org.apache.kafka.streams.processor.internals.StateDirectory.checkpointFileFor(StateDirectory.java:121)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.getTaskOffsetSums(TaskManager.java:498)
        at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscriptionUserData(StreamsPartitionAssignor.java:239)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:222)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:560)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:495)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:417)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1265)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:770)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:630)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-17 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10010:
---

 Summary: Should close standby task for safety during HandleLostAll
 Key: KAFKA-10010
 URL: https://issues.apache.org/jira/browse/KAFKA-10010
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


The current lost-all logic doesn't close standby tasks, which could potentially 
lead to a tricky condition like the one below:

1. The standby task was initializing in the `CREATED` state, and a task 
corrupted exception was thrown from registerStateStores

2. The task corrupted exception was caught, and a commit was performed for the 
non-affected tasks

3. The task commit failed due to a task migrated exception

4. handleLostAll didn't close the standby task, leaving it in the CREATED state

5. The next rebalance completed, and the same task was assigned back as a 
standby task.

6. IllegalArgumentException caught:
{code:java}
[2020-05-16T11:56:18-07:00] 
(streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
18:56:18,050] ERROR 
[stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
stream-thread 
[stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
Encountered the following exception during processing and the thread is going 
to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-16T11:56:18-07:00] 
(streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
java.lang.IllegalArgumentException: stream-thread 
[stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
been registered.
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
        at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
        at 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
        at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
        at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
        at 
org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only

2020-05-14 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7566.

Resolution: Won't Fix

> Add sidecar job to leader (or a random single follower) only
> 
>
> Key: KAFKA-7566
> URL: https://issues.apache.org/jira/browse/KAFKA-7566
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Minor
>
> Hey there,
> Recently we needed to add an archive job to a streaming application. The 
> caveat is that we need to make sure only one instance is doing this task, to 
> avoid a potential race condition, and we also don't want to schedule it as a 
> regular stream task, so that it doesn't block normal streaming operation.
> Although we could do this with a ZK lease, I'm raising the case here since 
> this could be a potential use case for streaming jobs as well. For example, 
> there are some `leader specific` operations we could schedule in the DSL 
> instead of in an ad-hoc manner.
> Let me know if this makes sense to you, thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9999) Topic description should be triggered after each failed topic creation iteration

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9999:
--

 Summary: Topic description should be triggered after each failed 
topic creation iteration 
 Key: KAFKA-9999
 URL: https://issues.apache.org/jira/browse/KAFKA-9999
 Project: Kafka
  Issue Type: Bug
  Components: admin, streams
Affects Versions: 2.4.0
Reporter: Boyang Chen


We spotted a case in a system test failure where the topic already exists but 
the admin client still attempts to recreate it:

 
{code:java}
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably 
marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete 
operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of 
partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete 
operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
'SmokeTest-uwin-cnt-changelog' already exists. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager) 
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of 
partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete 
operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
'SmokeTest-cntByCnt-changelog' already exists. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Topics 
[SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, 
SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made 
ready with 5 retries left 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics 
after 5 retries. This can happen if the Kafka cluster is temporary not 
available. You can increase admin client config `retries` to be resilient 
against this error. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,221] ERROR stream-thread 
[SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the 
following unexpected Kafka exception during processing, this usually indicate 
Streams internal errors: 
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Could not create topics after 
5 retries. This can happen if the Kafka cluster is temporary not available. You 
can increase admin client config `retries` to be resilient against this error.
        at 
org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
        at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
        at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
 
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
 
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
        at 

[jira] [Created] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9994:
--

 Summary: Catch TaskMigrated exception in task corruption code path 
 Key: KAFKA-9994
 URL: https://issues.apache.org/jira/browse/KAFKA-9994
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Boyang Chen


We have seen a case where the TaskMigratedException gets thrown from 
taskManager.commit(). This should be handled by catching it properly.
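
A minimal sketch of the proposed catch (the surrounding variable and handler 
names are assumptions for illustration):
{code:java}
// org.apache.kafka.streams.errors.TaskMigratedException
try {
    taskManager.commit(tasksToCommit);
} catch (final TaskMigratedException e) {
    // Sketch: don't let TaskMigratedException escape the corruption-handling
    // path; fall back to the usual rejoin/recovery handling instead.
    handleTaskMigrated(e);
}
{code}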



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9993) Think about inheritance in the protocol generation framework

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9993:
--

 Summary: Think about inheritance in the protocol generation 
framework
 Key: KAFKA-9993
 URL: https://issues.apache.org/jira/browse/KAFKA-9993
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


We have seen that there are a lot of common fields inside the request/response 
templates that could be extracted into a superclass for the auto-generated 
classes. For example, most responses contain a top-level error code. Currently, 
to build a service receiving multiple RPCs, the code template produces a lot of 
redundant error-code extraction logic, which is far from ideal.

What we want to discuss is whether to enable a general inheritance mechanism in 
this framework, what the trade-offs and complexity increase are, and whether 
there is any workaround that simply produces less boilerplate in the templates.
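
For instance, a shared supertype for the generated response classes could 
remove the per-RPC boilerplate. A sketch (not part of the current generator):
{code:java}
// Hypothetical sketch: a common interface that generated responses carrying a
// top-level error code could implement.
public interface HasTopLevelError {
    short errorCode();
}

// A service receiving multiple RPCs could then extract errors generically:
static boolean isSuccess(final HasTopLevelError response) {
    return response.errorCode() == 0; // 0 == Errors.NONE
}
{code}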



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator

2020-05-14 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9676.

Resolution: Fixed

The current unit test coverage is pretty good now, closing the ticket.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
> --
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit 
> test coverage. We should add corresponding tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9989:
--

 Summary: StreamsUpgradeTest.test_metadata_upgrade could not 
guarantee all processor gets assigned task
 Key: KAFKA-9989
 URL: https://issues.apache.org/jira/browse/KAFKA-9989
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

If we take a closer look, the rebalance happens but yields no task assignment. 
We should fix this problem by making the rebalance result part of the check, 
and skipping the record-processing validation when the assignment is empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8989) Embedded broker could not be reached in unit test

2020-05-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8989.

Resolution: Won't Fix

> Embedded broker could not be reached in unit test
> -
>
> Key: KAFKA-8989
> URL: https://issues.apache.org/jira/browse/KAFKA-8989
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9984) Should fail the subscription when pattern is empty

2020-05-12 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9984:
--

 Summary: Should fail the subscription when pattern is empty
 Key: KAFKA-9984
 URL: https://issues.apache.org/jira/browse/KAFKA-9984
 Project: Kafka
  Issue Type: Bug
  Components: consumer, streams
Reporter: Boyang Chen


We have seen a case where the consumer subscribes to an empty string pattern:
{code:java}
[Consumer ...  ] Subscribed to pattern:  ''
{code}
which doesn't make any sense and usually indicates a configuration error. The 
`consumer.subscribe(pattern)` call should fail with an IllegalArgumentException 
in this case.
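
A sketch of the proposed guard (illustrative shape, not the exact client code):
{code:java}
import java.util.regex.Pattern;

public void subscribe(final Pattern pattern) {
    if (pattern == null || pattern.toString().isEmpty()) {
        // Reject the obviously misconfigured subscription up front.
        throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " +
            (pattern == null ? "null" : "empty"));
    }
    // ... existing subscription logic ...
}
{code}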



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9972) Corrupted standby task could be committed

2020-05-07 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9972:
--

 Summary: Corrupted standby task could be committed
 Key: KAFKA-9972
 URL: https://issues.apache.org/jira/browse/KAFKA-9972
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen


A corrupted standby task could be revived and transition to the CREATED state, 
which will then be hit by `taskManager.commitAll` in the next runOnce, causing 
an illegal state:

```

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,646] WARN 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException 
fetching records from restore consumer for partitions 
[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1], it is 
likely that the consumer's position has fallen out of the topic partition 
offset range because the topic was truncated or compacted on the broker, 
marking the corresponding tasks as corrupted and re-initializing it later. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,646] WARN 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Detected 
the states of tasks 
\{1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1]} 
are corrupted. Will close the task as dirty and re-create and bootstrap from 
scratch. (org.apache.kafka.streams.processor.internals.StreamThread)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
\{1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog-1]} 
are corrupted and hence needs to be re-initialized

        at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,652] INFO 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
[Consumer 
clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer,
 groupId=null] Unsubscribed all topics or patterns and assigned partitions 
(org.apache.kafka.clients.consumer.KafkaConsumer)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,652] INFO 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
standby-task [1_1] Prepared dirty close 
(org.apache.kafka.streams.processor.internals.StandbyTask)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,679] INFO 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
standby-task [1_1] Closed dirty 
(org.apache.kafka.streams.processor.internals.StandbyTask)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
03:57:22,751] ERROR 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
stream-thread 
[stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
Encountered the following exception during processing and the thread is going 
to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)

[2020-05-07T20:57:23-07:00] 
(streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
java.lang.IllegalStateException: Illegal state CREATED while preparing standby 
task 1_1 for committing

        at 
org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134)

        at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752)

        at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725)

        at 

[jira] [Resolved] (KAFKA-8587) One producer per thread for Kafka Streams EOS

2020-05-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8587.

Resolution: Fixed

> One producer per thread for Kafka Streams EOS
> -
>
> Key: KAFKA-8587
> URL: https://issues.apache.org/jira/browse/KAFKA-8587
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently the exactly-once producer is coupled with individual input 
> partitions. This does not scale well for client applications such as Kafka 
> Streams, and the root cause is that the EOS producer doesn't understand topic 
> partition moves throughout consumer group rebalances. By closing this 
> semantic gap, we could achieve much better EOS scalability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9910) Implement new transaction timed out error

2020-04-23 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9910:
--

 Summary: Implement new transaction timed out error
 Key: KAFKA-9910
 URL: https://issues.apache.org/jira/browse/KAFKA-9910
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9911) Implement new producer fenced error

2020-04-23 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9911:
--

 Summary: Implement new producer fenced error
 Key: KAFKA-9911
 URL: https://issues.apache.org/jira/browse/KAFKA-9911
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8993) Add an EOS performance test suite similar to ProducerPerformance

2020-04-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8993.

Resolution: Won't Fix

> Add an EOS performance test suite similar to ProducerPerformance
> 
>
> Key: KAFKA-8993
> URL: https://issues.apache.org/jira/browse/KAFKA-8993
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9878) Block EndTxn call until the txn markers are committed

2020-04-16 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9878:
--

 Summary: Block EndTxn call until the txn markers are committed
 Key: KAFKA-9878
 URL: https://issues.apache.org/jira/browse/KAFKA-9878
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


Currently the EndTxn call from the producer returns immediately once the 
control record is written to the txn coordinator log. The ongoing transaction 
then goes into a pending state while it waits for all txn markers to be 
propagated. In the meantime, the producer client will start another transaction 
but be rejected repeatedly until the pending state is resolved, which costs 
unnecessary round trips and puts more burden on the broker to handle the 
repeated requests.

To avoid this situation, we should make the producer client wait for txn marker 
completion instead. This should yield better performance overall, as no 
back-off would be triggered before a subsequent transaction can begin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8870) Prevent dirty reads of Streams state store from Interactive queries

2020-04-15 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8870.

Resolution: Duplicate

> Prevent dirty reads of Streams state store from Interactive queries
> ---
>
> Key: KAFKA-8870
> URL: https://issues.apache.org/jira/browse/KAFKA-8870
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Boyang Chen
>Priority: Major
>
> Today, Interactive Queries (IQ) against Streams state stores could see 
> uncommitted data, even with EOS processing guarantees (these are actually 
> orthogonal, but clarifying since EOS may give the impression that everything 
> is dandy). This is caused primarily because state updates in RocksDB are 
> visible even before the Kafka transaction is committed. Thus, if the instance 
> fails, the failed-over instance will redo the uncommitted old transaction, 
> and the following could be possible during recovery.
> The value for key K can go from *V0 → V1 → V2* on active instance A; IQ reads 
> V1; instance A fails, and any failure/rebalancing will leave the standby 
> instance B rewinding offsets and reprocessing, during which time IQ can again 
> see V0 or V1 or any number of previous values for the same key.
> In this issue, we will plan work towards providing consistency for IQ for a 
> single row in a single state store, i.e. once a query sees V1, it can only 
> see either V1 or V2.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9764) Deprecate Stream Simple benchmark

2020-04-15 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9764.

Resolution: Fixed

> Deprecate Stream Simple benchmark
> -
>
> Key: KAFKA-9764
> URL: https://issues.apache.org/jira/browse/KAFKA-9764
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Over the years, this simple benchmark suite has proven less and less 
> valuable. It is built on Jenkins infra, which cannot guarantee consistent 
> results across runs, and most of the time it yields no insights either. In 
> order to avoid wasting developers' time testing performance against this poor 
> setup, we will remove the test suite as it is no longer valuable to the 
> community.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9793) Stream HandleAssignment should guarantee task close

2020-04-15 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9793.

Resolution: Fixed

> Stream HandleAssignment should guarantee task close
> ---
>
> Key: KAFKA-9793
> URL: https://issues.apache.org/jira/browse/KAFKA-9793
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> When triggering the `handleAssignment` call, if a task's preCommit throws, 
> the doomed-to-fail task will not be closed, thus causing a RocksDB metrics 
> recorder re-addition, which is fatal:
>  
>  
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,668] INFO 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] Handle 
> new assignment with:
>         New active tasks: [1_0, 0_1, 2_0]
>         New standby tasks: []
>         Existing active tasks: [0_1, 1_0, 2_0, 3_1]
>         Existing standby tasks: [] 
> (org.apache.kafka.streams.processor.internals.TaskManager)
>  
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,671] INFO 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task 
> [3_1] Prepared clean close 
> (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,671] INFO 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task 
> [0_1] Prepared task for committing 
> (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,682] ERROR 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task 
> [1_0] Failed to flush state store logData10MinuteFinalCount-store:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) 
> org.apache.kafka.streams.errors.TaskMigratedException: Error encountered 
> sending record to topic windowed-node-counts for task 1_0 due to:
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) 
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
> operation with an old epoch. Either there is a newer producer with the same 
> transactionalId, or the producer's transaction has been expired by the broker.
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) Written offsets 
> would not be recorded and no more records would be sent since the producer is 
> fenced, indicating the task may be migrated out; it means all tasks belonging 
> to this thread should be migrated.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:202)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1352)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:768)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:485)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:304)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.lang.Thread.run(Thread.java:748)
>  
> The correct solution is to wrap the whole code block in a try-catch to avoid 
> unexpected close failures.
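> A sketch of that wrapping (illustrative names; the exact task close API may 
> differ):
> {code:java}
> for (final Task task : tasksToClose) {
>     try {
>         task.prepareCommit();
>         task.closeClean();
>     } catch (final RuntimeException e) {
>         log.error("Failed to close task {} cleanly; closing it dirty instead", task.id(), e);
>         task.closeDirty(); // guarantee the task is closed either way
>     }
> }
> {code}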



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9860) Transactional Producer could add partitions by batch at the end

2020-04-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9860:
--

 Summary: Transactional Producer could add partitions by batch at 
the end
 Key: KAFKA-9860
 URL: https://issues.apache.org/jira/browse/KAFKA-9860
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen


As of today, the Producer transaction manager bookkeeps the partitions involved 
in the current transaction. Each time it sees a new partition, it tries to send 
a request adding all the involved partitions to the broker, which results in 
multiple requests. If we could batch that work at the end of the transaction, 
we would save the unnecessary round trips (see the sketch below).
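For illustration, a minimal sketch of the batching idea with hypothetical names 
(the real producer's TransactionManager has a different, more involved API):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch only: accumulate partitions locally and send a single
// AddPartitionsToTxn request right before the transaction commits.
class BatchingTxnPartitionTracker {
    private final Set<String> pendingPartitions = new HashSet<>();

    // Called on every send(); pure bookkeeping, no network round trip.
    void maybeAddPartition(String topicPartition) {
        pendingPartitions.add(topicPartition);
    }

    // Called once, right before the EndTxn request: one AddPartitionsToTxn
    // request covers all partitions touched by the transaction.
    void flushBeforeCommit() {
        if (!pendingPartitions.isEmpty()) {
            sendAddPartitionsToTxn(pendingPartitions); // single round trip
            pendingPartitions.clear();
        }
    }

    private void sendAddPartitionsToTxn(Set<String> partitions) {
        // network call elided in this sketch
    }
}
{code}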



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9784) Add OffsetFetch to the concurrent coordinator test

2020-04-09 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9784.

Resolution: Fixed

> Add OffsetFetch to the concurrent coordinator test
> --
>
> Key: KAFKA-9784
> URL: https://issues.apache.org/jira/browse/KAFKA-9784
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Normally consumers would first do an OffsetFetch before starting normal 
> processing. It makes sense to add it to the concurrent test suite.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9809) Shrink transaction timeout for Streams

2020-04-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9809.

Resolution: Fixed

> Shrink transaction timeout for Streams
> --
>
> Key: KAFKA-9809
> URL: https://issues.apache.org/jira/browse/KAFKA-9809
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Per the KIP-447 design, we need to shrink the transaction timeout for Streams 
> to 10 seconds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9809) Shrink transaction timeout for Streams

2020-04-02 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9809:
--

 Summary: Shrink transaction timeout for Streams
 Key: KAFKA-9809
 URL: https://issues.apache.org/jira/browse/KAFKA-9809
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


Per the KIP-447 design, we need to shrink the transaction timeout for Streams 
to 10 seconds.
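Until the default changes in Streams, a minimal sketch of pinning the embedded 
producer's transaction timeout to 10 seconds through the producer prefix 
(hypothetical application id and bootstrap server):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosTimeoutConfig {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Forward a 10-second transaction timeout to the embedded producer;
        // this ticket proposes making a value like this the Streams default.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10_000);
        return props;
    }
}
{code}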



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9793) Stream HandleAssignment should guarantee task close

2020-03-31 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9793:
--

 Summary: Stream HandleAssignment should guarantee task close
 Key: KAFKA-9793
 URL: https://issues.apache.org/jira/browse/KAFKA-9793
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen


When the `handleAssignment` call is triggered, if a task's preCommit throws, the 
doomed-to-fail task is never closed, which then causes a fatal RocksDB metrics 
recorder re-addition:

{code:java}
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,668] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] Handle new assignment with:
        New active tasks: [1_0, 0_1, 2_0]
        New standby tasks: []
        Existing active tasks: [0_1, 1_0, 2_0, 3_1]
        Existing standby tasks: [] (org.apache.kafka.streams.processor.internals.TaskManager)
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [3_1] Prepared clean close (org.apache.kafka.streams.processor.internals.StreamTask)
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [0_1] Prepared task for committing (org.apache.kafka.streams.processor.internals.StreamTask)
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,682] ERROR [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [1_0] Failed to flush state store logData10MinuteFinalCount-store: (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic windowed-node-counts for task 1_0 due to:
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:202)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1352)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:768)
        at org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:485)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:304)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
        at java.lang.Thread.run(Thread.java:748)
{code}

The correct solution is to wrap the whole code block in a try-catch to avoid an 
unexpected close failure, as sketched below.
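A minimal sketch of that shape, with a simplified stand-in for the Streams task 
type rather than the actual TaskManager code:

{code:java}
// Sketch only: "Task" is a stand-in for the Streams internal task type,
// and closeDirty() for its unclean-close path.
interface Task {
    void prepareCommit();   // may throw, e.g. a TaskMigratedException
    void closeClean();
    void closeDirty();
}

class AssignmentHandler {
    void closeRevokedTask(Task task) {
        try {
            task.prepareCommit();
            task.closeClean();
        } catch (RuntimeException e) {
            // Even when the commit fails, the task must still be closed so its
            // RocksDB metrics recorder is removed before the task is recreated.
            task.closeDirty();
        }
    }
}
{code}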



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9784) Add OffsetFetch to the concurrent coordinator test

2020-03-28 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9784:
--

 Summary: Add OffsetFetch to the concurrent coordinator test
 Key: KAFKA-9784
 URL: https://issues.apache.org/jira/browse/KAFKA-9784
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen


Normally consumers would first do an OffsetFetch before starting normal 
processing. It makes sense to add it to the concurrent test suite.
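For context, this OffsetFetch is what the consumer issues under the hood to 
recover its committed position; a minimal sketch of the explicit equivalent with 
KafkaConsumer (hypothetical topic, group, and broker):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetFetchExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // hypothetical
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical
            // Explicit OffsetFetch: ask the group coordinator for the last
            // committed offset before starting normal processing.
            OffsetAndMetadata committed =
                consumer.committed(Collections.singleton(tp)).get(tp);
            consumer.assign(Collections.singleton(tp));
            consumer.seek(tp, committed == null ? 0L : committed.offset());
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}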



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9779) Add version 2.5 to streams system tests

2020-03-27 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9779:
--

 Summary: Add version 2.5 to streams system tests
 Key: KAFKA-9779
 URL: https://issues.apache.org/jira/browse/KAFKA-9779
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9764) Deprecate Stream Simple benchmark

2020-03-25 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9764:
--

 Summary: Deprecate Stream Simple benchmark
 Key: KAFKA-9764
 URL: https://issues.apache.org/jira/browse/KAFKA-9764
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen


Over the years, this simple benchmark suite has become less and less valuable. 
It is built on Jenkins infrastructure that cannot guarantee consistent results 
from run to run, and most of the time it yields no insights. To avoid wasting 
developers' time testing performance against this poor setup, we will remove 
the test suite, as it is no longer valuable to the community.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9760) Add EOS protocol changes to documentation

2020-03-24 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9760:
--

 Summary: Add EOS protocol changes to documentation
 Key: KAFKA-9760
 URL: https://issues.apache.org/jira/browse/KAFKA-9760
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9759) Add documentation change for KIP-562

2020-03-24 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9759:
--

 Summary: Add documentation change for KIP-562
 Key: KAFKA-9759
 URL: https://issues.apache.org/jira/browse/KAFKA-9759
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9758) Add documentations for Streams release

2020-03-24 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9758:
--

 Summary: Add documentations for Streams release
 Key: KAFKA-9758
 URL: https://issues.apache.org/jira/browse/KAFKA-9758
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.5.0
Reporter: Boyang Chen


This ticket tracks the doc updates for a couple of KIPs launched in Streams 
2.5, including:

KIP-523
KIP-527
KIP-530
KIP-562



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9757) Add documentation change for KIP-535

2020-03-24 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9757:
--

 Summary: Add documentation change for KIP-535
 Key: KAFKA-9757
 URL: https://issues.apache.org/jira/browse/KAFKA-9757
 Project: Kafka
  Issue Type: Sub-task
Affects Versions: 2.5.0
Reporter: Boyang Chen
Assignee: Vinoth Chandar


Just a reminder to add documentation for KIP-535 to both the release notes and 
the Streams documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue

2020-03-24 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9475.

Resolution: Won't Fix

It does not seem worth the effort, as we could just shrink the scheduler interval

> Replace transaction abortion scheduler with a delayed queue
> ---
>
> Key: KAFKA-9475
> URL: https://issues.apache.org/jira/browse/KAFKA-9475
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> Although we could try setting the txn timeout to 10 seconds, the purging 
> scheduler only runs at a one-minute interval, so in the worst case we would 
> still wait for up to 1 minute. We are considering several potential fixes:
>  # Change the interval to 10 seconds: this means 6X more frequent checking 
> and more read contention on the txn metadata. The benefit is an easy one-line 
> fix without correctness concerns.
>  # Use an existing delayed queue, a.k.a. the purgatory. From what I heard, 
> the purgatory needs at least 2 extra threads to work properly, with some 
> add-on overhead in memory and complexity. The benefit is a more precise 
> timeout reaction, without a redundant full metadata read lock.
>  # Create a new delayed queue. This could be done with the Scala delayed 
> queue; the concern is whether this approach is production ready. The benefits 
> are the same as #2, potentially with less code complexity (a sketch follows 
> after this list).
> This ticket is to track #2 progress if we decide to go through this path 
> eventually.  
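> For illustration, a minimal sketch of the delayed-queue idea in plain Java, 
> with hypothetical types rather than the actual coordinator code:
> {code:java}
> import java.util.concurrent.DelayQueue;
> import java.util.concurrent.Delayed;
> import java.util.concurrent.TimeUnit;
> 
> // One entry per ongoing transaction; it becomes available for dequeue
> // exactly when the transaction times out.
> class TxnAbortEntry implements Delayed {
>     final String transactionalId; // hypothetical key
>     final long deadlineMs;
> 
>     TxnAbortEntry(String transactionalId, long timeoutMs) {
>         this.transactionalId = transactionalId;
>         this.deadlineMs = System.currentTimeMillis() + timeoutMs;
>     }
> 
>     @Override
>     public long getDelay(TimeUnit unit) {
>         return unit.convert(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
>     }
> 
>     @Override
>     public int compareTo(Delayed other) {
>         return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
>     }
> }
> 
> class TxnAbortScheduler {
>     private final DelayQueue<TxnAbortEntry> queue = new DelayQueue<>();
> 
>     void schedule(String transactionalId, long timeoutMs) {
>         queue.put(new TxnAbortEntry(transactionalId, timeoutMs));
>     }
> 
>     // A dedicated thread aborts each transaction right at its deadline,
>     // with no periodic full scan over the transaction metadata.
>     void runLoop() throws InterruptedException {
>         while (true) {
>             abortTransaction(queue.take().transactionalId);
>         }
>     }
> 
>     private void abortTransaction(String transactionalId) {
>         // broker-side abort elided in this sketch
>     }
> }
> {code}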



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9743) StreamTask could fail to close during HandleNewAssignment

2020-03-24 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9743.

Resolution: Fixed

> StreamTask could fail to close during HandleNewAssignment
> -
>
> Key: KAFKA-9743
> URL: https://issues.apache.org/jira/browse/KAFKA-9743
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We found this particular bug happening in soak:
> [2020-03-20T16:12:02-07:00] 
> (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) [2020-03-20 
> 23:12:01,534] ERROR 
> [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] 
> stream-thread 
> [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-03-20T16:12:02-07:00] 
> (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) 
> java.lang.IllegalStateException: RocksDB metrics recorder for store 
> "KSTREAM-AGGREGATE-STATE-STORE-40" of task 2_2 has already been 
> added. This is a bug in Kafka Streams.
>         at 
> org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30)
>         at 
> org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:98)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:207)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:193)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:231)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:81)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:191)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:329)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:587)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:501)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:475)
>  
> This could bring the entire instance down. The bug is that if the commit 
> fails during the task close section, the actual `closeClean` call is never 
> triggered.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8230) Add static membership support in librd consumer client

2020-03-23 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8230.

Resolution: Fixed

This will be tracked on the librdkafka side, where it appears to be done already

> Add static membership support in librd consumer client 
> ---
>
> Key: KAFKA-8230
> URL: https://issues.apache.org/jira/browse/KAFKA-8230
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Minor
>  Labels: consumer
>
> Once the effort in https://issues.apache.org/jira/browse/KAFKA-7018 is done, 
> one of the low-hanging fruits is to add this support to other-language Kafka 
> consumers, such as the C consumer in librdkafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9751) Admin `FindCoordinator` call should go to controller instead of ZK

2020-03-23 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9751:
--

 Summary: Admin `FindCoordinator` call should go to controller 
instead of ZK
 Key: KAFKA-9751
 URL: https://issues.apache.org/jira/browse/KAFKA-9751
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the current trunk, we still go through ZK for topic creation in the routing 
of FindCoordinatorRequest:

{code:java}
val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
  case CoordinatorType.GROUP =>
    val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
    val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
    (partition, metadata)
{code}

This should be migrated to controller handling.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9743) StreamTask could fail to close during HandleNewAssignment

2020-03-22 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9743:
--

 Summary: StreamTask could fail to close during HandleNewAssignment
 Key: KAFKA-9743
 URL: https://issues.apache.org/jira/browse/KAFKA-9743
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


We found this particular bug happening in soak:

{code:java}
[2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) [2020-03-20 23:12:01,534] ERROR [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] stream-thread [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) java.lang.IllegalStateException: RocksDB metrics recorder for store "KSTREAM-AGGREGATE-STATE-STORE-40" of task 2_2 has already been added. This is a bug in Kafka Streams.
        at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30)
        at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:98)
        at org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:207)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:193)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:231)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:191)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:329)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:587)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:501)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:475)
{code}

This could bring the entire instance down. The bug is that if the commit fails 
during the task close section, the actual `closeClean` call is never triggered.
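For illustration, a stripped-down stand-in for the registration check that 
throws above (the real RocksDBMetricsRecordingTrigger keys and stores its 
recorders differently):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the metrics recording trigger: registration is
// strict, so a task that was never closed leaves a stale entry behind, and
// the recreated task's registration then becomes fatal.
class MetricsRecorderRegistry {
    private final Map<String, Object> recorders = new ConcurrentHashMap<>();

    void addMetricsRecorder(String storeAndTask, Object recorder) {
        if (recorders.putIfAbsent(storeAndTask, recorder) != null) {
            throw new IllegalStateException("RocksDB metrics recorder for "
                + storeAndTask + " has already been added. This is a bug in Kafka Streams.");
        }
    }

    // Invoked from the task close path; skipping this when a close fails is
    // exactly the gap this ticket fixes.
    void removeMetricsRecorder(String storeAndTask) {
        recorders.remove(storeAndTask);
    }
}
{code}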



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   4   >