[jira] [Updated] (KAFKA-9098) Name Repartition Filter, Source, and Sink Processors
[ https://issues.apache.org/jira/browse/KAFKA-9098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9098: --- Issue Type: Improvement (was: Bug) > Name Repartition Filter, Source, and Sink Processors > > > Key: KAFKA-9098 > URL: https://issues.apache.org/jira/browse/KAFKA-9098 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.2.0, 2.3.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > When users provide a name for repartition topics, we should use the same name as > the base for the filter, source and sink operators. While this does not > break a topology, users providing names for all processors in a DSL topology > may find the generated names for the repartition topic's filter, source, and > sink operators inconsistent with the naming approach. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
[ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959417#comment-16959417 ] ASF GitHub Bot commented on KAFKA-8972: --- guozhangwang commented on pull request #7589: KAFKA-8972: need to flush state even on unclean close URL: https://github.com/apache/kafka/pull/7589 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback > state > > > Key: KAFKA-8972 > URL: https://issues.apache.org/jira/browse/KAFKA-8972 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Blocker > Fix For: 2.4.0 > > > Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the > following: > {code} > this.subscriptions.unsubscribe(); > this.coordinator.onLeavePrepare(); > this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics"); > {code} > And inside {{onLeavePrepare}} we would look into the assignment and try to > revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, > and then clear the assignment. > However, the subscription's assignment is already cleared in > {{this.subscriptions.unsubscribe();}} which means user's rebalance listener > would never be triggered. In other words, from consumer client's pov nothing > is owned after unsubscribe, but from the user caller's pov the partitions are > not revoked yet. For callers like Kafka Streams which rely on the rebalance > listener to maintain their internal state, this leads to inconsistent state > management and failure cases. 
> Before KIP-429 this issue was hidden since every time the consumer > re-joined the group later, it would still revoke everything anyway, regardless > of the passed-in parameters of the rebalance listener; with KIP-429 this is > easier to reproduce now. > I think we can summarize our fix as: > • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then > `subscription.unsubscribe`. This way we are guaranteed that the streams' tasks > are all closed as revoked by then. > • [Optimization] If the generation is reset due to a fatal error from a join / heartbeat > response etc., then we know that all partitions are lost, and we should not > trigger `onPartitionsRevoked`, but instead just `onPartitionsLost` inside > `onLeavePrepare`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
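The ordering problem described in KAFKA-8972 can be illustrated with a small self-contained sketch. It does not use the real `KafkaConsumer` internals: `ToyConsumer`, `unsubscribeBuggy`, and `unsubscribeFixed` are hypothetical names, and the user's rebalance listener is reduced to a list that records which partitions it was told were revoked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model of the ordering issue; none of these names exist in the Kafka client.
final class ToyConsumer {
    private final Set<String> assignment = new HashSet<>();
    // Records what the user's rebalance listener was notified about.
    final List<String> revokedSeenByListener = new ArrayList<>();

    ToyConsumer(Set<String> initialAssignment) {
        assignment.addAll(initialAssignment);
    }

    // Stand-in for subscriptions.unsubscribe(): clears the assignment.
    private void clearSubscription() {
        assignment.clear();
    }

    // Stand-in for coordinator.onLeavePrepare(): revoke whatever is still assigned.
    private void onLeavePrepare() {
        revokedSeenByListener.addAll(new TreeSet<>(assignment));
    }

    // Original ordering: the assignment is already empty when the listener would run.
    void unsubscribeBuggy() {
        clearSubscription();
        onLeavePrepare();
    }

    // Proposed ordering: notify the listener first, then clear the assignment.
    void unsubscribeFixed() {
        onLeavePrepare();
        clearSubscription();
    }

    public static void main(String[] args) {
        ToyConsumer buggy = new ToyConsumer(new HashSet<>(Arrays.asList("t-0", "t-1")));
        buggy.unsubscribeBuggy();
        System.out.println("buggy listener saw: " + buggy.revokedSeenByListener); // []

        ToyConsumer fixed = new ToyConsumer(new HashSet<>(Arrays.asList("t-0", "t-1")));
        fixed.unsubscribeFixed();
        System.out.println("fixed listener saw: " + fixed.revokedSeenByListener); // [t-0, t-1]
    }
}
```

With the original ordering the listener is never told about the two partitions, which is the state mismatch the ticket describes for callers such as Kafka Streams.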
[jira] [Created] (KAFKA-9103) KIP-441: Add TaskLags to SubscriptionInfo
John Roesler created KAFKA-9103: --- Summary: KIP-441: Add TaskLags to SubscriptionInfo Key: KAFKA-9103 URL: https://issues.apache.org/jira/browse/KAFKA-9103 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler As described in KIP-441, we will add the TaskLags field to, and remove the ActiveTasks and StandbyTasks fields from, the SubscriptionInfo object. This change can be made independent of the new balancing algorithm, since we can transparently convert back and forth between active/standby tasks and the tasklags formats, using a lag of 0 to denote active tasks and a lag > 0 to denote standbys. -- This message was sent by Atlassian Jira (v8.3.4#803005)
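The back-and-forth conversion mentioned in KAFKA-9103 can be sketched as follows. This is only an illustration under stated assumptions: task IDs are plain strings, `TaskLagsCodec` is a hypothetical helper and not part of `SubscriptionInfo`, and the placeholder lag of 1 for standbys is an assumption, since the ticket only says that standbys carry a lag greater than 0.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper; the real SubscriptionInfo encoding is not shown in the ticket.
final class TaskLagsCodec {
    static final long ACTIVE_LAG = 0L;
    // Assumption: any positive value marks a standby; 1 is an arbitrary placeholder.
    static final long PLACEHOLDER_STANDBY_LAG = 1L;

    // Old format -> new format: active tasks get lag 0, standbys a positive lag.
    static Map<String, Long> toTaskLags(Set<String> active, Set<String> standby) {
        Map<String, Long> lags = new TreeMap<>();
        for (String task : standby) {
            lags.put(task, PLACEHOLDER_STANDBY_LAG);
        }
        for (String task : active) {
            lags.put(task, ACTIVE_LAG); // an active entry overrides a standby entry
        }
        return lags;
    }

    // New format -> old format: lag == 0 means active.
    static Set<String> activeTasks(Map<String, Long> lags) {
        Set<String> active = new TreeSet<>();
        for (Map.Entry<String, Long> e : lags.entrySet()) {
            if (e.getValue() == ACTIVE_LAG) {
                active.add(e.getKey());
            }
        }
        return active;
    }

    // New format -> old format: lag > 0 means standby.
    static Set<String> standbyTasks(Map<String, Long> lags) {
        Set<String> standby = new TreeSet<>();
        for (Map.Entry<String, Long> e : lags.entrySet()) {
            if (e.getValue() > ACTIVE_LAG) {
                standby.add(e.getKey());
            }
        }
        return standby;
    }
}
```

A round trip through `toTaskLags` and the two readers returns the original active and standby sets, which is what makes the field change independent of the balancing algorithm.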
[jira] [Commented] (KAFKA-8729) Augment ProduceResponse error messaging for specific culprit records
[ https://issues.apache.org/jira/browse/KAFKA-8729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959376#comment-16959376 ] ASF GitHub Bot commented on KAFKA-8729: --- guozhangwang commented on pull request #7522: KAFKA-8729: Add upgrade docs for KIP-467 on augmented produce response URL: https://github.com/apache/kafka/pull/7522 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Augment ProduceResponse error messaging for specific culprit records > > > Key: KAFKA-8729 > URL: https://issues.apache.org/jira/browse/KAFKA-8729 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Guozhang Wang >Assignee: Tu Tran >Priority: Major > > 1. We should replace the misleading CORRUPT_RECORD error code with a new > INVALID_RECORD. > 2. We should augment the ProduceResponse with customizable error message and > indicators of culprit records. > 3. We should change the client-side handling logic of non-retriable > INVALID_RECORD to re-batch the records. > Details see: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9089) Reassignment should be resilient to unexpected errors
[ https://issues.apache.org/jira/browse/KAFKA-9089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-9089. Resolution: Fixed > Reassignment should be resilient to unexpected errors > - > > Key: KAFKA-9089 > URL: https://issues.apache.org/jira/browse/KAFKA-9089 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.4.0 > > > Reassignment changes typically involve both an update to the assignment state > in ZooKeeper and an update to the in-memory representation of that state (in > the ControllerContext). We can run into trouble when these states get > inconsistent with each other, so the reassignment logic attempts to follow > some rules to reduce the impact from this: > * When creating a new reassignment, we update the state in ZooKeeper first > before updating memory. Until the reassignment is known to be persisted, we > do not begin executing any reassignment logic. > * When completing a reassignment, all of the completion steps are executed > before the state is updated in ZooKeeper. In the event of a failure, the new > controller can retry reassignment completion. > However, the current logic does not follow these rules strictly, which can > lead to state inconsistencies in the case of an unexpected error. > # When we override or cancel an existing assignment, we currently use an > intermediate assignment state which is only reflected in memory. It is > basically a mix of the previous assignment state and the overlapping parts of > the new reassignment. The purpose of this is to shut down unneeded replicas > from the existing reassignment. Since the intermediate state is not > persisted, a controller failure will revert to the old reassignment. Any > exception which does not cause a controller failure will result in state > divergence. 
> # The target replicas of a reassignment are represented both in the existing > assignment (PartitionReplicaAssignment) and in a separate context object > (ReassignedPartitionContext). The reassignment context is updated before a > reassignment has been accepted and persisted. The intent is to remove this > context object in the event of a submission failure, but an unexpected error > will leave it around. > We can make reassignment more resilient to unexpected errors by using > consistent update invariants. Specifically we can remove the intermediate > assignment state and enforce the invariant that any active reassignment must > be persisted before being reflected in memory. Additionally, we can make the > assignment state the source of truth for the target replicas and eliminate > the possibility of inconsistency. Doing so simplifies the reassignment logic > and makes it more resilient. -- This message was sent by Atlassian Jira (v8.3.4#803005)
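The invariant proposed in KAFKA-9089, that an active reassignment must be persisted before it is reflected in memory, can be sketched in a few lines. This is not the controller's code: `ToyReassignmentController` and `PersistentStore` are hypothetical names, and the store interface merely stands in for the ZooKeeper write.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "persist first, then update memory"; not the Kafka controller.
final class ToyReassignmentController {
    // Stand-in for the ZooKeeper write; may throw on failure.
    interface PersistentStore {
        void write(String partition, List<Integer> targetReplicas);
    }

    private final PersistentStore store;
    private final Map<String, List<Integer>> inMemory = new HashMap<>();

    ToyReassignmentController(PersistentStore store) {
        this.store = store;
    }

    // Returns true only if the reassignment was persisted and then mirrored in memory.
    boolean submit(String partition, List<Integer> targetReplicas) {
        try {
            store.write(partition, targetReplicas);
        } catch (RuntimeException e) {
            // Persistence failed: memory is left untouched, so the two views cannot diverge.
            return false;
        }
        inMemory.put(partition, targetReplicas);
        return true;
    }

    // The in-memory view; null if no reassignment has been accepted for the partition.
    List<Integer> inMemoryTarget(String partition) {
        return inMemory.get(partition);
    }
}
```

Because memory is written only after a successful store write, an unexpected error during submission leaves no context object behind to clean up.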
[jira] [Commented] (KAFKA-9089) Reassignment should be resilient to unexpected errors
[ https://issues.apache.org/jira/browse/KAFKA-9089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959373#comment-16959373 ] ASF GitHub Bot commented on KAFKA-9089: --- hachikuji commented on pull request #7562: KAFKA-9089; Reassignment should be resilient to unexpected errors URL: https://github.com/apache/kafka/pull/7562 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reassignment should be resilient to unexpected errors > - > > Key: KAFKA-9089 > URL: https://issues.apache.org/jira/browse/KAFKA-9089 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.4.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9102) Increase default zk session timeout and max lag
[ https://issues.apache.org/jira/browse/KAFKA-9102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959346#comment-16959346 ] ASF GitHub Bot commented on KAFKA-9102: --- hachikuji commented on pull request #7596: KAFKA-9102; Increase default zk session timeout and replica max lag [KIP-537] URL: https://github.com/apache/kafka/pull/7596 This patch increases the default value of `zookeeper.session.timeout.ms` from 6s to 18s and `replica.lag.time.max.ms` from 10s to 30s. This change was documented in KIP-537: https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Increase default zk session timeout and max lag > --- > > Key: KAFKA-9102 > URL: https://issues.apache.org/jira/browse/KAFKA-9102 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.5.0 > > > This tracks the implementation of KIP-537: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9102) Increase default zk session timeout and max lag
Jason Gustafson created KAFKA-9102: -- Summary: Increase default zk session timeout and max lag Key: KAFKA-9102 URL: https://issues.apache.org/jira/browse/KAFKA-9102 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 2.5.0 This tracks the implementation of KIP-537: https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9101) Create a fetch.max.bytes configuration for the broker
[ https://issues.apache.org/jira/browse/KAFKA-9101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-9101: --- Assignee: Colin McCabe > Create a fetch.max.bytes configuration for the broker > - > > Key: KAFKA-9101 > URL: https://issues.apache.org/jira/browse/KAFKA-9101 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9101) Create a fetch.max.bytes configuration for the broker
Colin McCabe created KAFKA-9101: --- Summary: Create a fetch.max.bytes configuration for the broker Key: KAFKA-9101 URL: https://issues.apache.org/jira/browse/KAFKA-9101 Project: Kafka Issue Type: Improvement Components: core Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9100) The controller code allows for replicas to get added to the assignment when the replicas goes online
Jose Armando Garcia Sancio created KAFKA-9100: - Summary: The controller code allows for replicas to get added to the assignment when the replicas goes online Key: KAFKA-9100 URL: https://issues.apache.org/jira/browse/KAFKA-9100 Project: Kafka Issue Type: Task Components: controller Affects Versions: 2.3.0 Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio When modelling the replica state machine, the controller code allows a replica to be added to the assignment when it goes online from a new state. This should not be possible. Replicas are added to a partition when creating a topic, creating a partition, or reassigning a partition. A replica going online should not cause it to be added to the assignment. The code that currently allows this is here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L194-L198 We should restructure the code so that this condition is not possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959308#comment-16959308 ] Richard Yu commented on KAFKA-8522: --- Found where the tombstones are inserted. That really just answered my question above. My bad. > Tombstones can survive forever > -- > > Key: KAFKA-8522 > URL: https://issues.apache.org/jira/browse/KAFKA-8522 > Project: Kafka > Issue Type: Improvement > Components: log cleaner >Reporter: Evelyn Bayes >Priority: Minor > > This is a bit of a grey zone as to whether it's a "bug" but it is certainly > unintended behaviour. > > Under specific conditions tombstones effectively survive forever: > * Small amount of throughput; > * min.cleanable.dirty.ratio near or at 0; and > * Other parameters at default. > What happens is all the data continuously gets cycled into the oldest > segment. Old records get compacted away, but the new records continuously > update the timestamp of the oldest segment, resetting the countdown for > deleting tombstones. > So tombstones build up in the oldest segment forever. > > While you could "fix" this by reducing the segment size, this can be > undesirable as a sudden change in throughput could cause a dangerous number > of segments to be created. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959306#comment-16959306 ] Sophie Blee-Goldman commented on KAFKA-4969: Bill's proposal has already been merged, so you should see a better distribution of tasks of the same subtopology/topicGroupId as long as you've upgraded to a version containing this fix (not sure what version this first went in to, any idea [~bbejeck]?) This ticket was reopened to cover "true" state-aware assignment that distinguishes between heavier stateful subtopologies and lighter stateless ones, which should be covered by the work being planned as part of KIP-441. If you're interested, you can read up on it here [https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams] > State-store workload-aware StreamsPartitionAssignor > --- > > Key: KAFKA-4969 > URL: https://issues.apache.org/jira/browse/KAFKA-4969 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Major > Fix For: 1.1.0 > > > Currently, {{StreamsPartitionAssignor}} does not distinguish different > "types" of tasks. For example, a task can be stateless or have one or multiple > stores. > This can lead to a suboptimal task placement: assume there are 2 stateless > and 2 stateful tasks and the app is running with 2 instances. To share the > "store load" it would be good to place one stateless and one stateful task > per instance. Right now, there is no guarantee about this, and it can happen > that one instance processes both stateless tasks while the other processes > both stateful tasks. > We should improve {{StreamsPartitionAssignor}} and introduce "task types" > including a cost model for task placement. 
We should consider the following > parameters: > - number of stores > - number of sources/sinks > - number of processors > - regular task vs standby task > - in the case of standby tasks, which tasks have progressed the most with > respect to restoration > This improvement should be backed by a design document in the project wiki > (no KIP required though) as it's a fairly complex change. > > There have been some additional discussions around task assignment on a > related PR https://github.com/apache/kafka/pull/5390 -- This message was sent by Atlassian Jira (v8.3.4#803005)
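The placement example in the KAFKA-4969 description (2 stateless and 2 stateful tasks on 2 instances) can be made concrete with a rough sketch of distributing tasks evenly per topicGroupId. This is not the algorithm in `StreamsPartitionAssignor`: `GroupAwareAssignor` is a hypothetical name, and the sketch only assumes that task IDs have the form `<topicGroupId>_<partition>`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the Kafka Streams assignor.
final class GroupAwareAssignor {
    // Spreads the tasks of each topic group round-robin over the instances.
    static Map<String, List<String>> assign(List<String> taskIds, List<String> instances) {
        // Group task IDs such as "1_0" by the topicGroupId prefix before the underscore.
        Map<String, List<String>> byGroup = new TreeMap<>();
        for (String taskId : taskIds) {
            String group = taskId.substring(0, taskId.indexOf('_'));
            byGroup.computeIfAbsent(group, g -> new ArrayList<>()).add(taskId);
        }

        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String instance : instances) {
            result.put(instance, new ArrayList<>());
        }

        // Within one group, instance task counts differ by at most one.
        for (List<String> group : byGroup.values()) {
            for (int k = 0; k < group.size(); k++) {
                String instance = instances.get(k % instances.size());
                result.get(instance).add(group.get(k));
            }
        }
        return result;
    }
}
```

If subtopology 0 is stateless and subtopology 1 is stateful, calling `assign` with tasks `0_0, 0_1, 1_0, 1_1` and instances `A, B` gives `A` the tasks `0_0` and `1_0` and `B` the tasks `0_1` and `1_1`, so each instance carries one task of each kind. A cost model as proposed in the ticket would go further by weighting groups differently; this sketch restarts the round-robin at the first instance for every group and therefore ignores such weights.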
[jira] [Created] (KAFKA-9099) Reassignments should be retried after unexpected errors
Jason Gustafson created KAFKA-9099: -- Summary: Reassignments should be retried after unexpected errors Key: KAFKA-9099 URL: https://issues.apache.org/jira/browse/KAFKA-9099 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Stanislav Kozlovski Currently we do not catch unexpected exceptions in `onPartitionReassignment` in the controller. Generally this can lead to state inconsistencies and a failure to complete a reassignment. We should add exception handling and come up with a retry approach. For example, we could track failed reassignments in the controller context and retry them periodically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
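The retry idea floated in KAFKA-9099 (track failed reassignments and retry them periodically) could look roughly like the sketch below. It is an assumption-laden illustration, not controller code: `ReassignmentRetrier` is a hypothetical class, the reassignment itself is passed in as a plain callback, and the periodic scheduling is left to the caller.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: remembers partitions whose reassignment threw and retries them later.
final class ReassignmentRetrier {
    private final Set<String> failed = new LinkedHashSet<>();
    private final Consumer<String> reassign;

    ReassignmentRetrier(Consumer<String> reassign) {
        this.reassign = reassign;
    }

    // Runs the reassignment; an unexpected exception is caught and the partition is tracked.
    boolean attempt(String partition) {
        try {
            reassign.accept(partition);
            failed.remove(partition);
            return true;
        } catch (RuntimeException e) {
            failed.add(partition);
            return false;
        }
    }

    // Retries everything that failed so far; returns how many partitions are still failing.
    int retryFailed() {
        List<String> snapshot = new ArrayList<>(failed);
        for (String partition : snapshot) {
            attempt(partition);
        }
        return failed.size();
    }

    // Defensive copy of the partitions currently awaiting a retry.
    Set<String> failedPartitions() {
        return new LinkedHashSet<>(failed);
    }
}
```

A caller could invoke `retryFailed()` from a scheduled task, for example via `java.util.concurrent.ScheduledExecutorService`; how often to retry and when to give up are design questions the ticket leaves open.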
[jira] [Commented] (KAFKA-6453) Reconsider timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959246#comment-16959246 ] Boyang Chen commented on KAFKA-6453: The executor needs to first check the change into the AK repo and then merge it into kafka-site. > Reconsider timestamp propagation semantics > -- > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Victoria Bialas >Priority: Major > Labels: needs-kip > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processors within a sub-topology see the > timestamp from the input topic record, and this timestamp will be used for all > result records when writing them to a topic, too. > The DSL inherits this "contract" atm. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend Processor API to allow manipulating timestamps (ie, a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-6453) Reconsider timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959244#comment-16959244 ] Boyang Chen commented on KAFKA-6453: Both the state store section and the DSL section need to reflect the changes, such as the reasoning for adding timestamps to state stores, which gives better: # join semantics: previously we did not have timestamps in the state store, so out-of-order joins could not be caught. Now we can take the max. # aggregation semantics: an out-of-order aggregation could also push the result backward. With timestamps, it is more defensive. The window store and timestamped KV store are also recommended to be put in the state store section. > Reconsider timestamp propagation semantics > -- > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Victoria Bialas >Priority: Major > Labels: needs-kip -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9098) Name Repartition Filter, Source, and Sink Processors
Bill Bejeck created KAFKA-9098: -- Summary: Name Repartition Filter, Source, and Sink Processors Key: KAFKA-9098 URL: https://issues.apache.org/jira/browse/KAFKA-9098 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.3.0, 2.2.0 Reporter: Bill Bejeck Assignee: Bill Bejeck When users provide a name for repartition topics, we should use the same name as the base for the filter, source and sink operators. While this does not break a topology, users providing names for all processors in a DSL topology may find the generated names for the repartition topic's filter, source, and sink operators inconsistent with the naming approach. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9097) URL needs update - use https://kafka.apache.org/quickstart
Bernard A. Badger created KAFKA-9097: Summary: URL needs update - use https://kafka.apache.org/quickstart Key: KAFKA-9097 URL: https://issues.apache.org/jira/browse/KAFKA-9097 Project: Kafka Issue Type: Bug Components: documentation Reporter: Bernard A. Badger Use better URL for instructions in README.md, https://kafka.apache.org/quickstart
{noformat}
$ git diff HEAD^
diff --git a/README.md b/README.md
index 308d9df8c..7095c161f 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ Now everything else will work.
 ### Build a jar and run it ###
     ./gradlew jar
 
-Follow instructions in https://kafka.apache.org/documentation.html#quickstart
+Follow instructions in https://kafka.apache.org/quickstart
 
 ### Build source jar ###
     ./gradlew srcJar
{noformat}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959188#comment-16959188 ] ASF GitHub Bot commented on KAFKA-9088: --- pierDipi commented on pull request #7594: KAFKA-9088: Consolidate InternalMockProcessorContext and MockInternal… URL: https://github.com/apache/kafka/pull/7594 …ProcessorContext Merge `InternalMockProcessorContext` into `MockInternalProcessorContext` and replace all `InternalMockProcessorContext` usages by `MockInternalProcessorContext`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Consolidate InternalMockProcessorContext and MockInternalProcessorContext > - > > Key: KAFKA-9088 > URL: https://issues.apache.org/jira/browse/KAFKA-9088 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Bruno Cadonna >Priority: Minor > Labels: newbie > > Currently, we have two mocks for the {{InternalProcessorContext}}. The goal > of this ticket is to merge both into one mock or replace it with an > {{EasyMock}} mock. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9083) Various parsing issues in Values class
[ https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959174#comment-16959174 ] ASF GitHub Bot commented on KAFKA-9083: --- C0urante commented on pull request #7593: KAFKA-9083: Various fixes/improvements for Connect's Values class URL: https://github.com/apache/kafka/pull/7593 [Jira](https://issues.apache.org/jira/browse/KAFKA-9083) The following functional changes are implemented here: • Top-level strings beginning with `"true"`, `"false"` and then followed by token delimiters (e.g., `"true}"` and `"false]"`) are parsed as strings, not as booleans • The empty array (`"[]"`) is now parsed as an array with a null value schema • The empty map (`"{}"`) is now parsed as a map with null key and value schemas • Arrays with all-null elements are now parsed successfully (whereas before an NPE was thrown) as an array with a null value schema • Maps with all-null values are now parsed as maps with null value schemas, but non-null key schemas • Strings that appear to be arrays at first but are missing comma delimiters (e.g., `"[0 1 2]"`) are now parsed as strings instead of arrays • A small improvement is made to the debug message generated when map parsing fails due to unexpected comma delimiters ("Unable to parse a map entry has no key or value" is changed to "Unable to parse a map entry with no key or value") • A small improvement is made to the debug message generated when map parsing fails due to a missing `}` ("Map is missing terminating ']'" is changed to "Map is missing terminating '}'") • A small improvement is made to the debug message generated when array or map parsing fails and parsing is reset to process the input as a string ("Unable to parse the value as a map" is changed to "Unable to parse the value as a map or an array") • Embedded values that lack surrounding quotes (e.g., `foo` in `"[foo]"`) are no longer treated as strings; this is in line with the JSON-like representation that is 
meant to be supported by the `Values` class and prevents errors such as parsing `"[0 1 2]"` as an array containing a single string element with a value of `"0 1 2"` • The top-level string `"null"` is now parsed as `null` instead of the string `"null"`; this does not break attempts to convert the top-level string `"null"` into a string (which should also be the string `"null"`) Every change (except for log message alterations) is verified with one or more unit tests, and several unit tests are also added to prevent regression in functionality that, while not currently broken, is subtle enough that it may be missed in future changes without tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Various parsing issues in Values class > -- > > Key: KAFKA-9083 > URL: https://issues.apache.org/jira/browse/KAFKA-9083 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Major > > There are a number of small issues with the Connect framework's {{Values}} > class that lead to either unexpected exceptions, unintuitive (and arguably > incorrect) behavior, or confusing log messages. 
These include: > * A {{NullPointerException}} is thrown when parsing the string {{[null]}} > (which should be parsed as an array containing a single null element) > * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which > should be parsed as an empty array) > * The returned schema is null when parsing the string {{{}}} (instead, it > should be a map schema, possibly with null key and value schemas) > * Strings that begin with what appear to be booleans (i.e., the literals > {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}, > {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be > parsed as strings; for example, the string {{true}}} is parsed as the boolean > {{true}} but should probably just be parsed as the string {{true}}} > * Arrays not containing commas are still parsed as arrays in some cases; for > example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it > should arguably be parsed as the string literal {{[0 1 2]}} > * An exception is logged when attempting to parse input as a map when it > doesn't contain
[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959169#comment-16959169 ]

Dallas Wrege commented on KAFKA-4969:
-------------------------------------

Bump. I would love to see this issue back in consideration. We had exactly the same experience as [~mjsax], and a simple search of this Jira brought me straight here. Because of re-keying, our topology is broken up into one "heavy" task and three "stateless" tasks. I ended up in situations where some clients were assigned only heavy tasks and others only stateless tasks; all were unbalanced at the least. We worked around the problem by splitting the stateless parts of the topology into separate Stream Processors, but having to use a workaround is unfortunate. The straightforward implementation that [~bbejeck] proposes above would have worked fine for us, as it ensures an even distribution based on topicGroupId.

> State-store workload-aware StreamsPartitionAssignor
> ---------------------------------------------------
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 1.1.0
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" of tasks. For example, a task can be stateless or have one or multiple stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless and 2 stateful tasks and the app is running with 2 instances. To share the "store load" it would be good to place one stateless and one stateful task per instance. Right now, there is no guarantee about this, and it can happen that one instance processes both stateless tasks while the other processes both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" including a cost model for task placement. We should consider the following parameters:
> - number of stores
> - number of sources/sinks
> - number of processors
> - regular task vs standby task
> - in the case of standby tasks, which tasks have progressed the most with respect to restoration
> This improvement should be backed by a design document in the project wiki (no KIP required though) as it's a fairly complex change.
>
> There have been some additional discussions around task assignment on a related PR https://github.com/apache/kafka/pull/5390

-- This message was sent by Atlassian Jira (v8.3.4#803005)
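To make the cost-model idea in KAFKA-4969 concrete, here is a minimal sketch. It is not the StreamsPartitionAssignor implementation: the `TaskCost` class, the weights in `cost()`, and the greedy least-loaded placement are all assumptions chosen for illustration, and the ticket itself defers the actual model to a design document.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TaskPlacementSketch {
    /** Hypothetical task descriptor; the fields mirror the parameters listed in the ticket. */
    static final class TaskCost {
        final String id;
        final int stores;
        final int sourcesAndSinks;
        final int processors;
        final boolean standby;

        TaskCost(String id, int stores, int sourcesAndSinks, int processors, boolean standby) {
            this.id = id;
            this.stores = stores;
            this.sourcesAndSinks = sourcesAndSinks;
            this.processors = processors;
            this.standby = standby;
        }

        /** Illustrative weights only (assumption): stores dominate, standby tasks count half. */
        int cost() {
            int base = 10 * stores + 2 * sourcesAndSinks + processors;
            return standby ? base / 2 : base;
        }
    }

    /** Greedy placement: heaviest task first, always onto the currently least-loaded instance. */
    static List<List<TaskCost>> assign(List<TaskCost> tasks, int instances) {
        List<List<TaskCost>> result = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            result.add(new ArrayList<>());
        }
        int[] load = new int[instances];
        List<TaskCost> sorted = new ArrayList<>(tasks);
        sorted.sort((a, b) -> Integer.compare(b.cost(), a.cost())); // descending by cost
        for (TaskCost task : sorted) {
            int target = 0;
            for (int i = 1; i < instances; i++) {
                if (load[i] < load[target]) {
                    target = i;
                }
            }
            result.get(target).add(task);
            load[target] += task.cost();
        }
        return result;
    }

    public static void main(String[] args) {
        // The scenario from the ticket: 2 stateful and 2 stateless tasks, 2 instances.
        List<TaskCost> tasks = Arrays.asList(
                new TaskCost("stateful-0", 1, 2, 3, false),
                new TaskCost("stateful-1", 1, 2, 3, false),
                new TaskCost("stateless-0", 0, 2, 2, false),
                new TaskCost("stateless-1", 0, 2, 2, false));
        List<List<TaskCost>> placement = assign(tasks, 2);
        for (int i = 0; i < placement.size(); i++) {
            StringBuilder line = new StringBuilder("instance " + i + ":");
            for (TaskCost task : placement.get(i)) {
                line.append(' ').append(task.id);
            }
            System.out.println(line);
        }
    }
}
```

With these assumed weights, each instance ends up with one stateful and one stateless task, which is the placement the ticket asks for. A real assignor would also have to account for stickiness and standby restoration progress, which this sketch ignores.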
[jira] [Commented] (KAFKA-7181) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959150#comment-16959150 ]

Matthias J. Sax commented on KAFKA-7181:
----------------------------------------

As you can see on the ticket, it's not assigned yet, and it's unclear what the root cause is. We need to investigate further, especially as it seems to be hard to reproduce. If you have any new findings, please comment on KAFKA-9073.

> Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException
> -------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-7181
> URL: https://issues.apache.org/jira/browse/KAFKA-7181
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Romil Kumar Vasani
> Priority: Major
> Fix For: 2.2.0
>
> One of the StreamThreads encounters an IllegalStateException, is marked DEAD, and shuts down.
> The application doesn't spawn a new thread in its place; the partitions of that thread are assigned to a different thread and it synchronizes. But the application is stuck in REBALANCING state, as not all StreamThreads are in RUNNING state.
> Expected: a new thread should come up, and after synchronization/rebalancing the KafkaStreams.State should be RUNNING.
> Since all the active threads (that are not marked DEAD) are in RUNNING state, the KafkaStreams.State should be RUNNING.
> P.S. I am reporting an issue for the first time. If more information is needed, I can provide it.
> Below are the logs from the IllegalStateException:
> 2018-07-18 03:02:27.510 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [prd1565.prod.nuke.ops.v1-StreamThread-2] Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition consumerGroup-stateStore-changelog-10
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
> at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
> at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> 2018-07-18 03:02:27.511 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN
> 2018-07-18 03:02:27.511 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] Shutting down
> 2018-07-18 03:02:27.571 INFO 1 --- [-StreamThread-2] o.a.k.clients.producer.KafkaProducer : [Producer clientId=consumerGroup-StreamThread-2-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-07-18 03:02:27.579 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD
> 2018-07-18 03:02:27.579 INFO 1 --- [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread
[jira] [Updated] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id
[ https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur updated KAFKA-9093:
--------------------------------
    Fix Version/s: (was: 2.3.1)
                   2.3.2

> NullPointerException in KafkaConsumer with group.instance.id
> ------------------------------------------------------------
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0
> Reporter: Rolef Heinrich
> Assignee: huxihx
> Priority: Minor
> Fix For: 2.3.2
>
> When using *group.instance.id=myUniqId[0]*, the KafkaConsumer's constructor throws a NullPointerException in close():
>
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> {code}
> It turns out that the exception is thrown because the *log* member is not yet initialized (still null) in the constructor when the original exception is handled. The original exception is thrown before *log* is initialized.
> The side effect of this error is that close() does not clean up resources as it is supposed to.
> *The consumer properties used, for reference:*
>
> {code:java}
> key.deserializer=com.ibm.streamsx.kafka.serialization
> request.timeout.ms=25000
> value.deserializer=com.ibm.streamsx.kafka.serialization
> client.dns.lookup=use_all_dns_ips
> metadata.max.age.ms=2000
> enable.auto.commit=false
> group.instance.id=myUniqId[0]
> max.poll.interval.ms=30
> group.id=consumer-0
> metric.reporters=com.ibm.streamsx.kafka.clients.consum...
> reconnect.backoff.max.ms=1
> bootstrap.servers=localhost:9092
> max.poll.records=50
> session.timeout.ms=2
> client.id=C-J37-ReceivedMessages[0]
> allow.auto.create.topics=false
> metrics.sample.window.ms=1
> retry.backoff.ms=500
> reconnect.backoff.ms=250
> {code}
> *Expected behaviour:* throw an exception indicating that something is wrong with the chosen group.instance.id.
> The documentation does not say anything about valid values for group.instance.id.
> *Reproduce:*
>
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Main {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
>         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         KafkaConsumer<String, String> c = new KafkaConsumer<>(props); // the constructor itself fails
>     }
> }
>
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> at Main.main(Main.java:15)
> {code}
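The failure mode described in KAFKA-9093 can be illustrated without Kafka on the classpath. The sketch below is not KafkaConsumer code: `Resource`, its `guardClose` flag, and the id pattern in `validate` are hypothetical names and rules made up for this example. It only shows how a cleanup call in a constructor's catch block can hit a field that was never assigned and so mask the original exception, and one possible guard against that.

```java
class ConstructorCleanupSketch {
    /** Hypothetical stand-in for a client whose constructor validates config before wiring up fields. */
    static final class Resource {
        private StringBuilder log;          // stands in for the logger; null until initialised
        private final boolean guardClose;   // toggles the null guard so both behaviours can be shown

        Resource(String instanceId, boolean guardClose) {
            this.guardClose = guardClose;
            try {
                validate(instanceId);       // may throw before `log` is assigned
                this.log = new StringBuilder("created ").append(instanceId);
            } catch (RuntimeException e) {
                close();                    // cleanup path runs while log may still be null
                throw e;
            }
        }

        /** Assumed validation rule for illustration; not Kafka's actual rule. */
        static void validate(String instanceId) {
            if (!instanceId.matches("[a-zA-Z0-9._-]+")) {
                throw new IllegalArgumentException("Invalid instance id: " + instanceId);
            }
        }

        void close() {
            if (guardClose && log == null) {
                return;                     // nothing was initialised, so nothing to release
            }
            log.append(" closed");          // NullPointerException if log was never set
        }
    }

    public static void main(String[] args) {
        try {
            new Resource("myUniqId[0]", false);
        } catch (RuntimeException e) {
            System.out.println("unguarded close: " + e.getClass().getSimpleName());
        }
        try {
            new Resource("myUniqId[0]", true);
        } catch (RuntimeException e) {
            System.out.println("guarded close: " + e.getClass().getSimpleName());
        }
    }
}
```

Without the guard, the caller sees a NullPointerException from the cleanup path; with it, the original IllegalArgumentException surfaces, which matches the expected behaviour requested in the ticket.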
[jira] [Commented] (KAFKA-5722) Refactor ConfigCommand to use the AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-5722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958988#comment-16958988 ]

Gokul Ramanan Subramanian commented on KAFKA-5722:
--------------------------------------------------

[~viktorsomogyi] are you actively working on this? If so, could you share your progress please? Thanks.

> Refactor ConfigCommand to use the AdminClient
> ---------------------------------------------
>
> Key: KAFKA-5722
> URL: https://issues.apache.org/jira/browse/KAFKA-5722
> Project: Kafka
> Issue Type: Improvement
> Reporter: Viktor Somogyi-Vass
> Assignee: Viktor Somogyi-Vass
> Priority: Major
> Labels: kip, needs-kip
> Fix For: 2.5.0
>
> The ConfigCommand currently uses a direct connection to ZooKeeper. The ZooKeeper dependency should be deprecated and an AdminClient API created to be used instead.
> This change requires a KIP.
[jira] [Commented] (KAFKA-5561) Java based TopicCommand to use the Admin client
[ https://issues.apache.org/jira/browse/KAFKA-5561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958985#comment-16958985 ]

Gokul Ramanan Subramanian commented on KAFKA-5561:
--------------------------------------------------

[~ppatierno] are you actively working on this? If so, could you share your progress please? Thanks.

> Java based TopicCommand to use the Admin client
> -----------------------------------------------
>
> Key: KAFKA-5561
> URL: https://issues.apache.org/jira/browse/KAFKA-5561
> Project: Kafka
> Issue Type: Improvement
> Components: tools
> Reporter: Paolo Patierno
> Assignee: Paolo Patierno
> Priority: Major
>
> Hi,
> as suggested in https://issues.apache.org/jira/browse/KAFKA-3331, it would be great to have the TopicCommand use the new Admin client instead of the way it works today.
> As pushed by [~gwenshap] in the above JIRA, I'm going to work on it.
> Thanks,
> Paolo
[jira] [Created] (KAFKA-9096) Validate config inter-dependencies for KafkaConfig only after dynamic configs are loaded
Rajini Sivaram created KAFKA-9096:
-------------------------------------

Summary: Validate config inter-dependencies for KafkaConfig only after dynamic configs are loaded
Key: KAFKA-9096
URL: https://issues.apache.org/jira/browse/KAFKA-9096
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.5.0

We currently perform all validations in KafkaConfig when creating the config instance. This includes some checks that span multiple configs, e.g. is the inter-broker listener also present in the listener map? With dynamic configs, some configs come from the static properties file and others come from ZooKeeper. At the moment, we require the static properties file to be consistent on its own, even though the config we actually use for the broker is the one where the configs from ZK have been overlaid on top of the static configs. We should continue to validate individual configs for the static configs file, but delay validating inter-dependencies until dynamic configs have also been processed.
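The ordering proposed in KAFKA-9096 can be sketched as follows. This is an illustration only, not KafkaConfig code: the class and method names are hypothetical, and the listener parsing is simplified. The two config keys, `listeners` and `inter.broker.listener.name`, are real broker configs, used here for the cross-config check the ticket mentions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DeferredConfigValidationSketch {
    /** Overlay dynamic (e.g. ZooKeeper-stored) configs on top of the static properties. */
    static Map<String, String> overlay(Map<String, String> staticProps, Map<String, String> dynamicProps) {
        Map<String, String> effective = new HashMap<>(staticProps);
        effective.putAll(dynamicProps);
        return effective;
    }

    /** Extract listener names from a value such as "PLAINTEXT://:9092,INTERNAL://:9093". */
    static Set<String> listenerNames(String listeners) {
        Set<String> names = new HashSet<>();
        for (String listener : listeners.split(",")) {
            String trimmed = listener.trim();
            int separator = trimmed.indexOf("://");
            if (separator > 0) {
                names.add(trimmed.substring(0, separator));
            }
        }
        return names;
    }

    /** Cross-config check: the inter-broker listener must be one of the configured listeners. */
    static void validateInterDependencies(Map<String, String> config) {
        String interBroker = config.get("inter.broker.listener.name");
        Set<String> names = listenerNames(config.getOrDefault("listeners", ""));
        if (interBroker != null && !names.contains(interBroker)) {
            throw new IllegalArgumentException(
                    "inter.broker.listener.name " + interBroker + " is not in listeners " + names);
        }
    }

    public static void main(String[] args) {
        Map<String, String> staticProps = new HashMap<>();
        staticProps.put("listeners", "PLAINTEXT://:9092");
        staticProps.put("inter.broker.listener.name", "INTERNAL");

        Map<String, String> dynamicProps = new HashMap<>();
        dynamicProps.put("listeners", "PLAINTEXT://:9092,INTERNAL://:9093");

        try {
            validateInterDependencies(staticProps); // today's behaviour: static file must be self-consistent
        } catch (IllegalArgumentException e) {
            System.out.println("static only: " + e.getMessage());
        }
        validateInterDependencies(overlay(staticProps, dynamicProps)); // deferred check passes
        System.out.println("effective config: ok");
    }
}
```

The static file alone fails the cross-config check, while the effective config (static plus dynamic overlay) passes, which is why the ticket suggests running inter-dependency validation only on the latter.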
[jira] [Commented] (KAFKA-9092) Kafka Streams Upgrade Magic v0 does not support record headers
[ https://issues.apache.org/jira/browse/KAFKA-9092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958952#comment-16958952 ]

Guozhang Wang commented on KAFKA-9092:
--------------------------------------

[~wxmimperio] I think you are indeed hitting KAFKA-6739. As explained in https://github.com/apache/kafka/pull/4813/files, there are some produce requests with newer (v2 or v3) versions that contain headers, and when down-converting to v1 the headers should be ignored. I suggest you upgrade your brokers to 1.0.2: https://archive.apache.org/dist/kafka/1.0.2/RELEASE_NOTES.html

> Kafka Streams Upgrade Magic v0 does not support record headers
> --------------------------------------------------------------
>
> Key: KAFKA-9092
> URL: https://issues.apache.org/jira/browse/KAFKA-9092
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0, 2.0.0
> Environment: kafka 1.0.0
> kafka streams lib 2.0.0
> Reporter: wxmimperio
> Priority: Major
>
> My Kafka cluster version is 1.0.0, and I run a Streams app against topic v1.
>
> I then upgraded the Kafka Streams library to 2.0.0 to use some new APIs.
> Upgrade doc I followed: [http://kafka.apache.org/20/documentation/streams/upgrade-guide]
>
> {code:java}
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-streams</artifactId>
>     <version>2.0.0</version>
> </dependency>
> {code}
>
> {code:java}
> // streams config
> settings.put(StreamsConfig.UPGRADE_FROM_CONFIG, "1.0");
> {code}
>
> I then stopped the Streams app, rebuilt the jar, and started it again.
> It ran without problems at first.
> After a few hours the Kafka broker logged an error. I deleted and recreated the topic, and after a few hours the broker logged the same error:
> {code:java}
> [2019-10-24 10:15:45,780] ERROR [KafkaApi-109] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=4,fetch_offset=3165633,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Magic v0 does not support record headers
> at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
> at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
> at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
> at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
> at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
> at scala.Option.map(Option.scala:146)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
> at scala.Option.flatMap(Option.scala:171)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
> at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
> at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
> at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
> at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
> at
[jira] [Commented] (KAFKA-7181) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958940#comment-16958940 ]

amuthan Ganeshan commented on KAFKA-7181:
-----------------------------------------

Thank you [~mjsax] for responding. Is KAFKA-9073 assigned to somebody already? I still haven't been able to find the root cause, and I am not able to reproduce it at will.

> Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException
> -------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-7181
> URL: https://issues.apache.org/jira/browse/KAFKA-7181
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Romil Kumar Vasani
> Priority: Major
> Fix For: 2.2.0
>
> One of the StreamThreads encounters an IllegalStateException, is marked DEAD, and shuts down.
> The application doesn't spawn a new thread in its place; the partitions of that thread are assigned to a different thread and it synchronizes. But the application is stuck in REBALANCING state, as not all StreamThreads are in RUNNING state.
> Expected: a new thread should come up, and after synchronization/rebalancing the KafkaStreams.State should be RUNNING.
> Since all the active threads (that are not marked DEAD) are in RUNNING state, the KafkaStreams.State should be RUNNING.
> P.S. I am reporting an issue for the first time. If more information is needed, I can provide it.
[jira] [Created] (KAFKA-9095) Kafka Cluster Issue when API Version =1
Mantesh Janagond created KAFKA-9095:
---------------------------------------

Summary: Kafka Cluster Issue when API Version =1
Key: KAFKA-9095
URL: https://issues.apache.org/jira/browse/KAFKA-9095
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.10.0.0
Reporter: Mantesh Janagond

When I use a Kafka cluster with 3 nodes and Kafka API version = 1, I get error code 16:

{{The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.}}

The same setup works with Kafka API version = 0. Please suggest how to solve this issue.
[jira] [Commented] (KAFKA-9094) Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode
[ https://issues.apache.org/jira/browse/KAFKA-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958784#comment-16958784 ]

ASF GitHub Bot commented on KAFKA-9094:
---------------------------------------

stanislavkozlovski commented on pull request #7591: KAFKA-9094: Add server-side replica validation for ZK triggered reassignments
URL: https://github.com/apache/kafka/pull/7591

This patch adds the same server-side validation we use for API triggered reassignments to the ZK path.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode
> --------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-9094
> URL: https://issues.apache.org/jira/browse/KAFKA-9094
> Project: Kafka
> Issue Type: Improvement
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Minor
>
> As was mentioned by [~jsancio] in [https://github.com/apache/kafka/pull/7574#discussion_r337621762], it would make sense to apply the same replica validation we apply to the KIP-455 reassignments API.
> Namely, validate that the replicas:
> * are not empty (e.g. [])
> * do not contain negative broker ids (e.g. [1,2,-1])
> * do not reference brokers that are not part of the cluster or are otherwise unhealthy (e.g. not in the /brokers zNode)
> The last (liveness) validation is subject to comments. We are re-evaluating whether we want to enforce it for the API.
[jira] [Created] (KAFKA-9094) Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode
Stanislav Kozlovski created KAFKA-9094:
------------------------------------------

Summary: Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode
Key: KAFKA-9094
URL: https://issues.apache.org/jira/browse/KAFKA-9094
Project: Kafka
Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski

As was mentioned by [~jsancio] in [https://github.com/apache/kafka/pull/7574#discussion_r337621762], it would make sense to apply the same replica validation we apply to the KIP-455 reassignments API.

Namely, validate that the replicas:
* are not empty (e.g. [])
* do not contain negative broker ids (e.g. [1,2,-1])
* do not reference brokers that are not part of the cluster or are otherwise unhealthy (e.g. not in the /brokers zNode)

The last (liveness) validation is subject to comments. We are re-evaluating whether we want to enforce it for the API.
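The three checks listed in KAFKA-9094 can be sketched as a small standalone validator. This is not the controller code from the PR: `validateReplicas`, its signature, and the error strings are hypothetical, and the set of live broker ids is passed in directly rather than read from the /brokers zNode.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class ReplicaValidationSketch {
    /**
     * Returns an error description for an invalid target replica list,
     * or Optional.empty() when the list passes all three checks.
     */
    static Optional<String> validateReplicas(List<Integer> replicas, Set<Integer> liveBrokers) {
        if (replicas.isEmpty()) {
            return Optional.of("Empty replica list");
        }
        for (int replica : replicas) {
            if (replica < 0) {
                return Optional.of("Negative broker id: " + replica);
            }
        }
        for (int replica : replicas) {
            // Liveness check; the ticket notes this one is still under discussion.
            if (!liveBrokers.contains(replica)) {
                return Optional.of("Broker not in cluster: " + replica);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<Integer> liveBrokers = new HashSet<>(Arrays.asList(1, 2, 3));
        System.out.println(validateReplicas(Arrays.asList(1, 2), liveBrokers));
        System.out.println(validateReplicas(Collections.<Integer>emptyList(), liveBrokers));
        System.out.println(validateReplicas(Arrays.asList(1, 2, -1), liveBrokers));
        System.out.println(validateReplicas(Arrays.asList(1, 4), liveBrokers));
    }
}
```

Returning an `Optional<String>` rather than throwing keeps the sketch usable for validating many partitions in one pass and reporting every rejected reassignment; whether the real code reports errors this way is not stated in the ticket.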
[jira] [Assigned] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id
[ https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huxihx reassigned KAFKA-9093:
-----------------------------
    Assignee: huxihx

> NullPointerException in KafkaConsumer with group.instance.id
> ------------------------------------------------------------
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0
> Reporter: Rolef Heinrich
> Assignee: huxihx
> Priority: Minor
> Fix For: 2.3.1
[jira] [Commented] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id
[ https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958721#comment-16958721 ]

ASF GitHub Bot commented on KAFKA-9093:
---------------------------------------

huxihx commented on pull request #7590: KAFKA-9093: NullPointerException in KafkaConsumer with group.instance.id
URL: https://github.com/apache/kafka/pull/7590

https://issues.apache.org/jira/browse/KAFKA-9093

`log` in KafkaConsumer does not get initialized if an invalid value for `group.instance.id` is given during consumer construction.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> NullPointerException in KafkaConsumer with group.instance.id
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0
> Reporter: Rolef Heinrich
> Assignee: huxihx
> Priority: Minor
> Fix For: 2.3.1
[jira] [Created] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id
Rolef Heinrich created KAFKA-9093:
----------------------------------

Summary: NullPointerException in KafkaConsumer with group.instance.id
Key: KAFKA-9093
URL: https://issues.apache.org/jira/browse/KAFKA-9093
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.3.0
Reporter: Rolef Heinrich
Fix For: 2.3.1

When using *group.instance.id=myUniqId[0]*, the KafkaConsumer's constructor throws a NullPointerException in close():
{code:java}
Caused by: java.lang.NullPointerException
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
{code}
It turns out that the exception is thrown because the *log* member is not yet initialized (still null) in the constructor when the original exception is handled. The original exception is thrown before *log* is initialized.

A side effect of this error is that close() does not clean up resources as it is supposed to do.

*The consumer properties used, for reference:*
{code:java}
key.deserializer=com.ibm.streamsx.kafka.serialization
request.timeout.ms=25000
value.deserializer=com.ibm.streamsx.kafka.serialization
client.dns.lookup=use_all_dns_ips
metadata.max.age.ms=2000
enable.auto.commit=false
group.instance.id=myUniqId[0]
max.poll.interval.ms=30
group.id=consumer-0
metric.reporters=com.ibm.streamsx.kafka.clients.consum...
reconnect.backoff.max.ms=1
bootstrap.servers=localhost:9092
max.poll.records=50
session.timeout.ms=2
client.id=C-J37-ReceivedMessages[0]
allow.auto.create.topics=false
metrics.sample.window.ms=1
retry.backoff.ms=500
reconnect.backoff.ms=250
{code}
*Expected behaviour:* throw an exception indicating that something is wrong with the chosen group.instance.id.

The documentation does not say anything about valid values for group.instance.id.

*Reproduce:*
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Main {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
    }
}

Exception in thread "main" java.lang.NullPointerException
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
    at Main.main(Main.java:15)
{code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
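The failure mode described in this report, where cleanup code invoked from a failing constructor touches a field that has not been assigned yet, can be sketched in isolation as follows. This is a minimal illustration only: it is not the KafkaConsumer source and not necessarily how pull request #7590 resolves the issue. The class names `BuggyClient` and `SaferClient`, the helper `validateInstanceId`, and the character rule in `LEGAL_ID` are all assumptions made for the example.

```java
import java.util.logging.Logger;
import java.util.regex.Pattern;

/** Hypothetical sketch; none of these names come from the Kafka code base. */
final class ConstructorFailureSketch {

    // Assumed rule, for illustration only: ASCII letters, digits, '.', '_' and '-'.
    static final Pattern LEGAL_ID = Pattern.compile("[a-zA-Z0-9._-]+");

    static void validateInstanceId(String id) {
        if (id == null || !LEGAL_ID.matcher(id).matches()) {
            throw new IllegalArgumentException("Invalid group.instance.id: " + id);
        }
    }

    /** Validation runs before the logger is assigned, as in the report. */
    static final class BuggyClient {
        private Logger log; // still null when validation throws

        BuggyClient(String instanceId) {
            try {
                validateInstanceId(instanceId);
                this.log = Logger.getLogger("BuggyClient");
            } catch (RuntimeException e) {
                close(); // throws NullPointerException, hiding the original error
                throw e;
            }
        }

        void close() {
            log.fine("closing"); // dereferences the unassigned field
        }
    }

    /** One possible remedy: assign the logger before anything that can fail. */
    static final class SaferClient {
        private final Logger log = Logger.getLogger("SaferClient");

        SaferClient(String instanceId) {
            try {
                validateInstanceId(instanceId);
            } catch (RuntimeException e) {
                close(); // safe: log is already assigned
                throw e; // the caller sees the real cause
            }
        }

        void close() {
            log.fine("closing");
        }
    }

    public static void main(String[] args) {
        try {
            new SaferClient("myUniqId[0]");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the logger assigned up front, the caller receives the validation error that names the offending id, which is the expected behaviour the reporter asks for. A null check inside `close()` would be another way to reach the same result.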
[jira] [Updated] (KAFKA-9083) Various parsing issues in Values class
[ https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-9083:
---------------------------------
Description:
There are a number of small issues with the Connect framework's {{Values}} class that lead to either unexpected exceptions, unintuitive (and arguably incorrect) behavior, or confusing log messages. These include:
* A {{NullPointerException}} is thrown when parsing the string {{[null]}} (which should be parsed as an array containing a single null element)
* A {{NullPointerException}} is thrown when parsing the string {{[]}} (which should be parsed as an empty array)
* The returned schema is null when parsing the string {{{}}} (instead, it should be a map schema, possibly with null key and value schemas)
* Strings that begin with what appear to be booleans (i.e., the literals {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}}}, {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be parsed as strings; for example, the string {{true}}} is parsed as the boolean {{true}} but should probably just be parsed as the string {{true}}}
* Arrays not containing commas are still parsed as arrays in some cases; for example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it should arguably be parsed as the string literal {{[0 1 2]}}
* When input is parsed as a map and lacks a final closing brace, the logged exception states "Map is missing terminating ']'" even though the expected terminating character is actually {{}}} and not {{]}}
* Finally, and possibly most contentious, escape sequences are not stripped from string literals. Thus, the input string {{foobar]}} is parsed as the literal string {{foobar]}}, which is somewhat unexpected, since that means it is impossible to pass in a string that is parsed as the string literal {{foobar]}}, and it is the job of the caller to handle stripping of such escape sequences. Given that the escape sequence can itself be escaped, it seems better to automatically strip it from parsed string literals before returning them to the user.

was:
There are a number of small issues with the Connect framework's {{Values}} class that lead to either unexpected exceptions, unintuitive (and arguably incorrect) behavior, or confusing log messages. These include:
* A {{NullPointerException}} is thrown when parsing the string {{[null]}} (which should be parsed as an array containing a single null element)
* A {{NullPointerException}} is thrown when parsing the string {{[]}} (which should be parsed as an empty array)
* The returned schema is null when parsing the string {{{}}} (instead, it should be a map schema, possibly with null key and value schemas)
* Strings that begin with what appear to be booleans (i.e., the literals {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}}}, {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be parsed as strings; for example, the string {{true}}} is parsed as the boolean {{true}} but should probably just be parsed as the string {{true}}}
* Arrays not containing commas are still parsed as arrays in some cases; for example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it should arguably be parsed as the string literal {{[0 1 2]}}
* An exception is logged when attempting to parse input as a map when it doesn't contain the a final closing brace that states "Map is missing terminating ']'" even though the expecting terminating character is actually {{}}} and not {{]}}
* Finally, and possibly most contentious, escape sequences are not stripped from string literals. Thus, the input string {{foobar]}} is parsed as the literal string {{foobar]}}, which is somewhat unexpected, since that means it is impossible to pass in a string that is parsed as the string literal {{foobar]}}, and it is the job of the caller to handle stripping of such escape sequences. Given that the escape sequence can itself be escaped, it seems better to automatically strip it from parsed string literals before returning them to the user.

> Various parsing issues in Values class
>
> Key: KAFKA-9083
> URL: https://issues.apache.org/jira/browse/KAFKA-9083
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Chris Egerton
> Priority: Major
>
> There are a number of small issues with the Connect framework's {{Values}} class that lead to either unexpected exceptions, unintuitive (and arguably incorrect) behavior, or confusing log messages. These include:
> * A {{NullPointerException}} is thrown when parsing the string {{[null]}} (which should be parsed as an array containing a single null element)
> * A
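The boolean-prefix item in the description can be made concrete with a small stand-alone sketch. It neither calls nor reproduces the Connect {{Values}} class; `parseBooleanOrString` is a hypothetical helper that shows the behavior the report argues for, where input counts as a boolean only when the whole trimmed string is the literal `true` or `false`.

```java
/** Hypothetical helper; not part of the Kafka Connect API. */
final class BooleanTokenSketch {

    static Object parseBooleanOrString(String input) {
        String trimmed = input.trim();
        if (trimmed.equals("true")) {
            return Boolean.TRUE;
        }
        if (trimmed.equals("false")) {
            return Boolean.FALSE;
        }
        // Anything else, including "true}" or "false]", stays a plain string.
        return input;
    }

    public static void main(String[] args) {
        System.out.println(parseBooleanOrString("true").getClass().getSimpleName());
        System.out.println(parseBooleanOrString("true}").getClass().getSimpleName());
    }
}
```

Under this rule a trailing delimiter such as `}` is never silently dropped, at the cost of requiring callers to pass booleans as complete tokens.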
[jira] [Commented] (KAFKA-9092) Kafka Streams Upgrade Magic v0 does not support record headers
[ https://issues.apache.org/jira/browse/KAFKA-9092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958599#comment-16958599 ]

wxmimperio commented on KAFKA-9092:
-----------------------------------

[~mjsax] Hi, I think my cluster's message format supports headers and is newer than 0.11.
{code:java}
log.message.timestamp.type=LogAppendTime
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{code}

> Kafka Streams Upgrade Magic v0 does not support record headers
>
> Key: KAFKA-9092
> URL: https://issues.apache.org/jira/browse/KAFKA-9092
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0, 2.0.0
> Environment: kafka 1.0.0
> kafka streams lib 2.0.0
> Reporter: wxmimperio
> Priority: Major
>
> My Kafka cluster version is 1.0.0, and I run a Streams app to topic v1.
>
> Then I upgraded the Kafka Streams lib to 2.0.0 because I want to use some new APIs.
> Reference upgrade doc: [http://kafka.apache.org/20/documentation/streams/upgrade-guide]
>
> {code:java}
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-streams</artifactId>
>     <version>2.0.0</version>
> </dependency>
> {code}
>
> {code:java}
> // streams config
> settings.put(StreamsConfig.UPGRADE_FROM_CONFIG, "1.0");
> {code}
>
> Then I stopped the Streams app, rebuilt the jar, and started it again.
> It ran without problems at first. After a few hours the Kafka broker logged the error below. I deleted and recreated the topic, and a few hours later the broker logged the same error:
> {code:java}
> [2019-10-24 10:15:45,780] ERROR [KafkaApi-109] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=4,fetch_offset=3165633,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Magic v0 does not support record headers
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
>     at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
>     at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
>     at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>     at scala.Option.map(Option.scala:146)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>     at scala.Option.flatMap(Option.scala:171)
>     at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>     at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>     at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>     at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
>     at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>     at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>     at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
>     at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>     at
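The error text in the broker log refers to the record format's magic byte. As a rough guide, and independent of this particular report, magic v0 is the original message format, v1 (Kafka 0.10.x) added timestamps, and v2 (Kafka 0.11 and later) added record headers, so a record that carries headers cannot be written in v0 or v1. The sketch below encodes only that rule. `RecordFormatSketch`, `supportsHeaders`, and `checkAppend` are hypothetical names chosen for the example, not broker code.

```java
/** Hypothetical sketch of the magic-byte rule; not taken from the Kafka code base. */
final class RecordFormatSketch {

    static final byte MAGIC_V0 = 0; // original message format
    static final byte MAGIC_V1 = 1; // adds timestamps (0.10.x)
    static final byte MAGIC_V2 = 2; // adds record headers (0.11 and later)

    static boolean supportsHeaders(byte magic) {
        return magic >= MAGIC_V2;
    }

    /** Rejects an append that would put headers into a pre-v2 format. */
    static void checkAppend(byte magic, int headerCount) {
        if (headerCount > 0 && !supportsHeaders(magic)) {
            throw new IllegalArgumentException(
                "Magic v" + magic + " does not support record headers");
        }
    }

    public static void main(String[] args) {
        checkAppend(MAGIC_V2, 1); // accepted: v2 can carry headers
        try {
            checkAppend(MAGIC_V0, 1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The stack trace passes through `downConvert` on the fetch path, which suggests the broker was converting stored records to an older format for the requesting client when it hit this check. Confirming that would require knowing which client issued the fetch request.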
[jira] [Comment Edited] (KAFKA-9092) Kafka Streams Upgrade Magic v0 does not support record headers
[ https://issues.apache.org/jira/browse/KAFKA-9092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958599#comment-16958599 ]

wxmimperio edited comment on KAFKA-9092 at 10/24/19 7:01 AM:
-------------------------------------------------------------

[~mjsax] Hi, I think my cluster's message format supports headers and is newer than 0.11.
{code:java}
// server.properties
log.message.timestamp.type=LogAppendTime
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{code}

was (Author: wxmimperio):
[~mjsax] Hi, I think my cluster's message format supports headers and is newer than 0.11.
{code:java}
log.message.timestamp.type=LogAppendTime
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{code}

> Kafka Streams Upgrade Magic v0 does not support record headers
>
> Key: KAFKA-9092
> URL: https://issues.apache.org/jira/browse/KAFKA-9092
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0, 2.0.0
> Environment: kafka 1.0.0
> kafka streams lib 2.0.0
> Reporter: wxmimperio
> Priority: Major