[jira] [Updated] (KAFKA-9098) Name Repartition Filter, Source, and Sink Processors

2019-10-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9098:
---
Issue Type: Improvement  (was: Bug)

> Name Repartition Filter, Source, and Sink Processors
> 
>
> Key: KAFKA-9098
> URL: https://issues.apache.org/jira/browse/KAFKA-9098
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> When users provide a name for repartition topics, we should use the same name 
> as the base for the filter, source, and sink operators. While this does not 
> break a topology, users who provide names for all processors in a DSL topology 
> may find the generated names for the repartition topic's filter, source, and 
> sink operators inconsistent with their naming approach.
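A minimal sketch of the naming rule the ticket asks for. It assumes that the repartition operators derive their names by appending fixed suffixes to the user-supplied base name, and that they fall back to an index-based generated name otherwise. The class `RepartitionNaming`, the `-repartition-<role>` suffix scheme, and the counter format are hypothetical illustrations, not the actual Kafka Streams internals.

```java
import java.util.Optional;

// Hypothetical illustration: derive repartition operator names from a user-supplied base name.
final class RepartitionNaming {
    private int counter = 0; // stand-in for the topology builder's auto-generated index

    // If the user named the repartition topic, reuse that name as the base for
    // the filter/source/sink operators; otherwise fall back to a generated name.
    String name(Optional<String> userBaseName, String role) {
        if (userBaseName.isPresent()) {
            return userBaseName.get() + "-repartition-" + role;
        }
        return String.format("KSTREAM-%s-%010d", role.toUpperCase(), counter++);
    }
}
```

With a base name of `orders`, this sketch yields `orders-repartition-filter`, `orders-repartition-source`, and `orders-repartition-sink`, so all three operators follow the user's naming.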



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959417#comment-16959417
 ] 

ASF GitHub Bot commented on KAFKA-8972:
---

guozhangwang commented on pull request #7589: KAFKA-8972: need to flush state 
even on unclean close
URL: https://github.com/apache/kafka/pull/7589
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback 
> state
> 
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.4.0
>
>
> Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the 
> following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> And inside {{onLeavePrepare}} we would look into the assignment and try to 
> revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, 
> and then clear the assignment.
> However, the subscription's assignment is already cleared in 
> {{this.subscriptions.unsubscribe();}}, which means the user's rebalance 
> listener would never be triggered. In other words, from the consumer client's 
> point of view nothing is owned after unsubscribe, but from the caller's point 
> of view the partitions have not been revoked yet. For callers like Kafka 
> Streams, which rely on the rebalance listener to maintain their internal 
> state, this leads to inconsistent state management and failure cases.
> Before KIP-429 this issue was hidden, since every time the consumer later 
> re-joined the group it would still revoke everything anyway, regardless of 
> the parameters passed to the rebalance listener; with KIP-429 it is now easier 
> to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first call `onLeavePrepare` / `maybeLeaveGroup` and 
> then `subscription.unsubscribe`. This way we are guaranteed that the Streams 
> tasks have all been closed as revoked by then.
> • [Optimization] If the generation is reset due to a fatal error from a join / 
> heartbeat response, etc., then we know that all partitions are lost, and we 
> should not trigger `onPartitionsRevoked` but instead just `onPartitionsLost` 
> inside `onLeavePrepare`.
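The ordering problem can be sketched with a simplified model of the consumer. It assumes the subscription state is just a set of assigned partitions and that `onLeavePrepare` notifies the listener only when something is still assigned. `UnsubscribeOrderingModel` is hypothetical; it omits `maybeLeaveGroup` and the `onPartitionsLost` optimization and is not the real `KafkaConsumer` code.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of why the order of steps in unsubscribe matters.
final class UnsubscribeOrderingModel {
    interface Listener { void onPartitionsRevoked(Collection<String> partitions); }

    private final Set<String> assignment = new HashSet<>();
    private final Listener listener;

    UnsubscribeOrderingModel(Set<String> initial, Listener listener) {
        assignment.addAll(initial);
        this.listener = listener;
    }

    // Mirrors onLeavePrepare: revoke whatever is currently assigned, then clear it.
    private void onLeavePrepare() {
        if (!assignment.isEmpty()) {
            listener.onPartitionsRevoked(new HashSet<>(assignment));
        }
        assignment.clear();
    }

    // Buggy order: the assignment is cleared first, so the listener never fires.
    void unsubscribeBuggy() {
        assignment.clear();   // stands in for this.subscriptions.unsubscribe()
        onLeavePrepare();     // now sees an empty assignment
    }

    // Fixed order: revoke (and notify) before clearing the subscription state.
    void unsubscribeFixed() {
        onLeavePrepare();
        assignment.clear();
    }
}
```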





[jira] [Created] (KAFKA-9103) KIP-441: Add TaskLags to SubscriptionInfo

2019-10-24 Thread John Roesler (Jira)
John Roesler created KAFKA-9103:
---

 Summary: KIP-441: Add TaskLags to SubscriptionInfo
 Key: KAFKA-9103
 URL: https://issues.apache.org/jira/browse/KAFKA-9103
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


As described in KIP-441, we will add the TaskLags field to, and remove the 
ActiveTasks and StandbyTasks fields from, the SubscriptionInfo object.

This change can be made independently of the new balancing algorithm, since we 
can transparently convert back and forth between the active/standby task 
format and the TaskLags format, using a lag of 0 to denote active tasks and a 
lag > 0 to denote standbys.
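A sketch of the back-and-forth conversion described above. It assumes task IDs are plain strings such as `0_1` and that a standby whose real lag is unknown is encoded with an arbitrary positive placeholder of 1. `TaskLagsCodec` and these encodings are hypothetical, not the `SubscriptionInfo` wire format.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical conversion between the old active/standby sets and a task-lag map.
final class TaskLagsCodec {
    static final long ACTIVE_LAG = 0L;
    // Assumption: any positive value marks a standby; 1 is an arbitrary placeholder.
    static final long STANDBY_PLACEHOLDER_LAG = 1L;

    static Map<String, Long> toTaskLags(Set<String> active, Set<String> standby) {
        Map<String, Long> lags = new HashMap<>();
        for (String task : standby) lags.put(task, STANDBY_PLACEHOLDER_LAG);
        for (String task : active) lags.put(task, ACTIVE_LAG); // active wins if listed in both
        return lags;
    }

    static Set<String> activeTasks(Map<String, Long> lags) {
        Set<String> out = new HashSet<>();
        lags.forEach((task, lag) -> { if (lag == ACTIVE_LAG) out.add(task); });
        return out;
    }

    static Set<String> standbyTasks(Map<String, Long> lags) {
        Set<String> out = new HashSet<>();
        lags.forEach((task, lag) -> { if (lag > ACTIVE_LAG) out.add(task); });
        return out;
    }
}
```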





[jira] [Commented] (KAFKA-8729) Augment ProduceResponse error messaging for specific culprit records

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959376#comment-16959376
 ] 

ASF GitHub Bot commented on KAFKA-8729:
---

guozhangwang commented on pull request #7522: KAFKA-8729: Add upgrade docs for 
KIP-467 on augmented produce response
URL: https://github.com/apache/kafka/pull/7522
 
 
   
 



> Augment ProduceResponse error messaging for specific culprit records
> 
>
> Key: KAFKA-8729
> URL: https://issues.apache.org/jira/browse/KAFKA-8729
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Guozhang Wang
>Assignee: Tu Tran
>Priority: Major
>
> 1. We should replace the misleading CORRUPT_RECORD error code with a new 
> INVALID_RECORD.
> 2. We should augment the ProduceResponse with customizable error message and 
> indicators of culprit records.
> 3. We should change the client-side handling logic of non-retriable 
> INVALID_RECORD to re-batch the records.
> Details see: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records
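A minimal sketch of the re-batching idea in point 3. It assumes the response identifies culprit records by their index within the batch; the `Rebatcher` class and its `culpritIndexes` parameter are hypothetical, and KIP-467 itself defines the actual response fields and client behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical: split a rejected batch into records to fail and records to resend.
final class Rebatcher {
    static final class Split<T> {
        final List<T> failed = new ArrayList<>();
        final List<T> resend = new ArrayList<>();
    }

    static <T> Split<T> split(List<T> batch, Set<Integer> culpritIndexes) {
        Split<T> result = new Split<>();
        for (int i = 0; i < batch.size(); i++) {
            if (culpritIndexes.contains(i)) {
                result.failed.add(batch.get(i));  // flagged as invalid: fail only these
            } else {
                result.resend.add(batch.get(i));  // unaffected: candidates for a new batch
            }
        }
        return result;
    }
}
```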





[jira] [Resolved] (KAFKA-9089) Reassignment should be resilient to unexpected errors

2019-10-24 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9089.

Resolution: Fixed

> Reassignment should be resilient to unexpected errors
> -
>
> Key: KAFKA-9089
> URL: https://issues.apache.org/jira/browse/KAFKA-9089
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.0
>
>
> Reassignment changes typically involve both an update to the assignment state 
> in zookeeper and an update to the in-memory representation of that state (in 
> the ControllerContext). We can run into trouble when these states get 
> inconsistent with each other, so the reassignment logic attempts to follow 
> some rules to reduce the impact from this:
>  * When creating a new reassignment, we update the state in zookeeper first 
> before updating memory. Until the reassignment is known to be persisted, we 
> do not begin executing any reassignment logic.
>  * When completing a reassignment, all of the completion steps are executed 
> before the state is updated in zookeeper. In the event of a failure, the new 
> controller can retry reassignment completion.
> However, the current logic does not follow these rules strictly, which can 
> lead to state inconsistencies in the case of an unexpected error.
>  # When we override or cancel an existing assignment, we currently use an 
> intermediate assignment state which is only reflected in memory. It is 
> basically a mix of the previous assignment state and the overlapping parts of 
> the new reassignment. The purpose of this is to shut down unneeded replicas 
> from the existing reassignment. Since the intermediate state is not 
> persisted, a controller failure will revert to the old reassignment. Any 
> exception which does not cause a controller failure will result in state 
> divergence.
>  # The target replicas of a reassignment are represented both in the existing 
> assignment (PartitionReplicaAssignment) and in a separate context object 
> (ReassignedPartitionContext). The reassignment context is updated before a 
> reassignment has been accepted and persisted. The intent is to remove this 
> context object in the event of a submission failure, but an unexpected error 
> will leave it around.
> We can make reassignment more resilient to unexpected errors by using 
> consistent update invariants. Specifically we can remove the intermediate 
> assignment state and enforce the invariant that any active reassignment must 
> be persisted before being reflected in memory. Additionally, we can make the 
> assignment state the source of truth for the target replicas and eliminate 
> the possibility of inconsistency. Doing so simplifies the reassignment logic 
> and makes it more resilient.
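The creation-side invariant (persist before reflecting in memory) can be sketched as follows. It assumes a pluggable persistent store standing in for ZooKeeper and a map standing in for the `ControllerContext`. `ReassignmentStore` is hypothetical and only covers submission, not completion.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "persist before reflecting in memory" invariant.
final class ReassignmentStore {
    interface PersistentStore { void write(String partition, List<Integer> targetReplicas); }

    private final PersistentStore zk;                                    // stand-in for ZooKeeper
    private final Map<String, List<Integer>> inMemory = new HashMap<>(); // stand-in for ControllerContext

    ReassignmentStore(PersistentStore zk) { this.zk = zk; }

    void submit(String partition, List<Integer> targetReplicas) {
        // 1. Persist first; if this throws, memory is untouched and the two states cannot diverge.
        zk.write(partition, targetReplicas);
        // 2. Only then reflect the reassignment in memory, the single place the targets live.
        inMemory.put(partition, List.copyOf(targetReplicas));
    }

    Map<String, List<Integer>> inMemoryView() { return Map.copyOf(inMemory); }
}
```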





[jira] [Commented] (KAFKA-9089) Reassignment should be resilient to unexpected errors

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959373#comment-16959373
 ] 

ASF GitHub Bot commented on KAFKA-9089:
---

hachikuji commented on pull request #7562: KAFKA-9089; Reassignment should be 
resilient to unexpected errors
URL: https://github.com/apache/kafka/pull/7562
 
 
   
 





[jira] [Commented] (KAFKA-9102) Increase default zk session timeout and max lag

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959346#comment-16959346
 ] 

ASF GitHub Bot commented on KAFKA-9102:
---

hachikuji commented on pull request #7596: KAFKA-9102; Increase default zk 
session timeout and replica max lag [KIP-537]
URL: https://github.com/apache/kafka/pull/7596
 
 
   This patch increases the default value of `zookeeper.session.timeout.ms` from 
6s to 18s and `replica.lag.time.max.ms` from 10s to 30s. This change was 
documented in KIP-537: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout.
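   A small sketch of the change in milliseconds, the unit both broker properties take. The `Kip537Defaults` class is hypothetical; it only shows how an operator could pin the pre-KIP-537 values explicitly with `java.util.Properties` if the longer failure-detection times are undesirable.

```java
import java.util.Properties;

// Hypothetical helper: the KIP-537 default changes expressed in milliseconds.
final class Kip537Defaults {
    static final long OLD_ZK_SESSION_TIMEOUT_MS = 6_000L;
    static final long NEW_ZK_SESSION_TIMEOUT_MS = 18_000L;
    static final long OLD_REPLICA_LAG_TIME_MAX_MS = 10_000L;
    static final long NEW_REPLICA_LAG_TIME_MAX_MS = 30_000L;

    // Broker overrides that pin the previous defaults explicitly.
    static Properties pinPreviousDefaults() {
        Properties props = new Properties();
        props.setProperty("zookeeper.session.timeout.ms", Long.toString(OLD_ZK_SESSION_TIMEOUT_MS));
        props.setProperty("replica.lag.time.max.ms", Long.toString(OLD_REPLICA_LAG_TIME_MAX_MS));
        return props;
    }
}
```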
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Increase default zk session timeout and max lag
> ---
>
> Key: KAFKA-9102
> URL: https://issues.apache.org/jira/browse/KAFKA-9102
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.5.0
>
>
> This tracks the implementation of KIP-537: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout





[jira] [Created] (KAFKA-9102) Increase default zk session timeout and max lag

2019-10-24 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9102:
--

 Summary: Increase default zk session timeout and max lag
 Key: KAFKA-9102
 URL: https://issues.apache.org/jira/browse/KAFKA-9102
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 2.5.0


This tracks the implementation of KIP-537: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout





[jira] [Assigned] (KAFKA-9101) Create a fetch.max.bytes configuration for the broker

2019-10-24 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-9101:
---

Assignee: Colin McCabe

> Create a fetch.max.bytes configuration for the broker
> -
>
> Key: KAFKA-9101
> URL: https://issues.apache.org/jira/browse/KAFKA-9101
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>






[jira] [Created] (KAFKA-9101) Create a fetch.max.bytes configuration for the broker

2019-10-24 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-9101:
---

 Summary: Create a fetch.max.bytes configuration for the broker
 Key: KAFKA-9101
 URL: https://issues.apache.org/jira/browse/KAFKA-9101
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Colin McCabe








[jira] [Created] (KAFKA-9100) The controller code allows for replicas to get added to the assignment when the replicas goes online

2019-10-24 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-9100:
-

 Summary: The controller code allows for replicas to get added to 
the assignment when the replicas goes online
 Key: KAFKA-9100
 URL: https://issues.apache.org/jira/browse/KAFKA-9100
 Project: Kafka
  Issue Type: Task
  Components: controller
Affects Versions: 2.3.0
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


When modelling the replica state machine, the controller code allows a replica 
to be added to the assignment when it transitions to online from the new 
state. This should not be possible.

Replicas are added to a partition when creating a topic, creating a partition 
or reassigning a partition. Whether a replica goes online should not cause a 
replica to get added to the assignment.

The code that currently allows this is here: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L194-L198

We should restructure the code so that that condition is not possible.
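A minimal sketch of the restructuring the ticket asks for, assuming a simplified state machine in which only topic creation, partition creation, and reassignment set the assignment. The `ReplicaStates` class, its method names, and the two-value state enum are hypothetical; the real logic lives in `ReplicaStateMachine.scala`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: going online never mutates the partition assignment.
final class ReplicaStates {
    enum State { NEW, ONLINE }

    private final Map<String, List<Integer>> assignment = new HashMap<>();
    private final Map<String, State> states = new HashMap<>();

    // Only topic/partition creation and reassignment may set the assignment.
    void setAssignment(String partition, List<Integer> replicas) {
        assignment.put(partition, List.copyOf(replicas));
    }

    void markNew(int replica, String partition) { states.put(key(replica, partition), State.NEW); }

    // NEW -> ONLINE: reject replicas that are not already assigned instead of adding them.
    void transitionToOnline(int replica, String partition) {
        if (!assignedReplicas(partition).contains(replica)) {
            throw new IllegalStateException("replica " + replica + " is not assigned to " + partition);
        }
        states.put(key(replica, partition), State.ONLINE);
    }

    State state(int replica, String partition) { return states.get(key(replica, partition)); }

    List<Integer> assignedReplicas(String partition) { return assignment.getOrDefault(partition, List.of()); }

    private static String key(int replica, String partition) { return partition + "/" + replica; }
}
```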





[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-10-24 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959308#comment-16959308
 ] 

Richard Yu commented on KAFKA-8522:
---

Found where the tombstones are inserted. That really just answered my question 
above. My bad. 

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit of a grey zone as to whether it's a "bug", but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What happens is that all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.
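A simplified simulation of the scenario above. It assumes a tombstone becomes removable once the segment holding it has a last-modified time at least `delete.retention.ms` (default 24 hours) in the past. The class and its one-segment model are hypothetical and ignore most of what the real log cleaner does; they only show how repeatedly refreshing the oldest segment's timestamp keeps resetting the countdown.

```java
// Simplified, hypothetical simulation; it does not reproduce the real LogCleaner.
final class TombstoneRetentionSim {
    /**
     * Runs cleaning cycles every intervalMs and returns the cycle at which the
     * tombstone first becomes removable, or -1 if it never does. When
     * mergeIntoOldest is true, each cycle merges fresh records into the oldest
     * segment, which refreshes that segment's last-modified time to "now".
     */
    static int firstRemovalCycle(long deleteRetentionMs, long intervalMs, int cycles, boolean mergeIntoOldest) {
        long oldestSegmentLastModified = 0L; // tombstone written at t = 0
        for (int cycle = 1; cycle <= cycles; cycle++) {
            long now = cycle * intervalMs;
            // The tombstone may be dropped once its segment is older than the retention window.
            if (oldestSegmentLastModified <= now - deleteRetentionMs) {
                return cycle;
            }
            if (mergeIntoOldest) {
                oldestSegmentLastModified = now; // new data resets the countdown
            }
        }
        return -1;
    }
}
```

With cleaning every minute and a 24-hour retention, the tombstone becomes removable at cycle 1440 when nothing is merged into its segment, and never when every cycle merges new records into it.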





[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2019-10-24 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959306#comment-16959306
 ] 

Sophie Blee-Goldman commented on KAFKA-4969:


Bill's proposal has already been merged, so you should see a better 
distribution of tasks of the same subtopology/topicGroupId as long as you've 
upgraded to a version containing this fix (not sure what version this first 
went into, any idea [~bbejeck]?)

This ticket was reopened to cover "true" state-aware assignment that 
distinguishes between heavier stateful subtopologies and lighter stateless 
ones, which should be covered by the work being planned as part of KIP-441 – if 
you're interested you can read up on it here 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams]

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, {{StreamPartitionAssignor}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
>  - in the case of standby tasks, which tasks have progressed the most with 
> respect to restoration
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.
>  
> There have been some additional discussions around task assignment on a 
> related PR https://github.com/apache/kafka/pull/5390
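A minimal sketch of the cost-model idea. It assumes a single per-task cost in which each state store adds a fixed weight, and it places tasks greedily, heaviest first, onto the least-loaded instance. The weights, the `TaskPlacement` class, and the greedy strategy are hypothetical simplifications, not the assignor's actual algorithm.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical greedy placement: heaviest tasks first, each onto the least-loaded instance.
final class TaskPlacement {
    static final class Task {
        final String id;
        final int stores;
        Task(String id, int stores) { this.id = id; this.stores = stores; }
        // Assumed cost model: base cost 1 plus a weight of 10 per state store.
        int cost() { return 1 + 10 * stores; }
    }

    static List<List<String>> place(List<Task> tasks, int instances) {
        List<List<String>> placement = new ArrayList<>();
        int[] load = new int[instances];
        for (int i = 0; i < instances; i++) placement.add(new ArrayList<>());

        List<Task> sorted = new ArrayList<>(tasks);
        sorted.sort(Comparator.comparingInt((Task t) -> t.cost()).reversed()); // stable sort
        for (Task task : sorted) {
            int target = 0;
            for (int i = 1; i < instances; i++) {
                if (load[i] < load[target]) target = i;
            }
            placement.get(target).add(task.id);
            load[target] += task.cost();
        }
        return placement;
    }
}
```

With two stateless and two single-store tasks on two instances, this sketch puts one stateful and one stateless task on each instance, which is the placement the ticket describes as desirable.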





[jira] [Created] (KAFKA-9099) Reassignments should be retried after unexpected errors

2019-10-24 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9099:
--

 Summary: Reassignments should be retried after unexpected errors
 Key: KAFKA-9099
 URL: https://issues.apache.org/jira/browse/KAFKA-9099
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Stanislav Kozlovski


Currently we do not catch unexpected exceptions in `onPartitionReassignment` in 
the controller. Generally this can lead to state inconsistencies and a failure 
to complete a reassignment. We should add exception handling and come up with a 
retry approach. For example, we could track failed reassignments in the 
controller context and retry them periodically.
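A minimal sketch of the retry approach suggested above. It assumes failures surface as runtime exceptions from the reassignment step and that failed partitions are tracked in a set standing in for the controller context. `ReassignmentRetrier` and its methods are hypothetical; scheduling `retryFailed` periodically is left to the caller.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical: catch unexpected errors during reassignment and retry them later.
final class ReassignmentRetrier {
    private final Consumer<String> onPartitionReassignment;  // may throw
    private final Set<String> failed = new LinkedHashSet<>(); // stand-in for controller-context tracking

    ReassignmentRetrier(Consumer<String> onPartitionReassignment) {
        this.onPartitionReassignment = onPartitionReassignment;
    }

    void reassign(String partition) {
        try {
            onPartitionReassignment.accept(partition);
            failed.remove(partition);
        } catch (RuntimeException e) {
            failed.add(partition); // remember it instead of leaving it half-done
        }
    }

    // Intended to be called periodically, e.g. from a scheduled controller event.
    void retryFailed() {
        for (String partition : Set.copyOf(failed)) { // iterate over a snapshot
            reassign(partition);
        }
    }

    Set<String> failedPartitions() { return Set.copyOf(failed); }
}
```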





[jira] [Commented] (KAFKA-6453) Reconsider timestamp propagation semantics

2019-10-24 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959246#comment-16959246
 ] 

Boyang Chen commented on KAFKA-6453:


The executor needs to first check the change into the AK repo, and then merge 
it into the kafka-site repo.

> Reconsider timestamp propagation semantics
> --
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Victoria Bialas
>Priority: Major
>  Labels: needs-kip
>
> At the moment, Kafka Streams only has a defined "contract" about timestamp 
> propagation at the Processor API level: all processors within a sub-topology 
> see the timestamp from the input topic record, and this timestamp will also be 
> used for all result records when writing them to a topic.
> The DSL inherits this "contract" at the moment.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend the Processor API to allow manipulating timestamps (i.e., a 
> Processor can set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API
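A stdlib-only sketch contrasting the current contract (output records inherit the input timestamp) with the proposed extension (a processor may set a new timestamp for downstream records). The `TimestampPropagation` class, its `Rec` type, and the `forward` helpers are hypothetical models, not the Processor API itself.

```java
import java.util.OptionalLong;

// Hypothetical model of timestamp propagation for a single processor step.
final class TimestampPropagation {
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;
        Rec(String key, String value, long timestamp) {
            this.key = key; this.value = value; this.timestamp = timestamp;
        }
    }

    // Current contract: the output record keeps the input record's timestamp.
    static Rec forwardInherited(Rec input, String newValue) {
        return new Rec(input.key, newValue, input.timestamp);
    }

    // Proposed extension: the processor may override the timestamp for downstream records.
    static Rec forward(Rec input, String newValue, OptionalLong overrideTimestamp) {
        return new Rec(input.key, newValue, overrideTimestamp.orElse(input.timestamp));
    }
}
```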





[jira] [Commented] (KAFKA-6453) Reconsider timestamp propagation semantics

2019-10-24 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959244#comment-16959244
 ] 

Boyang Chen commented on KAFKA-6453:


We need both the state store section and the DSL section to reflect the 
changes, for example the reasoning behind adding timestamps to state stores for 
better:
 # join semantics: previously we did not have timestamps in the state store, so 
out-of-order joins could not be caught. Now we can take the max.
 # aggregation semantics: an out-of-order record can also push the aggregation 
result backward. With timestamps, this is handled more defensively.

Window stores and timestamped key-value stores are also recommended to be put 
in the state store section.
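The "take the max" idea can be sketched as follows, assuming each join input and each stored aggregate carries its record timestamp. The `MaxTimestampSemantics` class and its `TimestampedValue` type are hypothetical models, not Kafka Streams' own store types.

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical model: results carry the maximum timestamp of their inputs.
final class MaxTimestampSemantics {
    static final class TimestampedValue<V> {
        final V value;
        final long timestamp;
        TimestampedValue(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // Join: the result is stamped with the newer of the two input timestamps.
    static <L, R, O> TimestampedValue<O> join(TimestampedValue<L> left, TimestampedValue<R> right,
                                              BiFunction<L, R, O> joiner) {
        return new TimestampedValue<>(joiner.apply(left.value, right.value),
                                      Math.max(left.timestamp, right.timestamp));
    }

    // Aggregation: an out-of-order input updates the value but never moves the timestamp backward.
    static <V> TimestampedValue<V> aggregate(TimestampedValue<V> current, TimestampedValue<V> input,
                                             BinaryOperator<V> adder) {
        return new TimestampedValue<>(adder.apply(current.value, input.value),
                                      Math.max(current.timestamp, input.timestamp));
    }
}
```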



[jira] [Created] (KAFKA-9098) Name Repartition Filter, Source, and Sink Processors

2019-10-24 Thread Bill Bejeck (Jira)
Bill Bejeck created KAFKA-9098:
--

 Summary: Name Repartition Filter, Source, and Sink Processors
 Key: KAFKA-9098
 URL: https://issues.apache.org/jira/browse/KAFKA-9098
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0, 2.2.0
Reporter: Bill Bejeck
Assignee: Bill Bejeck


When users provide a name for repartition topics, we should use the same name as 
the base for the filter, source, and sink operators. While this does not break 
a topology, users who provide names for all processors in a DSL topology may 
find the generated names for the repartition topic's filter, source, and sink 
operators inconsistent with their naming approach.





[jira] [Created] (KAFKA-9097) URL needs update - use https://kafka.apache.org/quickstart

2019-10-24 Thread Bernard A. Badger (Jira)
Bernard A. Badger created KAFKA-9097:


 Summary: URL needs update - use https://kafka.apache.org/quickstart
 Key: KAFKA-9097
 URL: https://issues.apache.org/jira/browse/KAFKA-9097
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Bernard A. Badger


Use a better URL for the instructions in README.md: 
https://kafka.apache.org/quickstart
{noformat}
$ git diff HEAD^
diff --git a/README.md b/README.md
index 308d9df8c..7095c161f 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ Now everything else will work.
 ### Build a jar and run it ###
 ./gradlew jar
 
-Follow instructions in https://kafka.apache.org/documentation.html#quickstart
+Follow instructions in https://kafka.apache.org/quickstart
 
 ### Build source jar ###
 ./gradlew srcJar
{noformat}





[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959188#comment-16959188
 ] 

ASF GitHub Bot commented on KAFKA-9088:
---

pierDipi commented on pull request #7594: KAFKA-9088: Consolidate 
InternalMockProcessorContext and MockInternal…
URL: https://github.com/apache/kafka/pull/7594
 
 
   …ProcessorContext
   
   Merge `InternalMockProcessorContext` into `MockInternalProcessorContext`
   and replace all `InternalMockProcessorContext` usages by
   `MockInternalProcessorContext`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace them with an 
> {{EasyMock}} mock. 





[jira] [Commented] (KAFKA-9083) Various parsing issues in Values class

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959174#comment-16959174
 ] 

ASF GitHub Bot commented on KAFKA-9083:
---

C0urante commented on pull request #7593: KAFKA-9083: Various 
fixes/improvements for Connect's Values class
URL: https://github.com/apache/kafka/pull/7593
 
 
   [Jira](https://issues.apache.org/jira/browse/KAFKA-9083)
   
   The following functional changes are implemented here:
   • Top-level strings beginning with `"true"`, `"false"` and then followed by 
token delimiters (e.g., `"true}"` and `"false]"`) are parsed as strings, not as 
booleans
   • The empty array (`"[]"`) is now parsed as an array with a null value schema
   • The empty map (`"{}"`) is now parsed as a map with null key and value 
schemas
   • Arrays with all-null elements are now parsed successfully (whereas before 
an NPE was thrown) as an array with a null value schema
   • Maps with all-null values are now parsed as maps with null value schemas, 
but non-null key schemas
   • Strings that appear to be arrays at first but are missing comma delimiters 
(e.g., `"[0 1 2]"`) are now parsed as strings instead of arrays
   • A small improvement is made to the debug message generated when map 
parsing fails due to unexpected comma delimiters ("Unable to parse a map entry 
has no key or value" is changed to "Unable to parse a map entry with no key or 
value")
   • A small improvement is made to the debug message generated when map 
parsing fails due to a missing `}` ("Map is missing terminating ']'" is changed 
to "Map is missing terminating '}'")
   • A small improvement is made to the debug message generated when array or 
map parsing fails and parsing is reset to process the input as a string 
("Unable to parse the value as a map" is changed to "Unable to parse the value 
as a map or an array")
   • Embedded values that lack surrounding quotes (e.g., `foo` in `"[foo]"`) 
are no longer treated as strings; this is in line with the JSON-like 
representation that is meant to be supported by the `Values` class and prevents 
errors such as parsing `"[0 1 2]"` as an array containing a single string 
element with a value of `"0 1 2"`
   • The top-level string `"null"` is now parsed as `null` instead of the 
string `"null"`; this does not break attempts to convert the top-level string 
`"null"` into a string (which should also be the string `"null"`)
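   A heavily simplified sketch of a few of the top-level rules listed above: the string `"null"` becomes `null`, `"true"`/`"false"` become booleans only when they make up the entire input (so `"true}"` stays a string), and `"[]"` / `"{}"` become an empty list / map. The `TopLevelParser` class is hypothetical and is not Connect's `Values` class; it omits schemas, nesting, and everything else the real parser handles.

```java
import java.util.Collections;

// Hypothetical, simplified top-level parser illustrating a few of the rules above.
final class TopLevelParser {
    static Object parse(String input) {
        if (input.equals("null")) return null;          // top-level "null" -> null
        if (input.equals("true")) return Boolean.TRUE;  // only an exact match is a boolean
        if (input.equals("false")) return Boolean.FALSE;
        if (input.equals("[]")) return Collections.emptyList(); // empty array parses as an array
        if (input.equals("{}")) return Collections.emptyMap();  // empty map parses as a map
        return input;                                   // e.g. "true}" stays a string
    }
}
```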
   
   Every change (except for log message alterations) is verified with one or 
more unit tests, and several unit tests are also added to prevent regression in 
functionality that, while not currently broken, is subtle enough that it may be 
missed in future changes without tests.
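Below is a minimal, hypothetical Java sketch that makes a few of the listed rules concrete. `MiniValuesParser` is an illustrative name. It is not the Connect `Values` implementation, and it only handles `null`, booleans, integers (as `Long`) and flat arrays of those. Quoted strings and maps are not covered. It shows how two requirements change the results: booleans must be exact literals, and array elements must be comma-delimited. Because of these rules, inputs such as `"true}"`, `"[0 1 2]"` and `"[foo]"` fall back to strings, while `"[]"` and `"[null]"` become arrays.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified parser that illustrates a few of the rules above.
// It is NOT org.apache.kafka.connect.data.Values and ignores quoted strings and maps.
final class MiniValuesParser {

    static Object parse(String input) {
        String s = input.trim();
        if (s.equals("null")) return null;            // top-level "null" becomes null
        if (s.equals("true")) return Boolean.TRUE;    // only exact literals are booleans,
        if (s.equals("false")) return Boolean.FALSE;  // so "true}" stays a string
        if (s.startsWith("[") && s.endsWith("]")) {
            List<Object> list = parseArray(s.substring(1, s.length() - 1));
            if (list != null) return list;            // otherwise fall back to a string
        }
        Object scalar = parseScalar(s);
        return scalar != null ? scalar : s;           // unparseable input stays a string
    }

    // Returns null when the body is not a comma-delimited list of known literals.
    static List<Object> parseArray(String body) {
        List<Object> out = new ArrayList<>();
        if (body.trim().isEmpty()) return out;        // "[]" becomes an empty list
        for (String part : body.split(",", -1)) {
            String t = part.trim();
            if (t.equals("null")) {                   // "[null]" holds one null element
                out.add(null);
                continue;
            }
            Object v = parseScalar(t);
            if (v == null) return null;               // "0 1 2" or unquoted foo: not an element
            out.add(v);
        }
        return out;
    }

    static Object parseScalar(String t) {
        if (t.equals("true")) return Boolean.TRUE;
        if (t.equals("false")) return Boolean.FALSE;
        try {
            return Long.parseLong(t);
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```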
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Various parsing issues in Values class
> --
>
> Key: KAFKA-9083
> URL: https://issues.apache.org/jira/browse/KAFKA-9083
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> There are a number of small issues with the Connect framework's {{Values}} 
> class that lead to either unexpected exceptions, unintuitive (and arguably 
> incorrect) behavior, or confusing log messages. These include:
>  * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
> (which should be parsed as an array containing a single null element)
>  * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which 
> should be parsed as an empty array)
>  * The returned schema is null when parsing the string {{{}}} (instead, it 
> should be a map schema, possibly with null key and value schemas)
>  * Strings that begin with what appear to be booleans (i.e., the literals 
> {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}}}, 
> {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be 
> parsed as strings; for example, the string {{true}}} is parsed as the boolean 
> {{true}} but should probably just be parsed as the string {{true}}}
>  * Arrays not containing commas are still parsed as arrays in some cases; for 
> example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it 
> should arguably be parsed as the string literal {{[0 1 2]}}
>  * An exception is logged when attempting to parse input as a map when it 
> doesn't contain 

[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2019-10-24 Thread Dallas Wrege (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959169#comment-16959169
 ] 

Dallas Wrege commented on KAFKA-4969:
-

Bump. I would love to see this issue back under consideration.

We had exactly the same experience as [~mjsax], and a simple search of this 
Jira brought me straight here.

Because of re-keying, our topology is broken up into one "heavy" task and three 
"stateless" tasks. I ended up in situations where some clients were assigned 
only heavy tasks and others only stateless tasks; at the very least, all 
assignments were unbalanced. We worked around the problem by splitting the 
stateless parts of the topology into separate Stream Processors, but having to 
rely on a workaround is unfortunate.

The straightforward implementation that [~bbejeck] proposes above would have 
worked fine for us - ensuring an even distribution based on topicGroupId.
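Here is a rough sketch of the "even distribution based on topicGroupId" idea. It assumes tasks are identified by a (topicGroupId, partition) pair. `GroupAwareAssignmentSketch` and its `Task` type are hypothetical stand-ins, not Kafka Streams classes. Tasks are bucketed by topic group and dealt round-robin. The cursor carries over between groups, so each client receives a share of every group, and therefore a mix of heavy and stateless tasks, while total counts stay balanced.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; Task is a stand-in, not the Kafka Streams TaskId class.
final class GroupAwareAssignmentSketch {

    static final class Task {
        final int topicGroupId;
        final int partition;

        Task(int topicGroupId, int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topicGroupId + "_" + partition;
        }
    }

    static Map<String, List<Task>> assign(List<Task> tasks, List<String> clients) {
        // Bucket tasks by topic group; TreeMap keeps the group order deterministic.
        Map<Integer, List<Task>> byGroup = new TreeMap<>();
        for (Task t : tasks) {
            byGroup.computeIfAbsent(t.topicGroupId, k -> new ArrayList<>()).add(t);
        }
        Map<String, List<Task>> result = new LinkedHashMap<>();
        for (String client : clients) {
            result.put(client, new ArrayList<>());
        }
        // Deal each group round-robin; the cursor carries over between groups
        // so per-client totals differ by at most one.
        int cursor = 0;
        for (List<Task> group : byGroup.values()) {
            for (Task t : group) {
                result.get(clients.get(cursor % clients.size())).add(t);
                cursor++;
            }
        }
        return result;
    }
}
```

With two clients and two tasks in each of two topic groups, each client ends up with one task from each group.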

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, {{StreamsPartitionAssignor}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamsPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
>  - in the case of standby tasks, which tasks have progressed the most with 
> respect to restoration
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.
>  
> There have been some additional discussions around task assignment on a 
> related PR https://github.com/apache/kafka/pull/5390
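As a rough illustration of what a task cost model could look like, the sketch below scores each task from the parameters listed above. It then greedily places the most expensive tasks on the least-loaded client. The weights, the `TaskProfile` type and the standby discount are all made-up assumptions, and restoration progress is not modeled. This is not how `StreamsPartitionAssignor` works.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical cost model; weights and types are illustrative assumptions only.
final class CostModelSketch {

    static final class TaskProfile {
        final String id;
        final int stores;
        final int sourcesAndSinks;
        final int processors;
        final boolean standby;

        TaskProfile(String id, int stores, int sourcesAndSinks, int processors, boolean standby) {
            this.id = id;
            this.stores = stores;
            this.sourcesAndSinks = sourcesAndSinks;
            this.processors = processors;
            this.standby = standby;
        }
    }

    // Made-up weights: stores dominate; standby tasks are assumed cheaper
    // because they only restore state.
    static double cost(TaskProfile t) {
        double c = 10.0 * t.stores + 1.0 * t.sourcesAndSinks + 0.5 * t.processors;
        return t.standby ? 0.3 * c : c;
    }

    // Greedy: most expensive task first, each onto the currently least-loaded client.
    static Map<String, List<String>> assign(List<TaskProfile> tasks, List<String> clients) {
        List<TaskProfile> sorted = new ArrayList<>(tasks);
        sorted.sort((a, b) -> Double.compare(cost(b), cost(a))); // stable, descending
        Map<String, Double> load = new LinkedHashMap<>();
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String client : clients) {
            load.put(client, 0.0);
            result.put(client, new ArrayList<>());
        }
        for (TaskProfile t : sorted) {
            String target = clients.get(0);
            for (String client : clients) {
                if (load.get(client) < load.get(target)) {
                    target = client;
                }
            }
            load.put(target, load.get(target) + cost(t));
            result.get(target).add(t.id);
        }
        return result;
    }
}
```

Consider the 2-stateful/2-stateless example from the description. With these weights, each instance receives one stateful and one stateless task.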





[jira] [Commented] (KAFKA-7181) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException

2019-10-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959150#comment-16959150
 ] 

Matthias J. Sax commented on KAFKA-7181:


As you can see on the ticket, it's not assigned yet, and it's unclear what the 
root cause is. We need to investigate further, especially as it seems to be 
hard to reproduce. If you have any new findings, please comment on KAFKA-9073.

> Kafka Streams State stuck in rebalancing after one of the StreamThread 
> encounters IllegalStateException
> ---
>
> Key: KAFKA-7181
> URL: https://issues.apache.org/jira/browse/KAFKA-7181
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Romil Kumar Vasani
>Priority: Major
> Fix For: 2.2.0
>
>
> Once the StreamThread encounters an IllegalStateException, it is marked DEAD 
> and shut down.
> The application doesn't spawn a new thread in its place; the partitions of 
> that thread are assigned to a different thread and it synchronizes. But the 
> application is stuck in REBALANCING state, as not all StreamThreads are in 
> RUNNING state.
> Expected: a new thread should come up and, after synchronization/rebalancing, 
> the KafkaStreams.State should be RUNNING.
> Since all the active threads (that are not marked DEAD) are in RUNNING state, 
> the KafkaStreams.State should be RUNNING.
> P.S. I am reporting an issue for the first time. If more information is 
> needed, I can provide it.
> Below are the logs from the IllegalStateException: 
> 2018-07-18 03:02:27.510 ERROR 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [prd1565.prod.nuke.ops.v1-StreamThread-2] Encountered the following error 
> during processing:
> java.lang.IllegalStateException: No current assignment for partition 
> consumerGroup-stateStore-changelog-10
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> 2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [consumerGroup-StreamThread-2] State transition from RUNNING to 
> PENDING_SHUTDOWN
>  2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [consumerGroup-StreamThread-2] Shutting down
>  2018-07-18 03:02:27.571 INFO 1 — [-StreamThread-2] 
> o.a.k.clients.producer.KafkaProducer : [Producer 
> clientId=consumerGroup-StreamThread-2-producer] Closing the Kafka producer 
> with timeoutMillis = 9223372036854775807 ms.
>  2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [consumerGroup-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD
>  2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> 

[jira] [Updated] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id

2019-10-24 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-9093:

Fix Version/s: (was: 2.3.1)
   2.3.2

> NullPointerException in KafkaConsumer with group.instance.id
> 
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Rolef Heinrich
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.3.2
>
>
> When using *group.instance.id=myUniqId[0]*, the KafkaConsumer's constructor 
> throws a NullPointerException in close():
>  
> {code:java}
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> {code}
> {{It turns out that the exception is thrown because the *log* member is not 
> yet initialized (still null) in the constructor when the original exception 
> is handled. The original exception is thrown before *log* is initialized.}}
> {{A side effect of this error is that close() does not clean up resources 
> as it is supposed to.}}
> *{{The used consumer properties for reference:}}*
>  
> {code:java}
> key.deserializer=com.ibm.streamsx.kafka.serialization
> request.timeout.ms=25000
> value.deserializer=com.ibm.streamsx.kafka.serialization
> client.dns.lookup=use_all_dns_ips
> metadata.max.age.ms=2000
> enable.auto.commit=false
> group.instance.id=myUniqId[0]
> max.poll.interval.ms=30
> group.id=consumer-0
> metric.reporters=com.ibm.streamsx.kafka.clients.consum...
> reconnect.backoff.max.ms=1
> bootstrap.servers=localhost:9092
> max.poll.records=50
> session.timeout.ms=2
> client.id=C-J37-ReceivedMessages[0]
> allow.auto.create.topics=false
> metrics.sample.window.ms=1
> retry.backoff.ms=500
> reconnect.backoff.ms=250{code}
> *Expected behaviour:* throw an exception indicating that something is wrong 
> with the chosen group.instance.id.
> The documentation says nothing about valid values for 
> group.instance.id.
> *Reproduce:*
>  
>  
> {code:java}
>  
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> public class Main {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
>         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         // Per the report, the constructor fails here with the NPE shown below
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>         consumer.close();
>     }
> }
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
>   at Main.main(Main.java:15)
> {code}
>  
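The failure pattern described above is cleanup in a constructor's error path touching a field that has not been assigned yet. One way to avoid it is to initialize everything `close()` needs before any code that can throw. The expected behaviour can then be met by validating the id up front. A minimal JDK-only sketch of both ideas follows. `SafeClientSketch` is hypothetical. The allowed character set (`[a-zA-Z0-9._-]`, at most 249 characters, borrowed from Kafka's topic-name rules) is an assumption, not documented `group.instance.id` behaviour.

```java
import java.util.logging.Logger;

// Hypothetical client: every field close() touches is initialized before
// anything in the constructor can throw.
final class SafeClientSketch implements AutoCloseable {

    // Initialized at declaration, so it is non-null before the constructor body runs.
    private final Logger log = Logger.getLogger(SafeClientSketch.class.getName());
    private AutoCloseable resource; // stays null if construction fails early

    SafeClientSketch(String groupInstanceId) {
        try {
            validateGroupInstanceId(groupInstanceId);
            resource = () -> log.fine("resource released"); // stand-in for sockets, metrics, etc.
        } catch (RuntimeException e) {
            close(); // safe: log is set and resource is null-checked
            throw new IllegalArgumentException("Failed to construct client", e);
        }
    }

    // Assumed rule, modeled on Kafka's topic-name characters; not a documented contract.
    static void validateGroupInstanceId(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("group.instance.id must be a non-empty string");
        }
        if (id.length() > 249 || !id.matches("[a-zA-Z0-9._-]+")) {
            throw new IllegalArgumentException("Illegal group.instance.id: " + id);
        }
    }

    @Override
    public void close() {
        if (resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                log.warning("Failed to release resource: " + e);
            }
        }
    }
}
```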





[jira] [Commented] (KAFKA-5722) Refactor ConfigCommand to use the AdminClient

2019-10-24 Thread Gokul Ramanan Subramanian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958988#comment-16958988
 ] 

Gokul Ramanan Subramanian commented on KAFKA-5722:
--

[~viktorsomogyi] are you actively working on this? If so, could you share your 
progress please? Thanks.

> Refactor ConfigCommand to use the AdminClient
> -
>
> Key: KAFKA-5722
> URL: https://issues.apache.org/jira/browse/KAFKA-5722
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>  Labels: kip, needs-kip
> Fix For: 2.5.0
>
>
> The ConfigCommand currently uses a direct connection to zookeeper. The 
> zookeeper dependency should be deprecated and an AdminClient API created to 
> be used instead.
> This change requires a KIP.





[jira] [Commented] (KAFKA-5561) Java based TopicCommand to use the Admin client

2019-10-24 Thread Gokul Ramanan Subramanian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958985#comment-16958985
 ] 

Gokul Ramanan Subramanian commented on KAFKA-5561:
--

[~ppatierno] are you actively working on this? If so, could you share your 
progress please? Thanks.

> Java based TopicCommand to use the Admin client
> ---
>
> Key: KAFKA-5561
> URL: https://issues.apache.org/jira/browse/KAFKA-5561
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Hi, 
> as suggested in https://issues.apache.org/jira/browse/KAFKA-3331, it 
> would be great to have the TopicCommand use the new Admin client instead of 
> the way it works today.
> As pushed by [~gwenshap] in the above JIRA, I'm going to work on it.
> Thanks,
> Paolo





[jira] [Created] (KAFKA-9096) Validate config inter-dependencies for KafkaConfig only after dynamic configs are loaded

2019-10-24 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-9096:
-

 Summary: Validate config inter-dependencies for KafkaConfig only 
after dynamic configs are loaded
 Key: KAFKA-9096
 URL: https://issues.apache.org/jira/browse/KAFKA-9096
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.5.0


We currently perform all validations in KafkaConfig when creating the config 
instance. This includes some checks that include multiple configs, e.g. is the 
inter-broker listener also present in the listener map? With dynamic configs, 
some configs come from the static properties file and others come from 
ZooKeeper. At the moment, we have a requirement for the static properties file 
to be consistent on its own, even though the config we actually use for the 
broker is the one where we have overlaid the configs from ZK on top of the 
static configs. We should continue to validate individual configs for the 
static configs file, but delay validating inter-dependencies until dynamic 
configs have also been processed.
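Here is a minimal sketch of the proposed ordering, assuming configs are plain string maps. Per-key checks run on the static file alone. Cross-config checks run only on the merged view, where dynamic (ZooKeeper) values overlay the static ones. `TwoPhaseConfigSketch` is hypothetical. Its cross-check is simplified to "the inter-broker listener name must appear in `listeners`", and it is not `KafkaConfig`'s actual validation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not KafkaConfig's actual validation logic.
final class TwoPhaseConfigSketch {

    // Phase 1: checks that look at one key at a time (placeholder rule).
    static void validateIndividually(Map<String, String> props) {
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getValue() == null || e.getValue().trim().isEmpty()) {
                throw new IllegalArgumentException("Empty value for " + e.getKey());
            }
        }
    }

    // Phase 2: cross-config check, run only on the merged configuration.
    static void validateInterDependencies(Map<String, String> merged) {
        String interBroker = merged.get("inter.broker.listener.name");
        if (interBroker == null) {
            return; // nothing to cross-check in this simplified sketch
        }
        Set<String> names = new HashSet<>();
        for (String listener : merged.getOrDefault("listeners", "").split(",")) {
            int idx = listener.indexOf("://");
            if (idx > 0) {
                names.add(listener.substring(0, idx).trim());
            }
        }
        if (!names.contains(interBroker)) {
            throw new IllegalArgumentException(
                    "inter.broker.listener.name " + interBroker + " is not one of " + names);
        }
    }

    static Map<String, String> load(Map<String, String> staticProps,
                                    Map<String, String> dynamicProps) {
        validateIndividually(staticProps);
        Map<String, String> merged = new HashMap<>(staticProps);
        merged.putAll(dynamicProps); // dynamic (ZooKeeper) values overlay static ones
        validateInterDependencies(merged);
        return merged;
    }
}
```

Suppose the static file lists only an EXTERNAL listener but names INTERNAL as the inter-broker listener. That file passes here once the dynamic config adds INTERNAL. Checking the static file on its own would reject it.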

 





[jira] [Commented] (KAFKA-9092) Kafka Streams Upgrade Magic v0 does not support record headers

2019-10-24 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958952#comment-16958952
 ] 

Guozhang Wang commented on KAFKA-9092:
--

[~wxmimperio] I think you are indeed hitting KAFKA-6739. As explained in 
https://github.com/apache/kafka/pull/4813/files, some produce requests with 
newer (v2 or v3) versions contain headers, and when down-converting 
to v1 the headers should be ignored. I suggest you upgrade your brokers to 
1.0.2: https://archive.apache.org/dist/kafka/1.0.2/RELEASE_NOTES.html
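For illustration only, here is a hypothetical sketch of the behaviour described in the comment. When a record is converted to a message format older than v2, which has no header field, the headers are dropped. This avoids an error like "Magic v0 does not support record headers". `DownConvertSketch` and `SimpleRecord` are made-up types, not Kafka's record classes.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical types; not Kafka's record or MemoryRecordsBuilder classes.
final class DownConvertSketch {

    static final byte MAGIC_V2 = 2; // first message format with a header field

    static final class SimpleRecord {
        final String key;
        final String value;
        final Map<String, byte[]> headers;

        SimpleRecord(String key, String value, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Older formats (v0/v1) cannot carry headers, so drop them instead of failing.
    static SimpleRecord downConvert(SimpleRecord record, byte targetMagic) {
        if (targetMagic >= MAGIC_V2 || record.headers.isEmpty()) {
            return record;
        }
        return new SimpleRecord(record.key, record.value, Collections.emptyMap());
    }
}
```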

> Kafka Streams Upgrade Magic v0 does not support record headers
> --
>
> Key: KAFKA-9092
> URL: https://issues.apache.org/jira/browse/KAFKA-9092
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 2.0.0
> Environment: kafka 1.0.0
> kafka streams lib 2.0.0
>Reporter: wxmimperio
>Priority: Major
>
> My Kafka cluster version is 1.0.0, and I run a streams app on topic v1.
>  
> Then I upgraded the Kafka Streams lib to 2.0.0 because I want to use some new APIs.
> Reference Upgrade doc: 
> [http://kafka.apache.org/20/documentation/streams/upgrade-guide]
>  
> {code:xml}
> <dependency>
>  <groupId>org.apache.kafka</groupId>
>  <artifactId>kafka-streams</artifactId>
>  <version>2.0.0</version>
> </dependency>{code}
>  
> {code:java}
> // streams config
> settings.put(StreamsConfig.UPGRADE_FROM_CONFIG, "1.0");{code}
>  
> Then I stopped the streams app, rebuilt the jar, and started it again.
> It runs without problems at first. After a few hours, the Kafka broker logs an 
> error; after I deleted and recreated the topic, the broker logged the same 
> error again a few hours later:
> {code:java}
> [2019-10-24 10:15:45,780] ERROR [KafkaApi-109] Error when handling request 
> {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=4,fetch_offset=3165633,max_bytes=1048576}]}]}
>  (kafka.server.KafkaApis)[2019-10-24 10:15:45,780] ERROR [KafkaApi-109] Error 
> when handling request 
> {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=4,fetch_offset=3165633,max_bytes=1048576}]}]}
>  (kafka.server.KafkaApis)java.lang.IllegalArgumentException: Magic v0 does 
> not support record headers at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
>  at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
>  at 
> org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
>  at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
>  at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) 
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>  at scala.Option.map(Option.scala:146) at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>  at scala.Option.flatMap(Option.scala:171) at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>  at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>  at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033) at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>  at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>  at 
> 

[jira] [Commented] (KAFKA-7181) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException

2019-10-24 Thread amuthan Ganeshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958940#comment-16958940
 ] 

amuthan Ganeshan commented on KAFKA-7181:
-

Thank you [~mjsax] for responding. Is KAFKA-9073 assigned to somebody already? 
I still haven't been able to find the root cause, and I am not able to 
reproduce it at will.

> Kafka Streams State stuck in rebalancing after one of the StreamThread 
> encounters IllegalStateException
> ---
>
> Key: KAFKA-7181
> URL: https://issues.apache.org/jira/browse/KAFKA-7181
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Romil Kumar Vasani
>Priority: Major
> Fix For: 2.2.0
>
>
> Once the StreamThread encounters an IllegalStateException, it is marked DEAD 
> and shut down.
> The application doesn't spawn a new thread in its place; the partitions of 
> that thread are assigned to a different thread and it synchronizes. But the 
> application is stuck in REBALANCING state, as not all StreamThreads are in 
> RUNNING state.
> Expected: a new thread should come up and, after synchronization/rebalancing, 
> the KafkaStreams.State should be RUNNING.
> Since all the active threads (that are not marked DEAD) are in RUNNING state, 
> the KafkaStreams.State should be RUNNING.
> P.S. I am reporting an issue for the first time. If more information is 
> needed, I can provide it.
> Below are the logs from the IllegalStateException: 
> 2018-07-18 03:02:27.510 ERROR 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [prd1565.prod.nuke.ops.v1-StreamThread-2] Encountered the following error 
> during processing:
> java.lang.IllegalStateException: No current assignment for partition 
> consumerGroup-stateStore-changelog-10
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> 2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [consumerGroup-StreamThread-2] State transition from RUNNING to 
> PENDING_SHUTDOWN
>  2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [consumerGroup-StreamThread-2] Shutting down
>  2018-07-18 03:02:27.571 INFO 1 — [-StreamThread-2] 
> o.a.k.clients.producer.KafkaProducer : [Producer 
> clientId=consumerGroup-StreamThread-2-producer] Closing the Kafka producer 
> with timeoutMillis = 9223372036854775807 ms.
>  2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [consumerGroup-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD
>  2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [consumerGroup-StreamThread-2] Shutdown complete
>  
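The reporter's expectation can be stated as a small rule: ignore DEAD threads, and report RUNNING once every remaining thread is RUNNING. Below is a hypothetical sketch of that rule. The enums are simplified stand-ins, not the real `StreamThread.State` or `KafkaStreams.State`. Returning ERROR when no live thread remains is an assumption of this sketch.

```java
import java.util.Collection;

// Simplified stand-ins for the states named in the report; not the real enums.
final class AggregateStateSketch {

    enum ThreadState { PARTITIONS_ASSIGNED, RUNNING, DEAD }

    enum AppState { REBALANCING, RUNNING, ERROR }

    // Reporter's expectation: skip DEAD threads; RUNNING once every live thread is RUNNING.
    static AppState aggregate(Collection<ThreadState> threads) {
        boolean anyAlive = false;
        for (ThreadState s : threads) {
            if (s == ThreadState.DEAD) {
                continue;
            }
            anyAlive = true;
            if (s != ThreadState.RUNNING) {
                return AppState.REBALANCING;
            }
        }
        // With no live threads left there is nothing to run (assumed to be an error).
        return anyAlive ? AppState.RUNNING : AppState.ERROR;
    }
}
```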

[jira] [Created] (KAFKA-9095) Kafka Cluster Issue when API Version =1

2019-10-24 Thread Mantesh Janagond (Jira)
Mantesh Janagond created KAFKA-9095:
---

 Summary: Kafka Cluster Issue when API Version =1
 Key: KAFKA-9095
 URL: https://issues.apache.org/jira/browse/KAFKA-9095
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.0.0
Reporter: Mantesh Janagond


When I use a Kafka cluster with 3 nodes and Kafka API version = 1, 

I get the following error code = 16:
{{The broker returns this error code if it receives an offset fetch or commit 
request for a consumer group that it is not a coordinator for.}}

But the same setup works with Kafka API version = 0.

So, please suggest how to solve this issue.
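For context, error code 16 is `NOT_COORDINATOR` in the Kafka protocol. A client that receives it is expected to look up the group coordinator again and retry. Below is a hypothetical sketch of that loop, with the lookup and the request abstracted as functions. `CoordinatorRetrySketch` and its parameters are illustrative names, not Kafka client APIs.

```java
import java.util.function.IntSupplier;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch; findCoordinator and sendToBroker stand in for the real
// coordinator lookup and the offset fetch/commit request.
final class CoordinatorRetrySketch {

    static final int NONE = 0;
    static final int NOT_COORDINATOR = 16; // the error code from the report

    // Returns the final error code after at most maxAttempts sends.
    static int sendWithRediscovery(IntSupplier findCoordinator,
                                   IntUnaryOperator sendToBroker,
                                   int maxAttempts) {
        int coordinator = findCoordinator.getAsInt();
        int error = NOT_COORDINATOR;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            error = sendToBroker.applyAsInt(coordinator);
            if (error != NOT_COORDINATOR) {
                return error; // NONE on success, or an error this sketch does not retry
            }
            // The group's coordinator is on another broker: look it up again.
            coordinator = findCoordinator.getAsInt();
        }
        return error;
    }
}
```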





[jira] [Commented] (KAFKA-9094) Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958784#comment-16958784
 ] 

ASF GitHub Bot commented on KAFKA-9094:
---

stanislavkozlovski commented on pull request #7591: KAFKA-9094: Add server-side 
replica validation for ZK triggered reassignments
URL: https://github.com/apache/kafka/pull/7591
 
 
   This patch adds the same server-side validation we use for API triggered 
reassignments to the ZK path.
 



> Validate the replicas for partition reassignments triggered through the 
> /admin/reassign_partitions zNode
> 
>
> Key: KAFKA-9094
> URL: https://issues.apache.org/jira/browse/KAFKA-9094
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> As was mentioned by [~jsancio] in 
> [https://github.com/apache/kafka/pull/7574#discussion_r337621762], it would 
> make sense to apply the same replica validation we apply to the KIP-455 
> reassignments API.
> Namely, validate that the replicas:
> * are not empty (e.g. [])
> * are not negative (e.g. [1,2,-1])
> * are not brokers that are not part of the cluster or otherwise unhealthy 
> (e.g. not in /brokers zNode)
> The last liveness validation is subject to comments. We are re-evaluating 
> whether we want to enforce it for the API.





[jira] [Created] (KAFKA-9094) Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode

2019-10-24 Thread Stanislav Kozlovski (Jira)
Stanislav Kozlovski created KAFKA-9094:
--

 Summary: Validate the replicas for partition reassignments 
triggered through the /admin/reassign_partitions zNode
 Key: KAFKA-9094
 URL: https://issues.apache.org/jira/browse/KAFKA-9094
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


As was mentioned by [~jsancio] in 
[https://github.com/apache/kafka/pull/7574#discussion_r337621762], it would 
make sense to apply the same replica validation we apply to the KIP-455 
reassignments API.

Namely, validate that the replicas:
* are not empty (e.g. [])
* are not negative (e.g. [1,2,-1])
* are not brokers that are not part of the cluster or otherwise unhealthy (e.g. 
not in /brokers zNode)

The last liveness validation is subject to comments. We are re-evaluating 
whether we want to enforce it for the API.
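Here is a minimal sketch of the three checks listed above. It assumes replicas arrive as a list of broker ids and that the registered brokers (for example, the ids under /brokers) are known. `ReplicaValidationSketch` is hypothetical and is not the controller's code. The liveness check is still under discussion, so it sits behind a flag here.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper; not the controller's actual validation code.
final class ReplicaValidationSketch {

    static void validateReplicas(List<Integer> replicas, Set<Integer> registeredBrokers,
                                 boolean checkLiveness) {
        if (replicas == null || replicas.isEmpty()) {
            throw new IllegalArgumentException("Replica list must not be empty");
        }
        for (Integer id : replicas) {
            if (id == null || id < 0) {
                throw new IllegalArgumentException("Invalid broker id " + id + " in " + replicas);
            }
        }
        // Optional because the liveness check is still being re-evaluated.
        if (checkLiveness) {
            for (Integer id : replicas) {
                if (!registeredBrokers.contains(id)) {
                    throw new IllegalArgumentException("Broker " + id + " is not registered in the cluster");
                }
            }
        }
    }
}
```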





[jira] [Assigned] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id

2019-10-24 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-9093:
-

Assignee: huxihx

> NullPointerException in KafkaConsumer with group.instance.id
> 
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Rolef Heinrich
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.3.1
>
>
> When using *group.instance.id=myUniqId[0]*, the KafkaConsumer's constructor 
> throws a NullPointerException in close():
>  
> {code:java}
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> {code}
> {{It turns out that the exception is thrown because the *log* member is not 
> yet initialized (still null) in the constructor when the original exception 
> is handled. The original exception is thrown before *log* is initialized.}}
> {{A side effect of this error is that close() does not clean up resources 
> as it is supposed to.}}
> *{{The used consumer properties for reference:}}*
>  
> {code:java}
> key.deserializer=com.ibm.streamsx.kafka.serialization
> request.timeout.ms=25000
> value.deserializer=com.ibm.streamsx.kafka.serialization
> client.dns.lookup=use_all_dns_ips
> metadata.max.age.ms=2000
> enable.auto.commit=false
> group.instance.id=myUniqId[0]
> max.poll.interval.ms=30
> group.id=consumer-0
> metric.reporters=com.ibm.streamsx.kafka.clients.consum...
> reconnect.backoff.max.ms=1
> bootstrap.servers=localhost:9092
> max.poll.records=50
> session.timeout.ms=2
> client.id=C-J37-ReceivedMessages[0]
> allow.auto.create.topics=false
> metrics.sample.window.ms=1
> retry.backoff.ms=500
> reconnect.backoff.ms=250{code}
> *Expected behaviour:* an exception indicating that something is wrong with 
> the chosen group.instance.id.
> The documentation does not say anything about valid values for 
> group.instance.id.
> *Reproduce:*
>  
>  
> {code:java}
>  
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> public class Main {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
>         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
>     }
> }
> Exception in thread "main" java.lang.NullPointerException
>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
>   at Main.main(Main.java:15)
> {code}
>  





[jira] [Commented] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id

2019-10-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958721#comment-16958721
 ] 

ASF GitHub Bot commented on KAFKA-9093:
---

huxihx commented on pull request #7590: KAFKA-9093: NullPointerException in 
KafkaConsumer with group.instance.id
URL: https://github.com/apache/kafka/pull/7590
 
 
   https://issues.apache.org/jira/browse/KAFKA-9093
   
   `log` in KafkaConsumer does not get initialized if an invalid value for 
`group.instance.id` is given during consumer construction.
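The failure mode here (a cleanup path that uses a field the constructor has not assigned yet) can be illustrated with a small stand-alone sketch. `SafeInitClient` and its instance-ID check are hypothetical and only mirror the shape of the problem; this is not the code from the PR.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical client, not KafkaConsumer: shows why the logger must be
// assigned before any constructor step that can throw.
final class SafeInitClient implements Closeable {
    private final Logger log;
    private final List<String> resources = new ArrayList<>();

    SafeInitClient(String instanceId) {
        // Assign the logger first. If this assignment came after the try block,
        // close() below would see log == null and throw a NullPointerException
        // that hides the real configuration error.
        this.log = Logger.getLogger(SafeInitClient.class.getName());
        try {
            resources.add("network");
            checkInstanceId(instanceId); // may throw IllegalArgumentException
            resources.add("coordinator");
        } catch (RuntimeException e) {
            close(); // release whatever was acquired before the failure
            throw e; // rethrow the original error
        }
    }

    // Hypothetical stand-in for config validation: rejects '[' and ']'.
    private static void checkInstanceId(String id) {
        if (id.indexOf('[') >= 0 || id.indexOf(']') >= 0) {
            throw new IllegalArgumentException("invalid instance id: " + id);
        }
    }

    int openResources() {
        return resources.size();
    }

    @Override
    public void close() {
        log.fine("closing, releasing " + resources.size() + " resources");
        resources.clear();
    }

    public static void main(String[] args) {
        try {
            new SafeInitClient("myUniqId[0]");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the logger assigned first, the caller sees the original configuration error instead of an NPE from the cleanup path.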
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id

2019-10-24 Thread Rolef Heinrich (Jira)
Rolef Heinrich created KAFKA-9093:
-

 Summary: NullPointerException in KafkaConsumer with 
group.instance.id
 Key: KAFKA-9093
 URL: https://issues.apache.org/jira/browse/KAFKA-9093
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.3.0
Reporter: Rolef Heinrich
 Fix For: 2.3.1


When using *group.instance.id=myUniqId[0]*, the KafkaConsumer constructor 
throws a NullPointerException in close():

 
{code:java}
Caused by: java.lang.NullPointerException
 at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)

{code}
It turns out that the exception is thrown because the *log* member is not yet 
initialized (still null) in the constructor when the original exception is 
handled: the original exception is thrown before *log* is initialized.

A side effect of this error is that close() does not clean up resources 
as it is supposed to.

*Consumer properties used, for reference:*

 
{code:java}
key.deserializer=com.ibm.streamsx.kafka.serialization
request.timeout.ms=25000
value.deserializer=com.ibm.streamsx.kafka.serialization
client.dns.lookup=use_all_dns_ips
metadata.max.age.ms=2000
enable.auto.commit=false
group.instance.id=myUniqId[0]
max.poll.interval.ms=30
group.id=consumer-0
metric.reporters=com.ibm.streamsx.kafka.clients.consum...
reconnect.backoff.max.ms=1
bootstrap.servers=localhost:9092
max.poll.records=50
session.timeout.ms=2
client.id=C-J37-ReceivedMessages[0]
allow.auto.create.topics=false
metrics.sample.window.ms=1
retry.backoff.ms=500
reconnect.backoff.ms=250{code}
*Expected behaviour:* an exception indicating that something is wrong with 
the chosen group.instance.id.

The documentation does not say anything about valid values for 
group.instance.id.
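One way to get the expected behaviour is to validate the value up front and fail with a descriptive error. The sketch below assumes the same rules Kafka applies to topic names (ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or ".."); that rule set is an assumption here, so check the documentation of the client version in use. `GroupInstanceIdCheck` is a hypothetical name.

```java
import java.util.regex.Pattern;

// Hypothetical validator; the rules below are assumed to mirror Kafka's topic-name rules.
final class GroupInstanceIdCheck {
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static void validate(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("group.instance.id must not be empty");
        }
        if (id.equals(".") || id.equals("..")) {
            throw new IllegalArgumentException("group.instance.id cannot be '.' or '..'");
        }
        if (id.length() > MAX_LENGTH) {
            throw new IllegalArgumentException(
                "group.instance.id is longer than " + MAX_LENGTH + " characters");
        }
        if (!LEGAL_CHARS.matcher(id).matches()) {
            throw new IllegalArgumentException("group.instance.id '" + id
                + "' may only contain ASCII letters, digits, '.', '_' and '-'");
        }
    }

    public static void main(String[] args) {
        validate("myUniqId-0"); // accepted
        try {
            validate("myUniqId[0]"); // rejected: '[' and ']' are not allowed
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```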

*Reproduce:*

 

 
{code:java}
 
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Main {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
    }
}


Exception in thread "main" java.lang.NullPointerException
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
    at Main.main(Main.java:15)
{code}
 





[jira] [Updated] (KAFKA-9083) Various parsing issues in Values class

2019-10-24 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9083:
-
Description: 
There are a number of small issues with the Connect framework's {{Values}} 
class that lead to either unexpected exceptions, unintuitive (and arguably 
incorrect) behavior, or confusing log messages. These include:
 * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
(which should be parsed as an array containing a single null element)
 * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which 
should be parsed as an empty array)
 * The returned schema is null when parsing the string {{{}}} (instead, it 
should be a map schema, possibly with null key and value schemas)
 * Strings that begin with what appear to be booleans (i.e., the literals 
{{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}, 
{{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be parsed 
as strings; for example, the string {{true}}} is parsed as the boolean {{true}} 
but should probably just be parsed as the string {{true}}}
 * Arrays not containing commas are still parsed as arrays in some cases; for 
example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it 
should arguably be parsed as the string literal {{[0 1 2]}}
 * An exception is logged when attempting to parse input as a map that 
doesn't contain a final closing brace; the message states "Map is missing 
terminating ']'" even though the expected terminating character is actually 
the closing brace and not {{]}}
 * Finally, and possibly most contentious, escape sequences are not stripped 
from string literals. Thus, the input string 
{{foobar]}} is parsed as the literal string 
{{foobar]}}, which is somewhat unexpected, since that 
means it is impossible to pass in a string that is parsed as the string literal 
{{foobar]}}, and it is the job of the caller to handle stripping of 
such escape sequences. Given that the escape sequence can itself be escaped, it 
seems better to automatically strip it from parsed string literals before 
returning them to the user.
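A minimal sketch of the behaviour proposed in the last point, assuming a backslash escapes the single character that follows it. `EscapeStripper.stripEscapes` is a hypothetical helper, not part of Connect's {{Values}} class.

```java
// Hypothetical helper illustrating the proposal; not part of Connect's Values class.
final class EscapeStripper {
    /**
     * Removes one level of backslash escaping: each backslash is dropped and the
     * character after it is kept verbatim, so an escaped backslash yields a single
     * backslash. A lone trailing backslash is kept as-is.
     */
    static String stripEscapes(String literal) {
        StringBuilder out = new StringBuilder(literal.length());
        for (int i = 0; i < literal.length(); i++) {
            char c = literal.charAt(i);
            if (c == '\\' && i + 1 < literal.length()) {
                i++; // skip the backslash, keep the escaped character
                out.append(literal.charAt(i));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(stripEscapes("foo\\]bar")); // prints foo]bar
        System.out.println(stripEscapes("a\\\\b"));    // prints a\b
    }
}
```

Because an escaped backslash collapses to one backslash, callers can still produce a literal backslash in the parsed output.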

 

  was:
There are a number of small issues with the Connect framework's {{Values}} 
class that lead to either unexpected exceptions, unintuitive (and arguably 
incorrect) behavior, or confusing log messages. These include:
 * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
(which should be parsed as an array containing a single null element)
 * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which 
should be parsed as an empty array)
 * The returned schema is null when parsing the string {{{}}} (instead, it 
should be a map schema, possibly with null key and value schemas)
 * Strings that begin with what appear to be booleans (i.e., the literals 
{{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}, 
{{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be parsed 
as strings; for example, the string {{true}}} is parsed as the boolean {{true}} 
but should probably just be parsed as the string {{true}}}
 * Arrays not containing commas are still parsed as arrays in some cases; for 
example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it 
should arguably be parsed as the string literal {{[0 1 2]}}
 * An exception is logged when attempting to parse input as a map when it 
doesn't contain the a final closing brace that states "Map is missing 
terminating ']'" even though the expecting terminating character is actually 
{{}} and not {{]}}
 * Finally, and possibly most contentious, escape sequences are not stripped 
from string literals. Thus, the input string 
{{foobar]}} is parsed as the literal string 
{{foobar]}}, which is somewhat unexpected, since that 
means it is impossible to pass in a string that is parsed as the string literal 
{{foobar]}}, and it is the job of the caller to handle stripping of 
such escape sequences. Given that the escape sequence can itself be escaped, it 
seems better to automatically strip it from parsed string literals before 
returning them to the user.

 


> Various parsing issues in Values class
> --
>
> Key: KAFKA-9083
> URL: https://issues.apache.org/jira/browse/KAFKA-9083
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> There are a number of small issues with the Connect framework's {{Values}} 
> class that lead to either unexpected exceptions, unintuitive (and arguably 
> incorrect) behavior, or confusing log messages. These include:
>  * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
> (which should be parsed as an array containing a single null element)
>  * A 

[jira] [Commented] (KAFKA-9092) Kafka Streams Upgrade Magic v0 does not support record headers

2019-10-24 Thread wxmimperio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958599#comment-16958599
 ] 

wxmimperio commented on KAFKA-9092:
---

[~mjsax]

Hi, I think my cluster's message format supports headers, since it is newer than 0.11.

 
{code:java}
log.message.timestamp.type=LogAppendTime
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{code}
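For reference, the record format ("magic") a broker writes follows log.message.format.version: magic v0 before 0.10, v1 for 0.10.x, and v2 from 0.11 on, and only v2 can carry headers. The stack trace in the issue description, however, comes from down-conversion during a fetch (FileRecords.downConvert). As we understand the broker code, the target magic is chosen from the fetch request's version (0 and 1 map to magic v0, 2 and 3 to v1, 4 and later to v2) rather than from the log format. If so, a client sending an old fetch version (replica_id=-1 indicates a consumer) could trigger this error even with a 1.0 log format. `MagicHeaders` and its methods are hypothetical helpers summarising these assumed rules.

```java
// Hypothetical helpers summarising the assumed record-format rules; not Kafka code.
final class MagicHeaders {
    /** Magic written for a log.message.format.version such as "0.10.2", "0.10.0-IV1" or "1.0". */
    static byte magicForMessageFormat(String version) {
        String[] parts = version.split("-")[0].split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        if (major >= 1 || minor >= 11) {
            return 2; // 0.11 and later: magic v2, headers supported
        }
        return (byte) (minor == 10 ? 1 : 0); // 0.10.x: v1; older: v0
    }

    /** Magic a fetch response is down-converted to, by fetch request version (assumed mapping). */
    static byte magicForFetchVersion(int fetchVersion) {
        if (fetchVersion <= 1) return 0;
        if (fetchVersion <= 3) return 1;
        return 2;
    }

    static boolean supportsHeaders(byte magic) {
        return magic >= 2;
    }

    public static void main(String[] args) {
        byte logMagic = magicForMessageFormat("1.0");
        byte responseMagic = magicForFetchVersion(1);
        System.out.println("log format supports headers: " + supportsHeaders(logMagic));
        System.out.println("down-converted response supports headers: " + supportsHeaders(responseMagic));
    }
}
```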
 

> Kafka Streams Upgrade Magic v0 does not support record headers
> --
>
> Key: KAFKA-9092
> URL: https://issues.apache.org/jira/browse/KAFKA-9092
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 2.0.0
> Environment: kafka 1.0.0
> kafka streams lib 2.0.0
>Reporter: wxmimperio
>Priority: Major
>
> My Kafka cluster version is 1.0.0, and I was running a Streams app on topic v1.
>  
> Then I upgraded the Kafka Streams library to 2.0.0 to use some new APIs.
> Reference upgrade guide: 
> [http://kafka.apache.org/20/documentation/streams/upgrade-guide]
>  
> {code:xml}
> <dependency>
>  <groupId>org.apache.kafka</groupId>
>  <artifactId>kafka-streams</artifactId>
>  <version>2.0.0</version>
> </dependency>{code}
>  
> {code:java}
> // streams config
> settings.put(StreamsConfig.UPGRADE_FROM_CONFIG, "1.0");{code}
>  
> Then I stopped the Streams app, rebuilt the jar, and started it again.
> At first it ran without problems. After a few hours, the Kafka broker logged 
> the following error; after I deleted and recreated the topic, the broker 
> logged the same error again a few hours later:
> {code:java}
> [2019-10-24 10:15:45,780] ERROR [KafkaApi-109] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=4,fetch_offset=3165633,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
> [2019-10-24 10:15:45,780] ERROR [KafkaApi-109] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=4,fetch_offset=3165633,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Magic v0 does not support record headers
>  at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
>  at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
>  at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
>  at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
>  at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>  at scala.Option.map(Option.scala:146)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>  at scala.Option.flatMap(Option.scala:171)
>  at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>  at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>  at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
>  at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>  at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>  at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
>  at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>  at 
> 

[jira] [Comment Edited] (KAFKA-9092) Kafka Streams Upgrade Magic v0 does not support record headers

2019-10-24 Thread wxmimperio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958599#comment-16958599
 ] 

wxmimperio edited comment on KAFKA-9092 at 10/24/19 7:01 AM:
-

[~mjsax]

Hi, I think my cluster's message format supports headers, since it is newer than 0.11.
{code:java}
// server.properties
log.message.timestamp.type=LogAppendTime
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{code}
 


was (Author: wxmimperio):
[~mjsax]

Hi, I think my cluster's message format supports headers, since it is newer than 0.11.

 
{code:java}
log.message.timestamp.type=LogAppendTime
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{code}
 
