[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-07-15 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885855#comment-16885855
 ] 

Omkar Mestry commented on KAFKA-7245:
-

So, should I start making changes in the code to remove the usage of this method?

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)`, we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus, we would need to update those tests.
> KIP-474: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545]
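
A minimal sketch of the migration described above (illustrative only, not code from the ticket): test code passes the window start timestamp explicitly instead of relying on the deprecated two-argument overload.

{code:java}
import org.apache.kafka.streams.state.WindowStore;

public class WindowStorePutMigrationExample {

    // Before (deprecated): store.put(key, value) implicitly uses the record
    // context timestamp. After: pass the window start timestamp explicitly.
    static void writeWithExplicitTimestamp(final WindowStore<String, Long> store,
                                           final String key,
                                           final Long value,
                                           final long windowStartTimestamp) {
        store.put(key, value, windowStartTimestamp);
    }
}
{code}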



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-7711) Add a bounded flush() API to Kafka Producer

2019-07-15 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885795#comment-16885795
 ] 

Richard Yu edited comment on KAFKA-7711 at 7/16/19 3:01 AM:


[~hachikuji] I think I've seen this issue come up multiple times in the past. 
Not sure exactly which issue numbers. Do you think this is an issue that really 
needs to be tackled?

Edit: Got it somewhat mixed up. There have been issues with Kafka's send API in 
the past, for example KAFKA-6705, but not with the flush() API. It might be 
good to have something like it though.

 


was (Author: yohan123):
[~hachikuji] I think I've seen this issue come up multiple times in the past. 
Not sure exactly which issue numbers. Do you think this is an issue that really 
needs to be tackled?

> Add a bounded flush()  API to Kafka Producer
> 
>
> Key: KAFKA-7711
> URL: https://issues.apache.org/jira/browse/KAFKA-7711
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: kun du
>Priority: Minor
>  Labels: needs-kip
>
> Currently the call to Producer.flush() can hang for an indeterminate amount 
> of time.
> It would be a good idea to add a bounded flush() API that times out if the 
> producer is unable to flush all the batched records in a limited time. In 
> this way the caller of flush() has a chance to decide what to do next instead 
> of just waiting forever.
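
Pending such an API, a caller-side sketch of bounding the wait (an illustrative workaround that runs flush() on a helper thread; this is not the API proposed in this ticket):

{code:java}
import java.util.concurrent.*;
import org.apache.kafka.clients.producer.KafkaProducer;

public final class BoundedFlushWorkaround {

    // Bound the wait for flush() from the caller's side. Note: if the timeout
    // fires, flush() keeps running on the helper thread in the background.
    static boolean flushWithTimeout(final KafkaProducer<?, ?> producer,
                                    final long timeout,
                                    final TimeUnit unit) throws InterruptedException {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<?> pending = executor.submit(producer::flush);
            pending.get(timeout, unit);
            return true;                  // all batched records flushed in time
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (TimeoutException e) {
            return false;                 // caller decides what to do next
        } finally {
            executor.shutdown();
        }
    }
}
{code}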



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7711) Add a bounded flush() API to Kafka Producer

2019-07-15 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885795#comment-16885795
 ] 

Richard Yu commented on KAFKA-7711:
---

[~hachikuji] I think I've seen this issue come up multiple times in the past. 
Not sure exactly which issue numbers. Do you think this is an issue that really 
needs to be tackled?

> Add a bounded flush()  API to Kafka Producer
> 
>
> Key: KAFKA-7711
> URL: https://issues.apache.org/jira/browse/KAFKA-7711
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: kun du
>Priority: Minor
>  Labels: needs-kip
>
> Currently the call to Producer.flush() can hang for an indeterminate amount 
> of time.
> It would be a good idea to add a bounded flush() API that times out if the 
> producer is unable to flush all the batched records in a limited time. In 
> this way the caller of flush() has a chance to decide what to do next instead 
> of just waiting forever.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8555) Flaky test ExampleConnectIntegrationTest#testSourceConnector

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885767#comment-16885767
 ] 

Matthias J. Sax commented on KAFKA-8555:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/326/testReport/junit/org.apache.kafka.connect.integration/ExampleConnectIntegrationTest/testSourceConnector/]

> Flaky test ExampleConnectIntegrationTest#testSourceConnector
> 
>
> Key: KAFKA-8555
> URL: https://issues.apache.org/jira/browse/KAFKA-8555
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
> Attachments: log-job139.txt, log-job141.txt, log-job23145.txt, 
> log-job23215.txt, log-job6046.txt
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/22798/console]
> *02:03:21* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector.test.stdout*02:03:21*
>  *02:03:21* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > 
> testSourceConnector FAILED*02:03:21* 
> org.apache.kafka.connect.errors.DataException: Insufficient records committed 
> by connector simple-conn in 15000 millis. Records expected=2000, 
> actual=1013*02:03:21* at 
> org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:188)*02:03:21*
>  at 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:181)*02:03:21*



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8672) RebalanceSourceConnectorsIntegrationTest#testReconfigConnector

2019-07-15 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8672:
--

 Summary: 
RebalanceSourceConnectorsIntegrationTest#testReconfigConnector
 Key: KAFKA-8672
 URL: https://issues.apache.org/jira/browse/KAFKA-8672
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.4.0
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6281/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testReconfigConnector/]
{quote}java.lang.RuntimeException: Could not find enough records. found 33, 
expected 100 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:306)
 at 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testReconfigConnector(RebalanceSourceConnectorsIntegrationTest.java:180){quote}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885740#comment-16885740
 ] 

Matthias J. Sax commented on KAFKA-8412:


Just cycling back to this.

I am not sure if we should use the "close-and-then-recreate" strategy. The 
issue seems to be that we blindly call `task#close()` for all tasks within 
`AssignedTasks#close()`. However, it seems that a proper fix would be to 
distinguish between active and suspended tasks, and call `closeSuspended()` for 
suspended ones and `close()` for all others?

With regard to KAFKA-7678: re-reading the ticket, the report says that it was a 
graceful shutdown via SIGTERM. Hence, I am wondering why the unclean shutdown 
path is actually executed (i.e., why do we call 
`maybeAbortTransactionAndCloseRecordCollector()` – this should only happen for 
an unclean shutdown). \cc [~pachilo] who reported and worked on the first NPE

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> 
>
> Key: KAFKA-8412
> URL: https://issues.apache.org/jira/browse/KAFKA-8412
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Sebastiaan
>Assignee: Matthias J. Sax
>Priority: Minor
>
> I found a closed issue and replied there, but decided to open one myself 
> because, although they're related, they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there was to implement a null check around closing a producer, 
> because in some cases the producer is already null at that point (it has 
> already been closed).
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>         this.producer.close();
>         this.producer = null;
>     }
>     this.checkForException();
> }{code}
> It seems to my (ignorant) eye that the flush method should also be wrapped in 
> a null check in the same way.
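
A minimal sketch of the guard the reporter suggests, mirroring the existing null check in close() (an illustrative assumption, not necessarily the fix that was applied):

{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    // guard against a producer that has already been closed and nulled out
    if (this.producer != null) {
        this.producer.flush();
    }
    this.checkForException();
}
{code}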

[jira] [Commented] (KAFKA-8179) Incremental Rebalance Protocol for Kafka Consumer

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885726#comment-16885726
 ] 

ASF GitHub Bot commented on KAFKA-8179:
---

ableegoldman commented on pull request #7095: KAFKA-8179: Minor, add 
ownedPartitions to PartitionAssignor#subscription
URL: https://github.com/apache/kafka/pull/7095
 
 
   This is just a minor PR for KIP-429, which needs 
`ConsumerCoordinator` to pass the current assignment to the assignor when 
building the subscription.
   
   Since this is in the internal package and a cursory search of GitHub 
suggests not too many people have implemented this method, we just change the 
signature rather than add a new method and deprecate the old one. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Incremental Rebalance Protocol for Kafka Consumer
> -
>
> Key: KAFKA-8179
> URL: https://issues.apache.org/jira/browse/KAFKA-8179
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Recently the Kafka community has been promoting cooperative rebalancing to 
> mitigate the pain points of the stop-the-world rebalancing protocol. This 
> ticket is created to apply that idea to the Kafka consumer client, which will 
> be beneficial for heavily stateful consumers such as Kafka Streams 
> applications.
> In short, the scope of this ticket includes reducing unnecessary rebalance 
> latency due to heavy partition migration, i.e. partitions being revoked and 
> re-assigned. This would make the built-in consumer assignors (range, 
> round-robin, etc.) aware of previously assigned partitions and sticky on a 
> best-effort basis.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885709#comment-16885709
 ] 

Matthias J. Sax commented on KAFKA-8671:


This issue might be related to KAFKA-5998 – if we cannot reproduce on `trunk` 
any longer, the fix of KAFKA-5998 might fix this one, too.

> NullPointerException occurs if topic associated with GlobalKTable changes
> -
>
> Key: KAFKA-8671
> URL: https://issues.apache.org/jira/browse/KAFKA-8671
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Alex Leung
>Priority: Critical
>
> The following NullPointerException occurs when the global/.checkpoint file 
> contains a line with a topic previously associated with (but no longer 
> configured for) a GlobalKTable:
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85)
> at 
> org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241)
> at 
> org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290){code}
>  
> After line 84 
> ([https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84)]
>  `sourceNodeAndDeserializer` is null for the old, but still valid, topic. 
> This can be reproduced with the following sequence:
>  # create a GlobalKTable associated with topic, 'global-topic1'
>  # change the topic associated with the GlobalKTable to 'global-topic2' 
>  ##  at this point, the global/.checkpoint file will contain lines for both 
> topics
>  # produce messages to previous topic ('global-topic1')
>  # the consumer will attempt to consume from global-topic1, but no 
> deserializer associated with global-topic1 will be found and the NPE will 
> occur
> It looks like the following recent commit has included checkpoint validations 
> that may prevent this issue: 
> https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-15 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8637.

Resolution: Fixed

> WriteBatch objects leak off-heap memory
> ---
>
> Key: KAFKA-8637
> URL: https://issues.apache.org/jira/browse/KAFKA-8637
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.3.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> In 2.1 we did some refactoring that led to the WriteBatch objects in 
> RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate 
> method, rather than in a try-with-resources statement as used elsewhere. This 
> causes a memory leak, as the WriteBatches are no longer closed automatically.
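
For context, a minimal sketch of the try-with-resources pattern the description refers to (illustrative only, not the actual RocksDBSegmentedBytesStore code): the batch's off-heap memory is released automatically when the block exits, even on error.

{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class WriteBatchTryWithResources {

    // Creating the WriteBatch (and WriteOptions) inside try-with-resources
    // guarantees their native memory is freed once the write completes.
    static void restoreBatch(final RocksDB db, final byte[] key, final byte[] value)
            throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch();
             final WriteOptions options = new WriteOptions()) {
            batch.put(key, value);
            db.write(options, batch);
        }
    }
}
{code}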



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885707#comment-16885707
 ] 

Matthias J. Sax commented on KAFKA-8650:


Splitting out initialization is not easy. The problem is that one might start 
multiple instances at once, but only one instance should do the initialization. 
Furthermore, processing can only start after the initialization is completed. 
Atm, we rely on the consumer rebalance protocol to ensure this property: the 
group leader will do the initialization. If we separate the two, there are two 
problems when starting multiple instances at the same time: (1) Which instance 
should do the initialization? (2) How do other instances know that the 
initialization is finished and that they can start processing?

Maybe it would be possible to enhance the reset tool though: it might be 
possible to hand in the `StreamsBuilder`, and the reset tool could set up the 
topics accordingly (many open questions remain though; it's just a rough idea).

> Streams does not work as expected with auto.offset.reset=none
> -
>
> Key: KAFKA-8650
> URL: https://issues.apache.org/jira/browse/KAFKA-8650
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Raman Gupta
>Priority: Major
>  Labels: needs-kip
>
> The auto.offset.reset policy of none is useful as a safety measure, 
> especially when 
> * exactly-once processing is desired, or
> * at-least-once is desired, but it is expensive to reprocess from the 
> beginning.
> In this case, using "none" forces the ops team to explicitly set the offset 
> before the stream can re-start processing, in the (hopefully rare) situations 
> in which the stream consumer offset has been lost for some reason, or in the 
> case of a new stream that should not start processing from the beginning or 
> the end, but somewhere in the middle (this scenario might occur during topic 
> migrations).
> Kafka streams really only supports auto.offset.reset of earliest or latest 
> (see the `Topology.AutoOffsetReset` enum). It is also possible to use the 
> auto.offset.reset configuration value, but this works suboptimally because if 
> the streams application reset tool is used (even with a specific offset 
> specified), the offset is set for the input topic, but it is not, and cannot 
> be, set for the internal topics, which won't exist yet.
> The internal topics are created by Kafka streams at startup time, but because 
> the auto.offset.reset policy of "none" is passed to the consumer of those 
> internal topics, the Kafka stream fails to start with a 
> "NoOffsetForPartitionException".
> Proposals / options:
> 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it 
> affects the input topics, but not the internal topics.
> 2) Allow streams to be configured with auto.offset.reset=none, but explicitly 
> set the offset to 0 for newly created internal topics.
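
For context, a minimal sketch of how an offset-reset policy is attached to an input topic in Streams today (illustrative only; the topic name and serdes are assumptions): only EARLIEST and LATEST can be expressed here, which is the limitation described above and what proposal (1) would extend.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsOffsetResetExample {

    // Per-topic reset policy today: Topology.AutoOffsetReset only offers
    // EARLIEST and LATEST, so "none" cannot be specified for an input topic.
    static KStream<String, String> buildInput(final StreamsBuilder builder) {
        return builder.stream("input-topic",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
    }
}
{code}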



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes

2019-07-15 Thread Alex Leung (JIRA)
Alex Leung created KAFKA-8671:
-

 Summary: NullPointerException occurs if topic associated with 
GlobalKTable changes
 Key: KAFKA-8671
 URL: https://issues.apache.org/jira/browse/KAFKA-8671
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0
Reporter: Alex Leung


The following NullPointerException occurs when the global/.checkpoint file 
contains a line with a topic previously associated with (but no longer 
configured for) a GlobalKTable:


{code:java}
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85)
at 
org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241)
at 
org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290){code}
 

After line 84 
([https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84)]
 `sourceNodeAndDeserializer` is null for the old, but still valid, topic. This 
can be reproduced with the following sequence:


 # create a GlobalKTable associated with topic, 'global-topic1'
 # change the topic associated with the GlobalKTable to 'global-topic2' 
 ##  at this point, the global/.checkpoint file will contain lines for both 
topics
 # produce messages to previous topic ('global-topic1')
 # the consumer will attempt to consume from global-topic1, but no deserializer 
associated with global-topic1 will be found and the NPE will occur

It looks like the following recent commit has included checkpoint validations 
that may prevent this issue: 
https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425
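
For context, a minimal sketch of what step 2 of the sequence above might look like in application code (an illustrative assumption following the reproduction steps, not code from the ticket): the application is redeployed with the GlobalKTable now sourced from 'global-topic2', while the old 'global-topic1' line remains in the global .checkpoint file on disk.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;

public class GlobalKTableTopicSwitchExample {

    static GlobalKTable<String, String> buildGlobalTable(final StreamsBuilder builder) {
        // previously: builder.globalTable("global-topic1",
        //         Consumed.with(Serdes.String(), Serdes.String()));
        return builder.globalTable("global-topic2",
                Consumed.with(Serdes.String(), Serdes.String()));
    }
}
{code}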



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885704#comment-16885704
 ] 

ASF GitHub Bot commented on KAFKA-8670:
---

wyuka commented on pull request #7094: KAFKA-8670 and KAFKA-8053: Fix 
kafka-topics.sh --describe without --t…
URL: https://github.com/apache/kafka/pull/7094
 
 
   …opic mentioned if there are no topics in cluster; improve error message.
   
   If there are no topics in a cluster, kafka-topics.sh --describe without
   a --topic option should return an empty list, not throw an exception.
   
   If there are no matching topics, throw an IllegalArgumentException but with
   a better error message.
   
   ### KAFKA-8670
   We pass a boolean flag to the `ensureTopicExists` method indicating whether 
to throw an exception if there are no topics in the cluster. In case of 
`kafka-topics.sh --describe`, the exception **should NOT** be thrown if either 
of these is true:
   1. A `--topic` option was not passed to the CLI. In that case, the output 
should be empty.
   2. A `--if-exists` option was passed to the CLI.
   Earlier, the first condition was not part of the check. This bugfix adds the 
first condition mentioned above to the check.
   
   ### KAFKA-8053
   Earlier, when `./kafka-topics.sh` was run with a `--topic` argument that was 
not found in the list of topics, the exception message was always "Topics in [] 
does not exist". This was a useless message. I have changed it to "No topic 
matching  found".
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   I added the necessary unit test to check for this case.
   Also, I ran these commands. The outputs before and after my change are 
shown inline.
   
   **Describe without topic name - Before**
   ```
   ./kafka-topics.sh --zookeeper 
10.0.32.180:2181,10.0.16.211:2181,10.0.0.220:2181 --describe
   Error while executing topic command : Topics in [] does not exist
   [2019-07-15 23:23:25,674] ERROR java.lang.IllegalArgumentException: Topics 
in [] does not exist
at 
kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
at 
kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
   ```
   
   **Describe without topic name - After (no output)**
   ```
   ./kafka-topics.sh --zookeeper 
10.0.32.180:2181,10.0.16.211:2181,10.0.0.220:2181 --describe
   ```
   
   **Describe with topic name - Before**
   ```
   ./kafka-topics.sh --zookeeper 
10.0.32.180:2181,10.0.16.211:2181,10.0.0.220:2181 --describe --topic topic-1
   Error while executing topic command : Topics in [] does not exist
   [2019-07-15 23:23:36,152] ERROR java.lang.IllegalArgumentException: Topics 
in [] does not exist
at 
kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
at 
kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
   ```
   
   **Describe with topic name - After**
   ```
   Error while executing topic command : No topic matching name 'topic-1' found
   [2019-07-15 23:24:55,169] ERROR java.lang.IllegalArgumentException: No topic 
matching 'topic-1' found
at 
kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:445)
at 
kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:358)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:67)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> kafka-topics.sh shows IllegalArgumentException when describing all topics if 
> no topics exist on the cluster
> ---
>
> Key: KAFKA-8670
> URL: https://issues.apache.org/jira/browse/KAFKA-8670
> Project: Kafka
> 

[jira] [Commented] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885702#comment-16885702
 ] 

ASF GitHub Bot commented on KAFKA-8637:
---

guozhangwang commented on pull request #7050: KAFKA-8637: WriteBatch objects 
leak off-heap memory
URL: https://github.com/apache/kafka/pull/7050
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WriteBatch objects leak off-heap memory
> ---
>
> Key: KAFKA-8637
> URL: https://issues.apache.org/jira/browse/KAFKA-8637
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.3.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> In 2.1 we did some refactoring that led to the WriteBatch objects in 
> RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate 
> method, rather than in a try-with-resources statement as used elsewhere. This 
> causes a memory leak, as the WriteBatches are no longer closed automatically.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8620) Race condition in StreamThread state change

2019-07-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8620:
---
Fix Version/s: 2.3.1
   2.2.2
   2.1.2

> Race condition in StreamThread state change
> ---
>
> Key: KAFKA-8620
> URL: https://issues.apache.org/jira/browse/KAFKA-8620
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> In the call to `StreamThread.addRecordsToTasks` we don't have synchronization 
> when we attempt to extract active tasks. If after one long poll in runOnce 
> the application state changes to PENDING_SHUTDOWN, there is a potential close 
> on TaskManager which erases the active tasks map, thus triggering an NPE and 
> bringing the thread state to a false shutdown.
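
A generic sketch of this kind of shutdown race and the synchronization that avoids it (an illustrative assumption, not Kafka's actual StreamThread/TaskManager code):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ShutdownRaceSketch {

    // One thread reads from a shared task map while the shutdown path clears
    // it; without a common lock, the reader can observe a missing entry and
    // hit an NPE. Synchronizing both paths (or checking for null) avoids that.
    private final Map<Integer, String> activeTasks = new HashMap<>();

    void addRecordsToTask(final int partition, final String record) {
        synchronized (activeTasks) {
            final String task = activeTasks.get(partition);
            if (task != null) {
                // hand the record to the task; skip if already closed
            }
        }
    }

    void shutdown() {
        synchronized (activeTasks) {
            activeTasks.clear();
        }
    }
}
{code}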



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic

2019-07-15 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8602:
-
Fix Version/s: 2.4.0

> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -
>
> Key: KAFKA-8602
> URL: https://issues.apache.org/jira/browse/KAFKA-8602
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Critical
> Fix For: 2.4.0
>
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or 
> assigned any partitions
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This 
> happens when a StreamThread gets assigned standby tasks for sub-topologies 
> that contain only state stores with logging disabled.
> To reproduce the bug, start two applications, each with one StreamThread and 
> one standby replica, and the following topology. The input topic should have 
> two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>         Serdes.Integer(),
>         Serdes.Integer())
>     .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
> 
>         @SuppressWarnings("unchecked")
>         @Override
>         public void init(final ProcessorContext context) {
>             state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>         }
> 
>         @Override
>         public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>             final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>             return result;
>         }
> 
>         @Override
>         public void close() {}
>     }, stateStoreName)
>     .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of 
> the sub-topology have logging disabled.  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885676#comment-16885676
 ] 

ASF GitHub Bot commented on KAFKA-8602:
---

bbejeck commented on pull request #7092: KAFKA-8602: Separate PR for 2.3 branch
URL: https://github.com/apache/kafka/pull/7092
 
 
   This PR is from the original work by @cadonna in #7008.  Due to incompatible 
changes in trunk that should not get cherry-picked back, a separate PR is 
required for this bug fix
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -
>
> Key: KAFKA-8602
> URL: https://issues.apache.org/jira/browse/KAFKA-8602
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Critical
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or 
> assigned any partitions
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This 
> happens when a StreamThread gets assigned standby tasks for sub-topologies 
> that contain only state stores with logging disabled.
> To reproduce the bug, start two applications, each with one StreamThread and 
> one standby replica, and the following topology. The input topic should have 
> two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>         Serdes.Integer(),
>         Serdes.Integer())
>     .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
> 
>         @SuppressWarnings("unchecked")
>         @Override
>         public void init(final ProcessorContext context) {
>             state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>         }
> 
>         @Override
>         public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>             final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>             return result;
>         }
> 
>         @Override
>         public void close() {}
>     }, stateStoreName)
>     .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of 
> the sub-topology have logging disabled.  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885667#comment-16885667
 ] 

ASF GitHub Bot commented on KAFKA-8670:
---

wyuka commented on pull request #7091: KAFKA-8670: Fix kafka-topics.sh 
--describe without --topic when cluster has no topics.
URL: https://github.com/apache/kafka/pull/7091
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> kafka-topics.sh shows IllegalArgumentException when describing all topics if 
> no topics exist on the cluster
> ---
>
> Key: KAFKA-8670
> URL: https://issues.apache.org/jira/browse/KAFKA-8670
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Tirtha Chatterjee
>Assignee: Tirtha Chatterjee
>Priority: Major
>
> When trying to describe all topics with the kafka-topics.sh utility, a user 
> would run kafka-topics.sh --describe without passing a --topic option. If 
> there are no topics on the cluster, the command fails with an 
> IllegalArgumentException.
> {code:java}
> ./kafka-topics.sh --zookeeper 
> 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
> Error while executing topic command : Topics in [] does not exist
> [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
> [] does not exist
> at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
> at 
> kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> (kafka.admin.TopicCommand$)
> {code}
>  
>  If no --topic option is passed to the command, and there are no topics on 
> the cluster, the command should not fail, but rather produce empty output.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8620) Race condition in StreamThread state change

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885655#comment-16885655
 ] 

ASF GitHub Bot commented on KAFKA-8620:
---

mjsax commented on pull request #7021: KAFKA-8620: fix NPE due to race 
condition during shutdown while rebalancing
URL: https://github.com/apache/kafka/pull/7021
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Race condition in StreamThread state change
> ---
>
> Key: KAFKA-8620
> URL: https://issues.apache.org/jira/browse/KAFKA-8620
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the call to `StreamThread.addRecordsToTasks` we don't have synchronization 
> when we attempt to extract active tasks. If after one long poll in runOnce 
> the application state changes to PENDING_SHUTDOWN, there is a potential close 
> on TaskManager which erases the active tasks map, thus triggering an NPE and 
> bringing the thread state to a false shutdown.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8620) Race condition in StreamThread state change

2019-07-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8620:
---
Affects Version/s: 2.1.0

> Race condition in StreamThread state change
> ---
>
> Key: KAFKA-8620
> URL: https://issues.apache.org/jira/browse/KAFKA-8620
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the call to `StreamThread.addRecordsToTasks` we don't have synchronization 
> when we attempt to extract active tasks. If after one long poll in runOnce 
> the application state changes to PENDING_SHUTDOWN, there is a potential close 
> on TaskManager which erases the active tasks map, thus triggering an NPE and 
> bringing the thread state to a false shutdown.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-15 Thread Raman Gupta (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885640#comment-16885640
 ] 

Raman Gupta commented on KAFKA-8650:


There is also a related API change / improvement that I believe should be in 
the KIP if one is created.

Currently, a new (or reset) stream initializes itself when "start" is called, 
i.e. "start" is overloaded to mean "initialize and start".

If "initialize" were possible without "start", then it would be easy to call 
"initialize" to create the stream's topology, then use the application reset 
tool to set the desired offsets, and then use "start" to actually start the 
stream at the desired place. This would allow the user to carry out this 
process without relying on the system to throw an exception at stream startup 
time because no offset exists after initialization.

This can also be done in advance of allowing auto.offset.reset = none for 
streams, which, when done, would just become a safety on the stream.start 
trigger.

> Streams does not work as expected with auto.offset.reset=none
> -
>
> Key: KAFKA-8650
> URL: https://issues.apache.org/jira/browse/KAFKA-8650
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Raman Gupta
>Priority: Major
>  Labels: needs-kip
>
> The auto.offset.reset policy of none is useful as a safety measure, 
> especially when 
> * exactly-once processing is desired, or
> * at-least-once is desired, but it is expensive to reprocess from the 
> beginning.
> In this case, using "none" forces the ops team to explicitly set the offset 
> before the stream can re-start processing, in the (hopefully rare) situations 
> in which the stream consumer offset has been lost for some reason, or in the 
> case of a new stream that should not start processing from the beginning or 
> the end, but somewhere in the middle (this scenario might occur during topic 
> migrations).
> Kafka streams really only supports auto.offset.reset of earliest or latest 
> (see the `Topology.AutoOffsetReset` enum). It is also possible to use the 
> auto.offset.reset configuration value, but this works suboptimally because if 
> the streams application reset tool is used (even with a specific offset 
> specified), the offset is set for the input topic, but it is not, and cannot 
> be, set for the internal topics, which won't exist yet.
> The internal topics are created by Kafka streams at startup time, but because 
> the auto.offset.reset policy of "none" is passed to the consumer of those 
> internal topics, the Kafka stream fails to start with a 
> "NoOffsetForPartitionException".
> Proposals / options:
> 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it 
> affects the input topics, but not the internal topics.
> 2) Allow streams to be configured with auto.offset.reset=none, but explicitly 
> set the offset to 0 for newly created internal topics.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885632#comment-16885632
 ] 

ASF GitHub Bot commented on KAFKA-8670:
---

wyuka commented on pull request #7091: KAFKA-8670: Fix kafka-topics.sh 
--describe without --topic when cluster has no topics.
URL: https://github.com/apache/kafka/pull/7091
 
 
   If there are no topics in a cluster, kafka-topics.sh --describe without
   a --topic option should return an empty list, not throw an exception.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   We pass a boolean flag to the `ensureTopicExists` method indicating whether 
to throw an exception if there are no topics in the cluster. In case of 
`kafka-topics.sh --describe`, the exception **should NOT** be thrown if either 
of these is true:
   1. A `--topic` option was not passed to the CLI. In that case, the output 
should be empty.
   2. A `--if-exists` option was passed to the CLI.
   
   Earlier, the first condition was not part of the check. This bugfix adds the 
first condition mentioned above to the check.
   
   I added the necessary unit test to check for this case.
   
   Also, I created a Kafka cluster, and without creating any topics on it, ran 
   ```
   ./kafka-topics.sh --zookeeper  --describe
   ```
   
   This did not throw any exception, like it used to earlier.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> kafka-topics.sh shows IllegalArgumentException when describing all topics if 
> no topics exist on the cluster
> ---
>
> Key: KAFKA-8670
> URL: https://issues.apache.org/jira/browse/KAFKA-8670
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Tirtha Chatterjee
>Assignee: Tirtha Chatterjee
>Priority: Major
>
> When trying to describe all topics with the kafka-topics.sh utility, a user 
> would run kafka-topics.sh --describe without passing a --topic option. If 
> there are no topics on the cluster, the command fails with an 
> IllegalArgumentException.
> {code:java}
> ./kafka-topics.sh --zookeeper 
> 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
> Error while executing topic command : Topics in [] does not exist
> [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
> [] does not exist
> at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
> at 
> kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> (kafka.admin.TopicCommand$)
> {code}
>  
>  If no --topic option is passed to the command, and there are no topics on 
> the cluster, the command should not fail, but rather produce empty output.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-15 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-8638:
--
Description: 
Currently, Kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignment in priority order, as long as the broker is 
in the ISR. The preferred leader is the broker id in the first position of the 
replica list. There are use cases in which, even though the first broker in the 
replica assignment is in the ISR, it needs to be moved to the end of the 
ordering (lowest priority) when deciding leadership during preferred leader 
election.

Let's use topic/partition replica (1,2,3) as an example. 1 is the preferred 
leader. When preferred leader election is run, it will pick 1 as the leader if 
1 is in the ISR; if 1 is not online and in the ISR, it picks 2; if 2 is not in 
the ISR, it picks 3 as the leader. There are use cases in which, even though 1 
is in the ISR, we would like it to be moved to the end of the ordering (lowest 
priority) when deciding leadership during preferred leader election. Below is a 
list of use cases:
 * If broker_id 1 is a swapped failed host that was brought up with only the 
last segments or the latest offset, without historical data (there is another 
effort on this), it's better for it not to serve leadership until it has caught 
up.

 * The cross-data center cluster has AWS instances which have less computing 
power than the on-prem bare metal machines. We could put the AWS broker_ids in 
the Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
without changing the reassignment ordering of the replicas.

 * If broker_id 1 is constantly losing leadership after some time 
("flapping"), we would want to exclude 1 from being a leader unless all other 
brokers of this topic/partition are offline. The "flapping" effect was seen in 
the past when 2 or more brokers were bad: when they lost leadership 
constantly/quickly, the sets of partition replicas they belong to saw 
leadership constantly changing. The ultimate solution is to swap these bad 
hosts. But for quick mitigation, we can also put the bad hosts in the Preferred 
Leader Blacklist to move the priority of their being elected as leaders to the 
lowest.

 * If the controller is busy serving an extra load of metadata requests and 
other tasks, we would like to move the controller's leaderships to other 
brokers to lower its CPU load. Currently, bouncing the broker to lose 
leadership would not work for the controller, because after the bounce the 
controller fails over to another broker.

 * Avoid bouncing a broker in order to lose its leadership: it would be good if 
we had a way to specify which broker should be excluded from serving 
traffic/leadership (without changing the replica assignment ordering via 
reassignments, even though that's quick) and then run preferred leader 
election. A bouncing broker will cause temporary URPs and sometimes other 
issues. Also, bouncing a broker (e.g. broker_id 1) makes it temporarily lose 
all its leadership, but if another broker (e.g. broker_id 2) fails or gets 
bounced, some of its leaderships will likely fail over to broker_id 1 on a 
replica set with 3 brokers. If broker_id 1 is in the blacklist, then in such a 
scenario, even with broker_id 2 offline, the 3rd broker can take leadership.

The current workaround for the above is to change the topic/partition's replica 
assignment to move broker_id 1 from the first position to the last position and 
run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1). This changes the 
replica assignment, and we need to keep track of the original one and restore 
it if things change (e.g. the controller fails over to another broker, or the 
swapped empty broker catches up). That's a rather tedious task.

KIP is located at 
[KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982]

  was:
Currently, the kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignments in a priority order when the broker is in 
ISR. The preferred leader is the broker id in the first position of replica. 
There are use-cases that, even the first broker in the replica assignment is in 
ISR, there is a need for it to be moved to the end of ordering (lowest 
priority) when deciding leadership during  preferred leader election. 

Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred 
leader.  When preferred leadership is run, it will pick 1 as the leader if it's 
ISR, if 1 is not online and in ISR, then pick 2, if 2 is not in ISR, then pick 
3 as the leader. There are use cases that, even 1 is in ISR, we would like it 
to be moved to the end of ordering (lowest priority) when deciding leadership 
during preferred leader election.   Below is a list of use cases:

* (If broker_id 1 is a swapped failed host and brought up with last segments or 
latest offset without historical data (There is another effort on this), it's 
better for it to not 

[jira] [Resolved] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic

2019-07-15 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8602.

Resolution: Fixed

> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -
>
> Key: KAFKA-8602
> URL: https://issues.apache.org/jira/browse/KAFKA-8602
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Critical
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or 
> assigned any partitions
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This 
> happens when a StreamThread gets assigned standby tasks for sub-topologies 
> that contain only state stores with logging disabled.
> To reproduce the bug, start two applications, each with one StreamThread and 
> one standby replica, and the following topology. The input topic should have 
> two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>         Serdes.Integer(),
>         Serdes.Integer())
>     .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
> 
>         @SuppressWarnings("unchecked")
>         @Override
>         public void init(final ProcessorContext context) {
>             state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>         }
> 
>         @Override
>         public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>             final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>             return result;
>         }
> 
>         @Override
>         public void close() {}
>     }, stateStoreName)
>     .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of 
> the sub-topology have logging disabled.  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885595#comment-16885595
 ] 

ASF GitHub Bot commented on KAFKA-8602:
---

bbejeck commented on pull request #7008: KAFKA-8602: Fix bug in stand-by task 
creation
URL: https://github.com/apache/kafka/pull/7008
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -
>
> Key: KAFKA-8602
> URL: https://issues.apache.org/jira/browse/KAFKA-8602
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Critical
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or 
> assigned any partitions
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This 
> happens when a StreamThread gets assigned standby tasks for sub-topologies 
> with just state stores with disabled logging.
> To reproduce the bug start two applications with one StreamThread and one 
> standby replica each and the following topology. The input topic should have 
> two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
> 
> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
> Serdes.Integer(),
> Serdes.Integer())
> .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
> .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
> private KeyValueStore<Integer, Integer> state;
> @SuppressWarnings("unchecked")
> @Override
> public void init(final ProcessorContext context) {
> state = (KeyValueStore<Integer, Integer>) 
> context.getStateStore(stateStoreName);
> }
> @Override
> public KeyValue<Integer, Integer> transform(final Integer key, 
> final Integer value) {
> final KeyValue<Integer, Integer> result = new KeyValue<>(key, 
> value);
> return result;
> }
> @Override
> public void close() {}
> }, stateStoreName)
> .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of 
> the sub-topology have logging disabled.  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-15 Thread James Ritt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885498#comment-16885498
 ] 

James Ritt commented on KAFKA-4212:
---

Thanks [~ableegoldman] & [~mjsax], you both make good points. Unfortunately the 
`Map` argument to `setConfig()` appears to be populated from 
`org.apache.kafka.streams.processor.ProcessorContext#appConfigs`, meaning it's 
effectively just pulling in the current `StreamsConfig`.

In order to keep the configs consolidated as per Sophie's request, what do we 
think about defining a new method on the `RocksDBConfigSetter` interface?

We could possibly create it with a default (for backwards compatibility): 
something like `default Integer getTtl(final String storeName, final Options 
options) \{return null;}`, with the idea that we'd check this method's result 
to determine which RocksDB constructor to call.
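
To make the proposal concrete, here is a minimal sketch of what such an 
interface addition might look like (hypothetical, not the existing Kafka 
Streams API; the method name and the seconds unit are assumptions, while the 
`setConfig` signature mirrors the current interface):
{code:java}
import java.util.Map;
import org.rocksdb.Options;

public interface RocksDBConfigSetter {

    // Existing hook, unchanged.
    void setConfig(final String storeName, final Options options, final Map<String, Object> configs);

    // Hypothetical addition: return null (the default) to keep today's behaviour, or a
    // TTL in seconds to have the store opened via RocksDB's TTL-enabled constructor.
    default Integer getTtl(final String storeName, final Options options) {
        return null;
    }
}
{code}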

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs needs to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers

2019-07-15 Thread Brian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885489#comment-16885489
 ] 

Brian commented on KAFKA-7931:
--

Would love to see a patch. How do you know you've solved the issue?

Are you going to try and get this merged back into 
[https://github.com/helm/charts/tree/master/incubator/kafka]?

 

> Java Client: if all ephemeral brokers fail, client can never reconnect to 
> brokers
> -
>
> Key: KAFKA-7931
> URL: https://issues.apache.org/jira/browse/KAFKA-7931
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Brian
>Priority: Critical
>
> Steps to reproduce:
>  * Setup kafka cluster in GKE, with bootstrap server address configured to 
> point to a load balancer that exposes all GKE nodes
>  * Run producer that emits values into a partition with 3 replicas
>  * Kill every broker in the cluster
>  * Wait for brokers to restart
> Observed result:
> The java client cannot find any of the nodes even though they have all 
> recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) 
> could not be established. Broker may not be available.".
> Note, this is *not* a duplicate of 
> https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client 
> version that contains the fix for 
> https://issues.apache.org/jira/browse/KAFKA-7890.
> Versions:
> Kafka: kafka version 2.1.0, using confluentinc/cp-kafka/5.1.0 docker image
> Client: trunk from a few days ago (git sha 
> 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster

2019-07-15 Thread Tirtha Chatterjee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tirtha Chatterjee updated KAFKA-8670:
-
Summary: kafka-topics.sh shows IllegalArgumentException when describing all 
topics if no topics exist on the cluster  (was: kafka-topics.sh shows 
IllegalArgumentException when describing all topics if no topics exist)

> kafka-topics.sh shows IllegalArgumentException when describing all topics if 
> no topics exist on the cluster
> ---
>
> Key: KAFKA-8670
> URL: https://issues.apache.org/jira/browse/KAFKA-8670
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Tirtha Chatterjee
>Assignee: Tirtha Chatterjee
>Priority: Major
>
> When trying to describe all topics using the kafka-topics.sh utility, a user would run 
> kafka-topics.sh --describe without passing a --topic option. If there are no 
> topics on the cluster, Kafka returns an error with IllegalArgumentException.
> {code:java}
> ./kafka-topics.sh --zookeeper 
> 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
> Error while executing topic command : Topics in [] does not exist
> [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
> [] does not exist
> at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
> at 
> kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> (kafka.admin.TopicCommand$)
> {code}
>  
>  If no --topic option is passed to the command, and there are no topics on 
> the cluster, the command should not fail but rather produce empty output.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist

2019-07-15 Thread Tirtha Chatterjee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tirtha Chatterjee updated KAFKA-8670:
-
Description: 
When trying to describe all topics using the kafka-topics.sh utility, a user would run 
kafka-topics.sh --describe without passing a --topic option. If there are no 
topics on the cluster, Kafka returns an error with IllegalArgumentException.
{code:java}
./kafka-topics.sh --zookeeper 
172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
Error while executing topic command : Topics in [] does not exist
[2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
[] does not exist
at 
kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
at 
kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
{code}
 
 If no --topic option is passed to the command, and there are no topics on the 
cluster, the command should not fail but rather produce empty output.

  was:
When trying to describe all topics using the kafka-topics.sh utility, a user would run 
kafka-topics.sh --describe without passing a --topic option. If there are no 
topics on the cluster, Kafka returns an error with IllegalArgumentException.
{code:java}
./kafka-topics.sh --zookeeper 
172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
Error while executing topic command : Topics in [] does not exist
[2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
[] does not exist
at 
kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
at 
kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
{code}
 
If no --topic option is passed to the command, it should not fail but rather 
produce empty output.


> kafka-topics.sh shows IllegalArgumentException when describing all topics if 
> no topics exist
> 
>
> Key: KAFKA-8670
> URL: https://issues.apache.org/jira/browse/KAFKA-8670
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Tirtha Chatterjee
>Assignee: Tirtha Chatterjee
>Priority: Major
>
> When trying to describe all topics using the kafka-topics.sh utility, a user would run 
> kafka-topics.sh --describe without passing a --topic option. If there are no 
> topics on the cluster, Kafka returns an error with IllegalArgumentException.
> {code:java}
> ./kafka-topics.sh --zookeeper 
> 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
> Error while executing topic command : Topics in [] does not exist
> [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
> [] does not exist
> at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
> at 
> kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> (kafka.admin.TopicCommand$)
> {code}
>  
>  If no --topic option is passed to the command, and there are no topics on 
> the cluster, the command should not fail but rather produce empty output.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist

2019-07-15 Thread Tirtha Chatterjee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885476#comment-16885476
 ] 

Tirtha Chatterjee commented on KAFKA-8670:
--

This bug was introduced as part of this commit

[https://github.com/apache/kafka/commit/55334453a561646b303e7de961e5990345effc15]

 

I have written a fix for this issue. Will be publishing it soon.

> kafka-topics.sh shows IllegalArgumentException when describing all topics if 
> no topics exist
> 
>
> Key: KAFKA-8670
> URL: https://issues.apache.org/jira/browse/KAFKA-8670
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Tirtha Chatterjee
>Assignee: Tirtha Chatterjee
>Priority: Major
>
> When trying to describe all topics using the kafka-topics.sh utility, a user would run 
> kafka-topics.sh --describe without passing a --topic option. If there are no 
> topics on the cluster, Kafka returns an error with IllegalArgumentException.
> {code:java}
> ./kafka-topics.sh --zookeeper 
> 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
> Error while executing topic command : Topics in [] does not exist
> [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
> [] does not exist
> at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
> at 
> kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> (kafka.admin.TopicCommand$)
> {code}
>  
> If no --topic option is passed to the command, it should not fail but rather 
> produce empty output.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist

2019-07-15 Thread Tirtha Chatterjee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885474#comment-16885474
 ] 

Tirtha Chatterjee commented on KAFKA-8670:
--

This issue is distinct from https://issues.apache.org/jira/browse/KAFKA-8053 
which is about Kafka's error message being confusing when --describe is called 
with a --topic argument, but there are no topics on the cluster.

The distinction here is that this ticket addresses the case where no --topic is 
passed, while the other focuses on a better error message when --topic is 
indeed passed, but there are no topics on the cluster.

> kafka-topics.sh shows IllegalArgumentException when describing all topics if 
> no topics exist
> 
>
> Key: KAFKA-8670
> URL: https://issues.apache.org/jira/browse/KAFKA-8670
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Tirtha Chatterjee
>Assignee: Tirtha Chatterjee
>Priority: Major
>
> When trying to describe all topics using the kafka-topics.sh utility, a user would run 
> kafka-topics.sh --describe without passing a --topic option. If there are no 
> topics on the cluster, Kafka returns an error with IllegalArgumentException.
> {code:java}
> ./kafka-topics.sh --zookeeper 
> 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
> Error while executing topic command : Topics in [] does not exist
> [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
> [] does not exist
> at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
> at 
> kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> (kafka.admin.TopicCommand$)
> {code}
>  
> If no --topic option is passed to the command, it should not fail but rather 
> produce empty output.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist

2019-07-15 Thread Tirtha Chatterjee (JIRA)
Tirtha Chatterjee created KAFKA-8670:


 Summary: kafka-topics.sh shows IllegalArgumentException when 
describing all topics if no topics exist
 Key: KAFKA-8670
 URL: https://issues.apache.org/jira/browse/KAFKA-8670
 Project: Kafka
  Issue Type: Bug
  Components: admin, tools
Affects Versions: 2.2.1, 2.3.0, 2.2.0
Reporter: Tirtha Chatterjee
Assignee: Tirtha Chatterjee


When trying to describe all topics using the kafka-topics.sh utility, a user would run 
kafka-topics.sh --describe without passing a --topic option. If there are no 
topics on the cluster, Kafka returns an error with IllegalArgumentException.
{code:java}
./kafka-topics.sh --zookeeper 
172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe
Error while executing topic command : Topics in [] does not exist
[2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in 
[] does not exist
at 
kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
at 
kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
{code}
 
If no --topic option is passed to the command, it should not fail but rather 
produce empty output.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8669) Add java security providers in Kafka Security config

2019-07-15 Thread Sai Sandeep (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sai Sandeep updated KAFKA-8669:
---
Description: 
Currently kafka supports ssl.keymanager.algorithm and 
ssl.trustmanager.algorithm parameters as part of secure config. These 
parameters can be configured to load the key manager and trust managers which 
provide keys and certificates for ssl handshakes with the clients/server. The 
algorithms configured by parameters need to be registered by Java security 
provider classes. These provider classes are configured as JVM properties 
through java.security file. An example file given below
{code:java}
$ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security

...

security.provider.1=sun.security.provider.Sun

security.provider.2=sun.security.rsa.SunRsaSign

security.provider.3=sun.security.ec.SunEC

…
{code}
Custom key manager and trust manager algorithms can be used to supply the Kafka 
brokers with keys and certificates; these algorithms can replace the 
traditional, non-scalable static keystore and truststore JKS files.

To take advantage of these custom algorithms, we want to support a Java 
security provider parameter in the security config. This parameter can be used 
by Kafka brokers or Kafka clients (when connecting to the Kafka brokers). The 
security providers can also be used for configuring security in SASL-based 
communication.
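
For illustration, a minimal sketch of what registering such a provider amounts 
to at the JVM level (the provider and algorithm names below are hypothetical; 
only java.security.Security.addProvider is standard API):
{code:java}
import java.security.Provider;
import java.security.Security;

public class CustomProviderRegistration {

    // Hypothetical provider supplying a custom KeyManagerFactory algorithm.
    static class ExampleProvider extends Provider {
        ExampleProvider() {
            super("ExampleProvider", 1.0, "Example provider for a custom key manager algorithm");
            // The SPI class name is an assumption for illustration only.
            put("KeyManagerFactory.ExampleAlgorithm", "com.example.security.ExampleKeyManagerFactorySpi");
        }
    }

    public static void main(final String[] args) {
        // What a "security providers" style config would effectively do for the broker or
        // client JVM, instead of editing the java.security file.
        Security.addProvider(new ExampleProvider());
    }
}
{code}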

 

  was:
Currently kafka supports ssl.keymanager.algorithm and 
ssl.trustmanager.algorithm parameters as part of secure config. These 
parameters can be configured to load the key manager and trust managers which 
provide keys and certificates for ssl handshakes with the clients/server. The 
algorithms configured by parameters need to be registered by Java security 
provider classes. These provider classes are configured as JVM properties 
through java.security file. An example file given below

``` 

$ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security

...

security.provider.1=sun.security.provider.Sun

security.provider.2=sun.security.rsa.SunRsaSign

security.provider.3=sun.security.ec.SunEC

…

``` 

Custom keymanager and trustmanager algorithms can be used to supply the kafka 
brokers with keys and certificates, these algorithms can be used to replace the 
traditional, non-scalable static keystore and truststore jks files.

To take advantage of these custom algorithms, we want to support java security 
provider parameter in security config. This param can be used by kafka brokers 
or kafka clients(when connecting to the kafka brokers). The security providers 
can also be used for configuring security in SASL based communication too.

 


> Add java security providers in Kafka Security config
> 
>
> Key: KAFKA-8669
> URL: https://issues.apache.org/jira/browse/KAFKA-8669
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sai Sandeep
>Priority: Minor
>
> Currently Kafka supports the ssl.keymanager.algorithm and 
> ssl.trustmanager.algorithm parameters as part of its security config. These 
> parameters can be configured to load the key managers and trust managers which 
> provide keys and certificates for SSL handshakes with the clients/server. The 
> algorithms configured by these parameters need to be registered by Java 
> security provider classes. These provider classes are configured as JVM 
> properties through the java.security file. An example file is given below:
> {code:java}
> $ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security
> ...
> security.provider.1=sun.security.provider.Sun
> security.provider.2=sun.security.rsa.SunRsaSign
> security.provider.3=sun.security.ec.SunEC
> …
> {code}
> Custom key manager and trust manager algorithms can be used to supply the Kafka 
> brokers with keys and certificates; these algorithms can replace the 
> traditional, non-scalable static keystore and truststore JKS files.
> To take advantage of these custom algorithms, we want to support a Java 
> security provider parameter in the security config. This parameter can be used 
> by Kafka brokers or Kafka clients (when connecting to the Kafka brokers). The 
> security providers can also be used for configuring security in SASL-based 
> communication.
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8669) Add java security providers in Kafka Security config

2019-07-15 Thread Sai Sandeep (JIRA)
Sai Sandeep created KAFKA-8669:
--

 Summary: Add java security providers in Kafka Security config
 Key: KAFKA-8669
 URL: https://issues.apache.org/jira/browse/KAFKA-8669
 Project: Kafka
  Issue Type: Improvement
Reporter: Sai Sandeep


Currently kafka supports ssl.keymanager.algorithm and 
ssl.trustmanager.algorithm parameters as part of secure config. These 
parameters can be configured to load the key manager and trust managers which 
provide keys and certificates for ssl handshakes with the clients/server. The 
algorithms configured by parameters need to be registered by Java security 
provider classes. These provider classes are configured as JVM properties 
through java.security file. An example file given below

``` 

$ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security

...

security.provider.1=sun.security.provider.Sun

security.provider.2=sun.security.rsa.SunRsaSign

security.provider.3=sun.security.ec.SunEC

…

``` 

Custom keymanager and trustmanager algorithms can be used to supply the kafka 
brokers with keys and certificates, these algorithms can be used to replace the 
traditional, non-scalable static keystore and truststore jks files.

To take advantage of these custom algorithms, we want to support java security 
provider parameter in security config. This param can be used by kafka brokers 
or kafka clients(when connecting to the kafka brokers). The security providers 
can also be used for configuring security in SASL based communication too.

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8613) Set default grace period to 0

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885438#comment-16885438
 ] 

Matthias J. Sax commented on KAFKA-8613:


[~LillianY] – thanks for your interest. However, this ticket can only be 
addressed in a major release, as we need to maintain backward compatibility in 
minor releases. There are no plans atm to do a major release. Hence, this 
ticket is blocked until the community decides to do a 3.0 release.
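
In the meantime, a minimal sketch of setting the grace period explicitly (this 
is standard windowed-aggregation API usage, assuming a time-windowed store; it 
avoids relying on the retention-based default):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// Explicit zero grace period instead of the implicit retention-based default.
final TimeWindows windows = TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO);
{code}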

> Set default grace period to 0
> -
>
> Key: KAFKA-8613
> URL: https://issues.apache.org/jira/browse/KAFKA-8613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Bruno Cadonna
>Priority: Blocker
>
> Currently, the grace period is set to retention time if the grace period is 
> not specified explicitly. The reason for setting the default grace period to 
> retention time was backward compatibility. Topologies that were implemented 
> before the introduction of the grace period, added late arriving records to a 
> window as long as the window existed, i.e., as long as its retention time was 
> not elapsed.  
> This unintuitive default grace period has already caused confusion among 
> users.
> For the next major release, we should set the default grace period to 
> {{Duration.ZERO}}.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8668) Improve broker shutdown time

2019-07-15 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-8668:


 Summary: Improve broker shutdown time
 Key: KAFKA-8668
 URL: https://issues.apache.org/jira/browse/KAFKA-8668
 Project: Kafka
  Issue Type: Improvement
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


During LogManager shutdown, we need to call {{timeIndex.maybeAppend()}} when 
closing each log segment. Since in 2.0 we completely skip the sanity check and 
do lazy mmap on indexes of segments below the recovery point, 
{{timeIndex.maybeAppend()}} may need to mmap the time index file of an 
inactive segment that was never mmapped before, which is not necessary because 
the time index hasn't changed at all.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8667) Improve leadership transition time

2019-07-15 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-8667:


 Summary: Improve leadership transition time
 Key: KAFKA-8667
 URL: https://issues.apache.org/jira/browse/KAFKA-8667
 Project: Kafka
  Issue Type: Improvement
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


When the replica fetcher thread processes a fetch response, it holds the 
{{partitionMapLock}}. If a LeaderAndIsr request comes in at the same time, it 
will be blocked at the end of its processing when calling 
{{shutdownIdleFetcherThread}}, because it needs to acquire the 
{{partitionMapLock}} of each replica fetcher thread to check whether any 
partition is assigned to that fetcher, and the request handler thread performs 
this check sequentially across the fetcher threads.

For example, in a cluster with 20 brokers and num.replica.fetcher.thread set to 
32, if each fetcher thread holds the lock a little bit longer, the total time 
for the request handler thread to finish shutdownIdleFetcherThread can be a lot 
larger because it waits longer for the partitionMapLock of each fetcher thread. 
If the LeaderAndIsr request gets blocked for more than request.timeout.ms 
(default 30s) in the broker, the request send thread on the controller side 
will time out while waiting for the response, establish a new connection to the 
broker, and re-send the request, which breaks in-order delivery because we then 
have more than one channel talking to the broker. Moreover, this may make the 
lock contention problem worse or saturate the request handler threads because 
duplicate control requests are sent to the broker multiple times. In our own 
testing, we saw up to *8 duplicate LeaderAndIsrRequest* being sent to the 
broker during a bounce, and the 99th percentile LeaderAndIsr local time went up 
to ~500s.
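
A schematic sketch of the contention pattern (illustration only, not the actual 
Kafka code): the request handler thread visits every fetcher thread's lock 
sequentially, so its total wait is the sum of the per-fetcher lock hold times 
rather than the maximum.
{code:java}
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FetcherState {
    final ReentrantLock partitionMapLock = new ReentrantLock();
    volatile int assignedPartitions = 0;
}

class ShutdownIdleFetchers {
    // Called from the request handler thread while it processes LeaderAndIsr.
    static int countIdleFetchers(final List<FetcherState> fetchers) {
        int idle = 0;
        for (final FetcherState fetcher : fetchers) {
            fetcher.partitionMapLock.lock();   // may block behind a long fetch-response hold
            try {
                if (fetcher.assignedPartitions == 0) {
                    idle++;
                }
            } finally {
                fetcher.partitionMapLock.unlock();
            }
        }
        return idle;
    }
}
{code}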



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885435#comment-16885435
 ] 

Matthias J. Sax commented on KAFKA-8650:


It's a first time report / request. Hence, it does not seem to be a problem for 
many users atm. Hard to judge. – Also, there is a workaround (even if not easy 
to apply).

> Streams does not work as expected with auto.offset.reset=none
> -
>
> Key: KAFKA-8650
> URL: https://issues.apache.org/jira/browse/KAFKA-8650
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Raman Gupta
>Priority: Major
>  Labels: needs-kip
>
> The auto.offset.reset policy of none is useful as a safety measure, 
> especially when 
> * exactly-once processing is desired, or
> * at-least-once is desired, but it is expensive to reprocess from the 
> beginning.
> In this case, using "none" forces the ops team to explicitly set the offset 
> before the stream can re-start processing, in the (hopefully rare) situations 
> in which the stream consumer offset has been lost for some reason, or in the 
> case of a new stream that should not start processing from the beginning or 
> the end, but somewhere in the middle (this scenario might occur during topic 
> migrations).
> Kafka streams really only supports auto.offset.reset of earliest or latest 
> (see the `Topology.AutoOffsetReset` enum). It is also possible to use the 
> auto.offset.reset configuration value, but this works suboptimally because if 
> the streams application reset tool is used (even with a specific offset 
> specified), the offset is set for the input topic, but it is not, and cannot 
> be, set for the internal topics, which won't exist yet.
> The internal topics are created by Kafka streams at startup time, but because 
> the auto.offset.reset policy of "none" is passed to the consumer of those 
> internal topics, the Kafka stream fails to start with a 
> "NoOffsetForPartitionException".
> Proposals / options:
> 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it 
> affects the input topics, but not the internal topics.
> 2) Allow streams to be configured with auto.offset.reset=none, but explicitly 
> set the offset to 0 for newly created internal topics.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8666) Improve Documentation on usage of Materialized config object

2019-07-15 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-8666:
--

 Summary: Improve Documentation on usage of Materialized config 
object
 Key: KAFKA-8666
 URL: https://issues.apache.org/jira/browse/KAFKA-8666
 Project: Kafka
  Issue Type: Improvement
  Components: documentation, streams
Reporter: Bill Bejeck


When using the Materialized object, if the user wants to name the state store with
{code:java}
Materialized.as("MyStoreName"){code}
and then subsequently provide the key and value serdes, the calls to do so must 
take the form of 
{code:java}
Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde)
{code}
If users do the following 
{code:java}
Materialized.as("MyStoreName").with(keySerde, valueSerde)
{code}
the Materialized instance created by the "as(storeName)" call is replaced by a 
new Materialized instance resulting from the "with(...)" call and any 
configuration on the first Materialized instance is lost.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8665) WorkerSourceTask race condition when rebalance occurs before task has started

2019-07-15 Thread Chris Cranford (JIRA)
Chris Cranford created KAFKA-8665:
-

 Summary: WorkerSourceTask race condition when rebalance occurs 
before task has started
 Key: KAFKA-8665
 URL: https://issues.apache.org/jira/browse/KAFKA-8665
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.2.0
Reporter: Chris Cranford


In our project we have several {{SourceTask}} implementations that perform a 
set of sequential steps in the {{SourceTask#start}} call.  It's possible that 
during these sequential operations, a rebalance occurs which leads to the 
situation where the {{WorkerSourceTask}} transitions to the state where 
{{startedShutdownBeforeStartCompleted=true}} and then the runtime starts a 
brand new task.

This is mostly a problem around specific named resources that are registered 
when a call to {{SourceTask#start}} happens and those same resources are 
unregistered later when the call to {{SourceTask#stop}} occurs. 

For us specifically, this is a problem with JMX resource 
registration/unregistration.  We register those beans at the end of the call to 
{{SourceTask#start}} and unregister in the call to {{stop}}.  Due to the order 
of start/stop pairs combined with where a rebalance is triggered, this leads to 
 # Register JMX beans when SourceTask A1 is started.
 # Register JMX beans when SourceTask A2 is started with rebalance.
 ## JMX beans failed to register as they're already registered.
 # SourceTask A1 finally stops, triggers unregistering JMX beans

 

In our use case we're experiencing a problem with the 
registration/unregistration of JMX resources, due to how a rebalance can be 
triggered while the task hasn't yet fully started and therefore never gets 
properly stopped.
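
A minimal sketch of one possible connector-side workaround (illustration only, 
not part of the Connect framework): make the JMX registration idempotent so a 
racing start/stop pair cannot collide on the bean name.
{code:java}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class SafeJmxRegistrar {

    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();

    public synchronized void register(final ObjectName name, final Object bean) throws Exception {
        // Replace a leftover bean from a task instance whose start()/stop() pair raced.
        if (server.isRegistered(name)) {
            server.unregisterMBean(name);
        }
        server.registerMBean(bean, name);
    }

    public synchronized void unregister(final ObjectName name) throws Exception {
        // Ignore a bean that was already unregistered by the other task instance.
        if (server.isRegistered(name)) {
            server.unregisterMBean(name);
        }
    }
}
{code}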



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation

2019-07-15 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885391#comment-16885391
 ] 

Bill Bejeck commented on KAFKA-8646:


Thanks again [~jmhostalet] for reporting this.


After looking into this, I've been able to determine that Kafka Streams does 
the right thing and does not create changelog topics when using 
Materialized.withLoggingDisabled().

The issue seems to be how you are passing your key and value serdes to the 
Materialized object. This issue is a subtle one, so please bear with a 
somewhat lengthy explanation.

The call "Materialized.as(aggregation-updater)" returns a _*new Materialized 
instance*_, using the provided name for the state store (which in turn would be 
the base name for the changelog topic if needed, but I'll get to that in a 
minute). Now the chained call "withLoggingDisabled()" sets the "loggingEnabled" 
flag to false and returns _*the same Materialized instance*_ created before by 
the call "as(store-name)".

But the method call "Materialized.with(keySerde, valueSerde)" is a static 
method on Materialized and returns a _*brand new Materialized instance,*_ and 
this new Materialized instance is the one passed into Kafka Streams. Since the 
"loggingEnabled" flag defaults to true, Kafka Streams is creating the changelog 
topic.

Here's some example code I ran to prove my theory.

First, I'll change from using the static "with(keySerde, valueSerde)" method to 
use "withKeySerde" followed by the "withValueSerde" method.

 
{code:java}
KStream<String, Integer> intKstream = builder.stream("input");
Aggregator<String, Integer, Integer> aggregator = (k, v, va) -> v + va;

intKstream.groupByKey().aggregate(() -> 0, aggregator,
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregation-updater")
        .withLoggingDisabled()
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Integer()))
    .toStream().to("output");
{code}
 

And here are the results of running "kafka-topics --list" after starting the 
streams application:

{noformat}
./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
input
output{noformat}
 

Now I'll change the streams application to use the static "with(keySerde, 
valueSerde)" method.

 
{code:java}
intKstream.groupByKey().aggregate(() -> 0, aggregator,
 Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregation-updater")
 .withLoggingDisabled().with(Serdes.String(), 
Serdes.Integer())).toStream().to("output");
 
{code}
Now this time we'll see Kafka Streams has created the changelog topic:

 
{noformat}
./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
create-unnecessary-changelog-internal-KSTREAM-AGGREGATE-STATE-STORE-01-changelog
input
output{noformat}
 

Also, you'll notice the name of the changelog topic is a generated one, although 
it should be "aggregation-updater-changelog"; since the new Materialized 
instance is used, the store name is lost as well.

So if you could first delete any unwanted changelog topics, update your 
code to use

{code:java}
Materialized.as("aggregation-updater")
    .withLoggingDisabled()
    .withKeySerde(Serdes.String())
    .withValueSerde(new MyDtoSerde()){code}

restart your application, and confirm that Kafka Streams is not creating 
changelog topics, then we can close this ticket.

But we should update our documentation to clearly state the correct method 
calls to use when naming a state store.

Thanks again,

Bill

> Materialized.withLoggingDisabled() does not disable changelog topics creation
> -
>
> Key: KAFKA-8646
> URL: https://issues.apache.org/jira/browse/KAFKA-8646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: jmhostalet
>Assignee: Bill Bejeck
>Priority: Minor
>
> I have a cluster with 3 brokers running version 0.11
> My kafka-streams app was using kafka-client 0.11.0.1 but recently I've 
> migrated to 2.3.0
> I have no executed any migration as my data is disposable, therefore I have 
> deleted all intermediate topics, except input and output topics.
> My streams config is: 
> {code:java}
> application.id = consumer-id-v1.00
> application.server =
> bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 524288000
> client.id =
> commit.interval.ms = 3
> connections.max.idle.ms = 54
> default.deserialization.exception.handler = class 
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class 
> org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class 
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class com.acme.stream.TimeExtractor
> default.value.serde = class com.acme.serde.MyDtoSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 30
> metric.reporters 

[jira] [Commented] (KAFKA-8604) kafka log dir was marked as offline because of deleting segments of __consumer_offsets failed

2019-07-15 Thread Alex Bertzeletos (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885316#comment-16885316
 ] 

Alex Bertzeletos commented on KAFKA-8604:
-

[~ymxz] So, in your case, setting the log.retention.check.interval.ms=3000 
helped as a workaround?

> kafka log dir was marked as offline because of deleting segments of 
> __consumer_offsets failed
> -
>
> Key: KAFKA-8604
> URL: https://issues.apache.org/jira/browse/KAFKA-8604
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 1.0.1
>Reporter: songyingshuan
>Priority: Major
> Attachments: error-logs.log
>
>
> We encountered a problem in our production env without any warning. When the 
> kafka broker tries to clean __consumer_offsets-38 (and it only happens to this 
> partition), the log shows
> that it failed, and the whole disk/log dir is marked offline, which has a 
> negative impact on some normal partitions (because the ISR lists of those 
> partitions shrink).
> We had to restart the broker server to reuse the disk/dir which was marked as 
> offline. BUT!! this problem occurs periodically for the same reason, so we 
> have to restart the broker periodically.
> We read some source code of kafka-1.0.1, but cannot figure out why this 
> happens. The cluster status had been good until this problem suddenly 
> hit us.
> The error log is something like this:
>  
> {code:java}
> 2019-06-25 00:11:26,241 INFO kafka.log.TimeIndex: Deleting index 
> /data6/kafka/data/__consumer_offsets-38/012855596978.timeindex.deleted
> 2019-06-25 00:11:26,258 ERROR kafka.server.LogDirFailureChannel: Error while 
> deleting segments for __consumer_offsets-38 in dir /data6/kafka/data
> java.io.IOException: Delete of log .log.deleted failed.
> at kafka.log.LogSegment.delete(LogSegment.scala:496)
> at 
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
> at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
> at 
> kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
> at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2019-06-25 00:11:26,265 ERROR kafka.utils.KafkaScheduler: Uncaught exception 
> in scheduled task 'delete-file'
> org.apache.kafka.common.errors.KafkaStorageException: Error while deleting 
> segments for __consumer_offsets-38 in dir /data6/kafka/data
> Caused by: java.io.IOException: Delete of log 
> .log.deleted failed.
> at kafka.log.LogSegment.delete(LogSegment.scala:496)
> at 
> kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
> at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
> at 
> kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
> at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2019-06-25 00:11:26,268 INFO kafka.server.ReplicaManager: [ReplicaManager 
> 

[jira] [Assigned] (KAFKA-8453) AdminClient describeTopic should handle partition level errors

2019-07-15 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8453:
--

Assignee: Lee Dongjin

> AdminClient describeTopic should handle partition level errors
> --
>
> Key: KAFKA-8453
> URL: https://issues.apache.org/jira/browse/KAFKA-8453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Lee Dongjin
>Priority: Major
>
> The Metadata response may contain the following partition-level error codes: 
> LEADER_NOT_AVAILABLE, REPLICA_NOT_AVAILABLE, and LISTENER_NOT_FOUND. The 
> AdminClient at the moment does not appear to be checking these error codes. 
> This seems mostly harmless in the case of LEADER_NOT_AVAILABLE and 
> REPLICA_NOT_AVAILABLE, but potentially we should raise an error in the case 
> of LISTENER_NOT_FOUND since the result would otherwise be misleading.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8453) AdminClient describeTopic should handle partition level errors

2019-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885289#comment-16885289
 ] 

ASF GitHub Bot commented on KAFKA-8453:
---

dongjinleekr commented on pull request #7089: KAFKA-8453: AdminClient 
describeTopic should handle partition level errors
URL: https://github.com/apache/kafka/pull/7089
 
 
   Here is the draft fix. The approach is simple:
   
   1. Add a new method, `MetadataResponse#partitionErrors`, which returns a map 
from topic to the set of its partition errors. (It follows how 
`MetadataResponse#topicMetadata` does.)
   2. `Call#handleResponse` in `KafkaAdminClient#describeTopics` now uses both 
the topic-level and partition-level error maps; if there is a 
`ListenerNotFound` error in the partition-level error map, it calls 
`KafkaFuture#completeExceptionally`.
   
   However, since it now throws a new exception, it breaks the following tests, 
which are related to leader election.
   
   - 
`kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable`
   - `kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersAndNoop`
   - 
`kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersForManyPartitions`
   - 
`kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersForAllPartitions`
   - 
`kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersForOnePartition`
   - 
`kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersWhenNoLiveBrokers`
   - 
`kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersAndNoop`
   - 
`kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersForManyPartitions`
   - 
`kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersForAllPartitions`
   - 
`kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersForOnePartition`
   - 
`kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersWhenNoLiveBrokers`
   - 
`kafka.admin.TopicCommandWithAdminClientTest.testDescribeUnavailablePartitions`
   - 
`kafka.admin.TopicCommandWithAdminClientTest.testDescribeUnderMinIsrPartitionsMixed`
   
   Which approach would be appropriate to fix the problem above? Do you have 
something in mind?
   
   cc/ @hachikuji
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AdminClient describeTopic should handle partition level errors
> --
>
> Key: KAFKA-8453
> URL: https://issues.apache.org/jira/browse/KAFKA-8453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> The Metadata response may contain the following partition-level error codes: 
> LEADER_NOT_AVAILABLE, REPLICA_NOT_AVAILABLE, and LISTENER_NOT_FOUND. The 
> AdminClient at the moment does not appear to be checking these error codes. 
> This seems mostly harmless in the case of LEADER_NOT_AVAILABLE and 
> REPLICA_NOT_AVAILABLE, but potentially we should raise an error in the case 
> of LISTENER_NOT_FOUND since the result would otherwise be misleading.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8024) UtilsTest.testFormatBytes fails with german locale

2019-07-15 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885284#comment-16885284
 ] 

Patrik Kleindl commented on KAFKA-8024:
---

[~sujayopensource] You need to ask for permissions on the mailing list.

I have assigned this to myself now as a PR is already available.

> UtilsTest.testFormatBytes fails with german locale
> --
>
> Key: KAFKA-8024
> URL: https://issues.apache.org/jira/browse/KAFKA-8024
> Project: Kafka
>  Issue Type: Bug
>Reporter: Patrik Kleindl
>Assignee: Patrik Kleindl
>Priority: Trivial
>
> The unit test fails when the default locale is not English (in my case, deAT)
> assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));
>  
> org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED
>     org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB>
>         at org.junit.Assert.assertEquals(Assert.java:115)
>         at org.junit.Assert.assertEquals(Assert.java:144)
>         at 
> org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106)
>  
> The easiest fix in this case should be adding
> {code:java}
> jvmArgs '-Duser.language=en -Duser.country=US'{code}
> to the test configuration 
> [https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/build.gradle#L270]
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (KAFKA-8024) UtilsTest.testFormatBytes fails with german locale

2019-07-15 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl reassigned KAFKA-8024:
-

Assignee: Patrik Kleindl

> UtilsTest.testFormatBytes fails with german locale
> --
>
> Key: KAFKA-8024
> URL: https://issues.apache.org/jira/browse/KAFKA-8024
> Project: Kafka
>  Issue Type: Bug
>Reporter: Patrik Kleindl
>Assignee: Patrik Kleindl
>Priority: Trivial
>
> The unit test fails when the default locale is not English (in my case, deAT)
> assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));
>  
> org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED
>     org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB>
>         at org.junit.Assert.assertEquals(Assert.java:115)
>         at org.junit.Assert.assertEquals(Assert.java:144)
>         at 
> org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106)
>  
> The easiest fix in this case should be adding
> {code:java}
> jvmArgs '-Duser.language=en -Duser.country=US'{code}
> to the test configuration 
> [https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/build.gradle#L270]
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers

2019-07-15 Thread Sam Weston (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885113#comment-16885113
 ] 

Sam Weston edited comment on KAFKA-7931 at 7/15/19 11:40 AM:
-

Good news! I've got to the bottom of it!

The fix is to use a DNS name as the advertised listener instead of the Pod IP 
address (in my case the Kubernetes headless service). Now I can restart 
containers as quickly as I like and my Java apps don't get upset.

e.g. 
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://pulseplatform-dev-kafka-0.pulseplatform-dev-kafka-headless.pulseplatform-dev:9092
 where the headless service is called pulseplatform-dev-kafka-headless, my 
namespace is pulseplatform-dev and the pod is called pulseplatform-dev-kafka-0

If you're using the incubator helm chart let me know and I'll provide more 
details of my values file.


was (Author: cablespaghetti):
Good news! I've got to the bottom of it!

The fix is to use a DNS name as the advertised listener instead of the Pod IP 
address (in my case the Kubernetes headless service). Now I can restart 
containers as quickly as I like and my Java apps don't get upset.

e.g. 
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://pulseplatform-dev-kafka-0.pulseplatform-dev-kafka-headless.pulseplatform-dev:9092
 where the headless service is called pulseplatform-dev-kafka-headless, my 
namespace is pulseplatform-dev and the pod is called pulseplatform-dev-kafka-0

> Java Client: if all ephemeral brokers fail, client can never reconnect to 
> brokers
> -
>
> Key: KAFKA-7931
> URL: https://issues.apache.org/jira/browse/KAFKA-7931
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Brian
>Priority: Critical
>
> Steps to reproduce:
>  * Setup kafka cluster in GKE, with bootstrap server address configured to 
> point to a load balancer that exposes all GKE nodes
>  * Run producer that emits values into a partition with 3 replicas
>  * Kill every broker in the cluster
>  * Wait for brokers to restart
> Observed result:
> The java client cannot find any of the nodes even though they have all 
> recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) 
> could not be established. Broker may not be available.".
> Note, this is *not* a duplicate of 
> https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client 
> version that contains the fix for 
> https://issues.apache.org/jira/browse/KAFKA-7890.
> Versions:
> Kafka: kafka version 2.1.0, using confluentinc/cp-kafka/5.1.0 docker image
> Client: trunk from a few days ago (git sha 
> 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers

2019-07-15 Thread Sam Weston (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885113#comment-16885113
 ] 

Sam Weston commented on KAFKA-7931:
---

Good news! I've got to the bottom of it!

The fix is to use a DNS name as the advertised listener instead of the Pod IP 
address (in my case the Kubernetes headless service). Now I can restart 
containers as quickly as I like and my Java apps don't get upset.

e.g. 
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://pulseplatform-dev-kafka-0.pulseplatform-dev-kafka-headless.pulseplatform-dev:9092
 where the headless service is called pulseplatform-dev-kafka-headless, my 
namespace is pulseplatform-dev and the pod is called pulseplatform-dev-kafka-0

> Java Client: if all ephemeral brokers fail, client can never reconnect to 
> brokers
> -
>
> Key: KAFKA-7931
> URL: https://issues.apache.org/jira/browse/KAFKA-7931
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Brian
>Priority: Critical
>
> Steps to reproduce:
>  * Setup kafka cluster in GKE, with bootstrap server address configured to 
> point to a load balancer that exposes all GKE nodes
>  * Run producer that emits values into a partition with 3 replicas
>  * Kill every broker in the cluster
>  * Wait for brokers to restart
> Observed result:
> The java client cannot find any of the nodes even though they have all 
> recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) 
> could not be established. Broker may not be available.".
> Note, this is *not* a duplicate of 
> https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client 
> version that contains the fix for 
> https://issues.apache.org/jira/browse/KAFKA-7890.
> Versions:
> Kafka: kafka version 2.1.0, using confluentinc/cp-kafka/5.1.0 docker image
> Client: trunk from a few days ago (git sha 
> 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8658) A way to configure the jmx rmi port

2019-07-15 Thread Agostino Sarubbo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884970#comment-16884970
 ] 

Agostino Sarubbo commented on KAFKA-8658:
-

Hello Richard,

Thanks for the fast response. LGTM.

> A way to configure the jmx rmi port
> ---
>
> Key: KAFKA-8658
> URL: https://issues.apache.org/jira/browse/KAFKA-8658
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 1.0.0
> Environment: Centos 7
>Reporter: Agostino Sarubbo
>Priority: Minor
>
> Hello,
> I'm on kafka-1.0.0, so I'm not sure whether this is already fixed in the 
> current version.
> At the moment we use the following in the service script to enable JMX:
> Environment=JMX_PORT=7666
> However, there is no equivalent way to set the JMX RMI port. When it is not 
> specified, the JVM assigns a random port, which complicates the way we manage 
> the firewall.
> It would be great if the RMI port could be set in the same way, 
> e.g.:
> Environment=JMX_RMI_PORT=7667
> The JVM option that would need to be set at startup is: 
> -Dcom.sun.management.jmxremote.rmi.port=
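
A sketch of a possible interim workaround, assuming the service script ends up 
invoking kafka-run-class.sh, which passes KAFKA_JMX_OPTS straight to the JVM 
and only appends its own JMX defaults when that variable is unset. The port 
value is illustrative; reusing the JMX port for RMI means only one port has to 
be opened in the firewall.
{code}
# Interim workaround: pin the RMI server port explicitly via KAFKA_JMX_OPTS.
# Because KAFKA_JMX_OPTS is set, the usual remote-JMX flags must be repeated here.
Environment=JMX_PORT=7666
Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.rmi.port=7666"
{code}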



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo

2019-07-15 Thread Vu Le (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vu Le updated KAFKA-8664:
-
Description: 
Hi team,

I can stream data from Kafka to MongoDB with JSON messages. I use the MongoDB 
Kafka Connector 
([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]).

However, if I send a message that is not in JSON format, the connector dies. 
Please see the attached log file for details.

My config file:
{code:java}
name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
{code}
I have two separate questions:
 # How can I ignore messages that are not valid JSON?
 # How can I define a default key for this kind of message (for example: abc -> 
\{ "non-json": "abc" } )?

Thanks

  was:
Hi team,

I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB 
Kafka 
Connector|[https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]]

However, if I send a non-JSON format message the Connector died. Please see the 
log file for details.

My config file:
{code:java}
name=mongo-sink
topics=testconnector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
{code}
I have 2 separated questions:  
 # how to ignore the message which is non-json format?
 # how to defined a default-key for this kind of message (for example: abc -> 
\{ "non-json": "abc" } )

Thanks


> non-JSON format messages when streaming data from Kafka to Mongo
> 
>
> Key: KAFKA-8664
> URL: https://issues.apache.org/jira/browse/KAFKA-8664
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Vu Le
>Priority: Major
> Attachments: MongoSinkConnector.properties, 
> log_error_when_stream_data_not_a_json_format.txt
>
>
> Hi team,
> I can stream data from Kafka to MongoDB with JSON messages. I use the MongoDB 
> Kafka Connector 
> ([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]).
> However, if I send a message that is not in JSON format, the connector dies. 
> Please see the attached log file for details.
> My config file:
> {code:java}
> name=mongo-sink
> topics=test
> connector.class=com.mongodb.kafka.connect.MongoSinkConnector
> tasks.max=1
> key.ignore=true
> # Specific global MongoDB Sink Connector configuration
> connection.uri=mongodb://localhost:27017
> database=test_kafka
> collection=transaction
> max.num.retries=3
> retries.defer.timeout=5000
> type.name=kafka-connect
> key.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=false
> value.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter.schemas.enable=false
> {code}
> I have two separate questions:
>  # How can I ignore messages that are not valid JSON? (see the sketch below)
>  # How can I define a default key for this kind of message (for example: abc -> 
> \{ "non-json": "abc" } )?
> Thanks
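
Regarding the first question, a sketch of one possible approach, assuming the 
standard Kafka Connect error-handling options from KIP-298 apply to this sink 
connector (they ship with Kafka 2.0+ and so are available on 2.1.1). With 
errors.tolerance=all the task keeps running when the JSON converter fails, and 
the offending records can optionally be routed to a dead-letter topic instead 
of killing the connector. The dead-letter topic name below is just an example.
{code:java}
# Additions to the sink connector config: tolerate records the converter
# cannot parse instead of failing the task (KIP-298, Kafka 2.0+)
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true

# Optional: route the unparseable records to a dead-letter topic
# (topic name and replication factor here are illustrative)
errors.deadletterqueue.topic.name=test-dlq
errors.deadletterqueue.topic.replication.factor=1
{code}
The second question (wrapping a raw value into a default JSON key) would likely 
need a custom converter or a Single Message Transform rather than a pure 
configuration change, so it is not covered by this sketch.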



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo

2019-07-15 Thread Vu Le (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vu Le updated KAFKA-8664:
-
Description: 
Hi team,

I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB 
Kafka 
Connector|[https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]]

However, if I send a non-JSON format message the Connector died. Please see the 
log file for details.

My config file:
{code:java}
name=mongo-sink
topics=testconnector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
{code}
I have 2 separated questions:  
 # how to ignore the message which is non-json format?
 # how to defined a default-key for this kind of message (for example: abc -> 
\{ "non-json": "abc" } )

Thanks

  was:
Hi team,

I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB 
Kafka 
Connector|[https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]]

However, if I send a non-JSON format message the Connector died. Please see the 
log file for details.

My config file:
{code:java}
name=mongo-sink
topics=testconnector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
{code}
I have 2 separated questions:  
 # how to ignore the message which is non-json format?
 # how to defined a default-key for this kind of message (for example: abc -> 
\{{ { "non-json": "abc" } }}


> non-JSON format messages when streaming data from Kafka to Mongo
> 
>
> Key: KAFKA-8664
> URL: https://issues.apache.org/jira/browse/KAFKA-8664
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Vu Le
>Priority: Major
> Attachments: MongoSinkConnector.properties, 
> log_error_when_stream_data_not_a_json_format.txt
>
>
> Hi team,
> I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB 
> Kafka 
> Connector|[https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]]
> However, if I send a non-JSON format message the Connector died. Please see 
> the log file for details.
> My config file:
> {code:java}
> name=mongo-sink
> topics=testconnector.class=com.mongodb.kafka.connect.MongoSinkConnector
> tasks.max=1
> key.ignore=true
> # Specific global MongoDB Sink Connector configuration
> connection.uri=mongodb://localhost:27017
> database=test_kafka
> collection=transaction
> max.num.retries=3
> retries.defer.timeout=5000
> type.name=kafka-connect
> key.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=false
> value.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter.schemas.enable=false
> {code}
> I have 2 separated questions:  
>  # how to ignore the message which is non-json format?
>  # how to defined a default-key for this kind of message (for example: abc -> 
> \{ "non-json": "abc" } )
> Thanks



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo

2019-07-15 Thread Vu Le (JIRA)
Vu Le created KAFKA-8664:


 Summary: non-JSON format messages when streaming data from Kafka 
to Mongo
 Key: KAFKA-8664
 URL: https://issues.apache.org/jira/browse/KAFKA-8664
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.1.1
Reporter: Vu Le
 Attachments: MongoSinkConnector.properties, 
log_error_when_stream_data_not_a_json_format.txt

Hi team,

I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB 
Kafka 
Connector|[https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]]

However, if I send a non-JSON format message the Connector died. Please see the 
log file for details.

My config file:
{code:java}
name=mongo-sink
topics=testconnector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
{code}
I have 2 separated questions:  
 # how to ignore the message which is non-json format?
 # how to defined a default-key for this kind of message (for example: abc -> 
\{{ { "non-json": "abc" } }}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)