[jira] [Assigned] (KAFKA-6498) Add RocksDB statistics via Streams metrics

2018-01-31 Thread james chien (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

james chien reassigned KAFKA-6498:
--

Assignee: james chien

> Add RocksDB statistics via Streams metrics
> --
>
> Key: KAFKA-6498
> URL: https://issues.apache.org/jira/browse/KAFKA-6498
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Reporter: Guozhang Wang
>Assignee: james chien
>Priority: Major
>  Labels: needs-kip
>
> RocksDB's own stats can be programmatically exposed via {{Options.statistics()}}, 
> and the JNI {{Statistics}} class already implements many useful metrics. However, 
> these stats are not exposed via Streams today, so any user who wants access to 
> them has to manually interact with the underlying RocksDB instance directly, not 
> through Streams.
> We should expose such stats programmatically via Streams metrics, so that users 
> can investigate them without accessing RocksDB directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6498) Add RocksDB statistics via Streams metrics

2018-01-31 Thread james chien (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347963#comment-16347963
 ] 

james chien commented on KAFKA-6498:


[~guozhang] I want to pick it up!

My understanding is that we want to expose RocksDB stats via Streams metrics, so 
that users can simply monitor RocksDB stats through Streams metrics rather than 
accessing RocksDB directly.

> Add RocksDB statistics via Streams metrics
> --
>
> Key: KAFKA-6498
> URL: https://issues.apache.org/jira/browse/KAFKA-6498
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> RocksDB's own stats can be programmatically exposed via {{Options.statistics()}}, 
> and the JNI {{Statistics}} class already implements many useful metrics. However, 
> these stats are not exposed via Streams today, so any user who wants access to 
> them has to manually interact with the underlying RocksDB instance directly, not 
> through Streams.
> We should expose such stats programmatically via Streams metrics, so that users 
> can investigate them without accessing RocksDB directly.
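
For context, a minimal sketch (not a proposal) of what users have to do today to see these stats, using the existing {{RocksDBConfigSetter}} hook and assuming a RocksDB JNI version that supports {{setStatistics()}}; the class name and the polling detail are illustrative:

{code}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.rocksdb.Statistics;

public class StatisticsConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Enable RocksDB's own statistics for this store.
        options.setStatistics(new Statistics());
        // Today the Statistics object has to be polled manually (e.g. via
        // options.statistics().toString()); nothing surfaces it through Streams metrics.
    }
}
{code}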



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5876) IQ should throw different exceptions for different errors

2018-01-31 Thread Vito Jeng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347957#comment-16347957
 ] 

Vito Jeng commented on KAFKA-5876:
--

[~Pegerto] Thanks!

> IQ should throw different exceptions for different errors
> -
>
> Key: KAFKA-5876
> URL: https://issues.apache.org/jira/browse/KAFKA-5876
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Vito Jeng
>Priority: Major
>  Labels: needs-kip, newbie++
> Fix For: 1.2.0
>
>
> Currently, IQ only throws {{InvalidStateStoreException}} for all errors that 
> occur. However, we have different types of errors and should throw different 
> exceptions for those types.
> For example, if a store was migrated it must be rediscovered, whereas if a store 
> cannot be queried yet because it is still being re-created after a rebalance, the 
> user just needs to wait until the store re-creation is finished.
> There might be other examples, too.
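
For illustration, a minimal sketch of the retry loop users end up writing today, assuming a running {{KafkaStreams}} instance and a key-value store; because there is only one exception type, the caller cannot distinguish "rediscover the store" from "just wait":

{code}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreLookup {
    // Blind retry: the single InvalidStateStoreException hides whether the store
    // was migrated (rediscovery needed) or is still being restored (waiting suffices).
    public static ReadOnlyKeyValueStore<String, Long> waitForStore(final KafkaStreams streams,
                                                                   final String storeName)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (final InvalidStateStoreException e) {
                Thread.sleep(100L);
            }
        }
    }
}
{code}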



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records

2018-01-31 Thread Soby Chacko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Soby Chacko updated KAFKA-6502:
---
Description: 
See this StackOverflow issue: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]

and this comment: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]

 I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
works fine when an error occurs by successfully logging and continuing. 
However, on a continuous stream of errors, it seems like these messages are not 
committed and on a restart of the application they reappear again.  It is more 
problematic if I try to send the messages in error to a DLQ. On a restart, they 
are sent again to DLQ. As soon as I have a good record coming in, it looks like 
the offset moves further and not seeing the already logged messages again after 
a restart. 

I reproduced this behavior by running the sample provided here: 
[https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]

I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}} 
to force a deserialization error on input and reduced the commit interval to 
just 1 second. Also added the following to the config.

{{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndContinueExceptionHandler.class);}}.

 

It looks like when deserialization exceptions occur, this flag is never set to 
be true here: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
 It only becomes true once processing succeeds. That might be the reason why 
commit is not happening even after I manually call processorContext#commit().

  was:
See this StackOverflow issue: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]

and this comment: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]

 

I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
works fine when an error occurs by successfully logging and continuing. 
However, on a continuous stream of errors, it seems like these messages are not 
committed and on a restart of the application they reappear again.  It is more 
problematic if I try to send the messages in error to a DLQ. On a restart, they 
are sent again to DLQ. As soon as I have a good record coming in, it looks like 
the offset moves further and not seeing the already logged messages again after 
a restart. 

I reproduced this behavior by running the sample provided here: 
[https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]

I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}} 
to force a deserialization error on input and reduced the commit interval to 
just 1 second. Also added the following to the config.

{{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndContinueExceptionHandler.class);}}.

 

It looks like when deserialization exceptions occur, this flag is never set to 
be true here: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
 It only becomes true once processing succeeds. That might be the reason why 
commit is not happening even after I manually call processorContext#commit().


> Kafka streams deserialization handler not committing offsets on error records
> -
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Soby Chacko
>Priority: Minor
>
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when an error occurs by successfully logging and continuing. 
> However, on a continuous stream of errors, it seems like these messages are 
> not committed and on a restart of the application they reappear again.  It is 
> more problematic if I try to send the messages in error to a DLQ. On a 
> restart, they are sent again to DLQ. As soon as I have a good record coming 
> in, it looks like the offset moves further and not seeing the already logged 
> messages 

[jira] [Updated] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records

2018-01-31 Thread Soby Chacko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Soby Chacko updated KAFKA-6502:
---
Description: 
See this StackOverflow issue: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]

and this comment: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]

 

I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
works fine when an error occurs by successfully logging and continuing. 
However, on a continuous stream of errors, it seems like these messages are not 
committed and on a restart of the application they reappear again.  It is more 
problematic if I try to send the messages in error to a DLQ. On a restart, they 
are sent again to DLQ. As soon as I have a good record coming in, it looks like 
the offset moves further and not seeing the already logged messages again after 
a restart. 

I reproduced this behavior by running the sample provided here: 
[https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]

I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}} 
to force a deserialization error on input and reduced the commit interval to 
just 1 second. Also added the following to the config.

{{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndContinueExceptionHandler.class);}}.

 

It looks like when deserialization exceptions occur, this flag is never set to 
be true here: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
 It only becomes true once processing succeeds. That might be the reason why 
commit is not happening even after I manually call processorContext#commit().

  was:
See this StackOverflow issue: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]

and this comment: 
https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899


> Kafka streams deserialization handler not committing offsets on error records
> -
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Soby Chacko
>Priority: Minor
>
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  
> I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when an error occurs by successfully logging and continuing. 
> However, on a continuous stream of errors, it seems like these messages are 
> not committed and on a restart of the application they reappear again.  It is 
> more problematic if I try to send the messages in error to a DLQ. On a 
> restart, they are sent again to DLQ. As soon as I have a good record coming 
> in, it looks like the offset moves further and not seeing the already logged 
> messages again after a restart. 
> I reproduced this behavior by running the sample provided here: 
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to 
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on 
> input and reduced the commit interval to just 1 second. Also added the 
> following to the config.
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>  LogAndContinueExceptionHandler.class);}}.
>  
> It looks like when deserialization exceptions occur, this flag is never set 
> to be true here: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
>  It only becomes true once processing succeeds. That might be the reason why 
> commit is not happening even after I manually call processorContext#commit().
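
For reference, a minimal sketch of the reproduction configuration described above (the application id and broker address are assumptions; the remaining settings mirror those mentioned in the description):

{code}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class ReproConfig {
    public static Properties build() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");   // assumed
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // assumed
        // Mismatched value serde so that every incoming record fails deserialization.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        // Short commit interval so the missing commit of skipped records shows up quickly.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Log the bad record and continue instead of failing the task.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
{code}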



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records

2018-01-31 Thread Soby Chacko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Soby Chacko updated KAFKA-6502:
---
Description: 
See this StackOverflow issue: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]

and this comment: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]

 I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
works fine when an error occurs by successfully logging and continuing. 
However, on a continuous stream of errors, it seems like these messages are not 
committed and on a restart of the application they reappear again.  It is more 
problematic if I try to send the messages in error to a DLQ. On a restart, they 
are sent again to DLQ. As soon as I have a good record coming in, it looks like 
the offset moves further and not seeing the already logged messages again after 
a restart. 

I reproduced this behavior by running the sample provided here: 
[https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]

I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}} 
to force a deserialization error on input and reduced the commit interval to 
just 1 second. Also added the following to the config.

{{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndContinueExceptionHandler.class);}}.

 It looks like when deserialization exceptions occur, this flag is never set to 
be true here: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
 It only becomes true once processing succeeds. That might be the reason why 
commit is not happening even after I manually call processorContext#commit().

  was:
See this StackOverflow issue: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]

and this comment: 
[https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]

 I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
works fine when an error occurs by successfully logging and continuing. 
However, on a continuous stream of errors, it seems like these messages are not 
committed and on a restart of the application they reappear again.  It is more 
problematic if I try to send the messages in error to a DLQ. On a restart, they 
are sent again to DLQ. As soon as I have a good record coming in, it looks like 
the offset moves further and not seeing the already logged messages again after 
a restart. 

I reproduced this behavior by running the sample provided here: 
[https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]

I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}} 
to force a deserialization error on input and reduced the commit interval to 
just 1 second. Also added the following to the config.

{{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndContinueExceptionHandler.class);}}.

 

It looks like when deserialization exceptions occur, this flag is never set to 
be true here: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
 It only becomes true once processing succeeds. That might be the reason why 
commit is not happening even after I manually call processorContext#commit().


> Kafka streams deserialization handler not committing offsets on error records
> -
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Soby Chacko
>Priority: Minor
>
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when an error occurs by successfully logging and continuing. 
> However, on a continuous stream of errors, it seems like these messages are 
> not committed and on a restart of the application they reappear again.  It is 
> more problematic if I try to send the messages in error to a DLQ. On a 
> restart, they are sent again to DLQ. As soon as I have a good record coming 
> in, it looks like the offset moves further and not seeing the already logged 
> messages 

[jira] [Created] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-01-31 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-6514:
-

 Summary: Add API version as a tag for the RequestsPerSec metric
 Key: KAFKA-6514
 URL: https://issues.apache.org/jira/browse/KAFKA-6514
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 1.0.0
Reporter: Allen Wang


After we upgrade a broker to a new version, one important insight is how many 
clients have been upgraded, so that we can switch the message format once most of 
the clients have also moved to the new version, minimizing the performance 
penalty.

RequestsPerSec with the version tag will give us that insight.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5802) ScramServerCallbackHandler#handle should check username not being null before calling credentialCache.get()

2018-01-31 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302643#comment-16302643
 ] 

Ted Yu edited comment on KAFKA-5802 at 2/1/18 1:24 AM:
---

+1


was (Author: yuzhih...@gmail.com):
lgtm

> ScramServerCallbackHandler#handle should check username not being null before 
> calling credentialCache.get()
> ---
>
> Key: KAFKA-5802
> URL: https://issues.apache.org/jira/browse/KAFKA-5802
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> String username = null;
> for (Callback callback : callbacks) {
> if (callback instanceof NameCallback)
> username = ((NameCallback) callback).getDefaultName();
> else if (callback instanceof ScramCredentialCallback)
> ((ScramCredentialCallback) 
> callback).scramCredential(credentialCache.get(username));
> {code}
> Since ConcurrentHashMap, used by CredentialCache, doesn't allow null keys, we 
> should check that username is not null before calling credentialCache.get()
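
A minimal sketch of the suggested guard (purely illustrative, reusing the variables from the snippet above):

{code}
else if (callback instanceof ScramCredentialCallback)
    // ConcurrentHashMap.get(null) would throw an NPE, so only consult the cache
    // when a username was actually provided; otherwise report "no credential".
    ((ScramCredentialCallback) callback).scramCredential(
            username == null ? null : credentialCache.get(username));
{code}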



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6494) Extend ConfigCommand to update broker config using new AdminClient

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347856#comment-16347856
 ] 

ASF GitHub Bot commented on KAFKA-6494:
---

rajinisivaram opened a new pull request #4503: KAFKA-6494; ConfigCommand update 
to use AdminClient for broker configs
URL: https://github.com/apache/kafka/pull/4503
 
 
   Use new AdminClient for describing and altering broker configs using 
ConfigCommand. Broker quota configs as well as other configs will continue to 
be processed directly using ZooKeeper until KIP-248 is implemented.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend ConfigCommand to update broker config using new AdminClient
> --
>
> Key: KAFKA-6494
> URL: https://issues.apache.org/jira/browse/KAFKA-6494
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> Add --bootstrap-server and --command-config options for new AdminClient. 
> Update ConfigCommand to use new AdminClient for dynamic broker config updates 
> in KIP-226. Full conversion of ConfigCommand to new AdminClient will be done 
> later under KIP-248.
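
For illustration, a rough sketch of the kind of AdminClient calls the updated ConfigCommand relies on (not the tool's actual code; the broker address and the example config key are assumptions):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerConfigExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
            // Describe the current configs of broker 0.
            System.out.println(admin.describeConfigs(Collections.singleton(broker0)).all().get());
            // Alter a dynamic broker config (KIP-226 style); the key is just an example.
            final Config update = new Config(Collections.singleton(new ConfigEntry("log.cleaner.threads", "2")));
            admin.alterConfigs(Collections.singletonMap(broker0, update)).all().get();
        }
    }
}
{code}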



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347850#comment-16347850
 ] 

ASF GitHub Bot commented on KAFKA-3625:
---

mjsax opened a new pull request #4502: KAFKA-3625: TopologyTestDriver must 
process output for wall-clock-time punctuations and on close()
URL: https://github.com/apache/kafka/pull/4502
 
 
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: kip, user-experience
> Fix For: 1.1.0
>
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
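
For illustration, a sketch of the kind of test the published utilities would enable (API shape as proposed in KIP-247; the topology and topic names are made up):

{code}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class UppercaseTopologyTest {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(v -> v.toUpperCase())
               .to("output-topic");

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");  // never contacted by the test driver
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Drive the topology without any broker and read back the produced record.
        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
        final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
        driver.pipeInput(factory.create("input-topic", "key", "hello"));
        System.out.println(driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer()));
        driver.close();
    }
}
{code}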



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6024) Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2018-01-31 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222620#comment-16222620
 ] 

Ted Yu edited comment on KAFKA-6024 at 2/1/18 1:16 AM:
---

lgtm


was (Author: yuzhih...@gmail.com):
+1

> Consider moving validation in KafkaConsumer ahead of call to 
> acquireAndEnsureOpen()
> ---
>
> Key: KAFKA-6024
> URL: https://issues.apache.org/jira/browse/KAFKA-6024
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> In several methods, parameter validation is done after calling 
> acquireAndEnsureOpen() :
> {code}
> public void seek(TopicPartition partition, long offset) {
> acquireAndEnsureOpen();
> try {
> if (offset < 0)
> throw new IllegalArgumentException("seek offset must not be a 
> negative number");
> {code}
> Since the parameter values do not change per invocation, performing the 
> validation ahead of the acquireAndEnsureOpen() call would be better.
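
A minimal sketch of the proposed reordering (illustrative only, not the actual KafkaConsumer code):

{code}
public void seek(TopicPartition partition, long offset) {
    // Validate the argument first: it cannot change between now and lock acquisition.
    if (offset < 0)
        throw new IllegalArgumentException("seek offset must not be a negative number");
    acquireAndEnsureOpen();
    try {
        // ... existing seek logic ...
    } finally {
        release();
    }
}
{code}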



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6513) New Connect header support doesn't define `converter.type` property correctly

2018-01-31 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-6513:


 Summary: New Connect header support doesn't define 
`converter.type` property correctly
 Key: KAFKA-6513
 URL: https://issues.apache.org/jira/browse/KAFKA-6513
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 1.1.0


The recent feature (KAFKA-5142) added a new {{converter.type}} property now that 
{{Converter}} implementations implement {{Configurable}}. However, the worker is 
not correctly setting this new property and instead incorrectly assumes the 
existing {{Converter}} implementations will set it. For example:

{noformat}
Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
Missing required configuration "converter.type" which has no default value.
at 
org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:472)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:462)
at 
org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
at 
org.apache.kafka.connect.storage.ConverterConfig.(ConverterConfig.java:48)
at 
org.apache.kafka.connect.json.JsonConverterConfig.(JsonConverterConfig.java:59)
at 
org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:284)
at 
org.apache.kafka.connect.runtime.isolation.Plugins.newConfiguredPlugin(Plugins.java:77)
at 
org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:208)
at org.apache.kafka.connect.runtime.Worker.(Worker.java:107)
at 
io.confluent.connect.replicator.ReplicatorApp.config(ReplicatorApp.java:104)
at 
io.confluent.connect.replicator.ReplicatorApp.main(ReplicatorApp.java:60)
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347816#comment-16347816
 ] 

Jason Gustafson commented on KAFKA-6492:


[~becket_qin] Beat you to it. Let me know if the PR I submitted makes sense.

> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially, on a follower broker there are two segments: segment 0 and segment 1. 
> Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, so 
> it skips resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that 
> a segment 0 already exists, so it just throws an exception and dies.
> The fix is to have the broker make sure the index files of active segments 
> are always resized properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

2018-01-31 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6512:
---
Fix Version/s: 1.1.0

> Java Producer: Excessive memory usage with compression enabled
> --
>
> Key: KAFKA-6512
> URL: https://issues.apache.org/jira/browse/KAFKA-6512
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Windows 10
>Reporter: Kyle Tinker
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: KafkaSender.java
>
>
> h2. User Story
> As a user of the Java producer, I want a predictable memory usage for the 
> Kafka client so that I can ensure that my system is sized appropriately and 
> will be stable even under heavy usage.
> As a user of the Java producer, I want a smaller memory footprint so that my 
> systems don't consume as many resources.
> h2. Acceptance Criteria
>  * Enabling Compression in Kafka should not significantly increase the memory 
> usage of Kafka
>  * The memory usage of Kafka's Java Producer should be roughly in line with 
> the buffer size (buffer.memory) and the number of producers declared.
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression 
> (gzip or lz4).  I don't observe the behavior with compression off, but with 
> it on I'll run out of heap (2GB).  Using a Java profiler, I see the data is 
> in the KafkaLZ4BlockOutputStream (or related class for gzip).   I see that 
> MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but 
> is not successful.  I'm most likely network bottlenecked, so I expect the 
> producer buffers to be full while the job is running and potentially a lot of 
> unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20 
> threads) and sending data as quickly as I can.  I've also tried 1MB of 
> buffer.memory, which seemed to reduce memory consumption but I could still 
> run OOM in certain cases.  I have max.in.flight.requests.per.connection set 
> to 1.  In short, I should only have ~20 MB (20* 1MB) of data in buffers, but 
> I can easily exhaust 2000 MB used by Kafka.
> In looking at the code more, it looks like the KafkaLZ4BlockOutputStream 
> doesn't clear the compressedBuffer or buffer when close() is called.  In my 
> heap dump, both of those are ~65k size each, meaning that each batch is 
> taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 
> and messages are 1k each until the batch fills).
> Kafka tries to manage memory usage by calling 
> MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release 
> resources required for record appends (e.g. compression buffers)".  However, 
> this method doesn't actually clear those buffers because 
> KafkaLZ4BlockOutputStream.close() only writes the block and end mark and 
> closes the output stream.  It doesn't actually clear the buffer and 
> compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay allocated in RAM 
> until the block is acknowledged by the broker, processed in 
> Sender:handleProduceResponse(), and the batch is deallocated.  This memory 
> usage therefore increases, possibly without bound.  In my test program, the 
> program died with approximately 345 unprocessed batches per producer (20 
> producers), despite having max.in.flight.requests.per.connection=1.
> h2. Steps to Reproduce
>  # Create a topic test with plenty of storage
>  # Use a connection with a very fast upload pipe and limited download.  This 
> allows the outbound data to go out, but acknowledgements to be delayed 
> flowing in.
>  # Download KafkaSender.java (attached to this ticket)
>  # Set line 17 to reference your Kafka broker
>  # Run the program with a 1GB Xmx value
> h2. Possible solutions
> There are a few possible optimizations I can think of:
>  # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as 
> non-final and null them in the close() method
>  # We could declare the MemoryRecordsBuilder.appendStream non-final and null 
> it in the closeForRecordAppends() method
>  # We could have the ProducerBatch discard the recordsBuilder in 
> closeForRecordAppends(), however, this is likely a bad idea because the 
> recordsBuilder contains significant metadata that is likely needed after the 
> stream is closed.  It is also final.
>  # We could try to limit the number of non-acknowledged batches in flight.  
> This would bound the maximum memory usage but may negatively impact 
> performance.
>  
> Fix #1 would only improve the LZ4 algorithm, and not any other algorithms.
> Fix #2 would improve all algorithms, compression and otherwise.  Of the 3 
> proposed here, it seems the best.  This would also involve having to 

[jira] [Created] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

2018-01-31 Thread Kyle Tinker (JIRA)
Kyle Tinker created KAFKA-6512:
--

 Summary: Java Producer: Excessive memory usage with compression 
enabled
 Key: KAFKA-6512
 URL: https://issues.apache.org/jira/browse/KAFKA-6512
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.0
 Environment: Windows 10
Reporter: Kyle Tinker
 Attachments: KafkaSender.java

h2. User Story

As a user of the Java producer, I want a predictable memory usage for the Kafka 
client so that I can ensure that my system is sized appropriately and will be 
stable even under heavy usage.

As a user of the Java producer, I want a smaller memory footprint so that my 
systems don't consume as many resources.
h2. Acceptance Criteria
 * Enabling Compression in Kafka should not significantly increase the memory 
usage of Kafka
 * The memory usage of Kafka's Java Producer should be roughly in line with the 
buffer size (buffer.memory) and the number of producers declared.

h2. Additional Information

I've observed high memory usage in the producer when enabling compression (gzip 
or lz4).  I don't observe the behavior with compression off, but with it on 
I'll run out of heap (2GB).  Using a Java profiler, I see the data is in the 
KafkaLZ4BlockOutputStream (or related class for gzip).   I see that 
MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but 
is not successful.  I'm most likely network bottlenecked, so I expect the 
producer buffers to be full while the job is running and potentially a lot of 
unacknowledged records.

I've tried using the default buffer.memory with 20 producers (across 20 
threads) and sending data as quickly as I can.  I've also tried 1MB of 
buffer.memory, which seemed to reduce memory consumption but I could still run 
OOM in certain cases.  I have max.in.flight.requests.per.connection set to 1.  
In short, I should only have ~20 MB (20* 1MB) of data in buffers, but I can 
easily exhaust 2000 MB used by Kafka.

In looking at the code more, it looks like the KafkaLZ4BlockOutputStream 
doesn't clear the compressedBuffer or buffer when close() is called.  In my 
heap dump, both of those are ~65k size each, meaning that each batch is taking 
up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 and 
messages are 1k each until the batch fills).

Kafka tries to manage memory usage by calling 
MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release 
resources required for record appends (e.g. compression buffers)".  However, 
this method doesn't actually clear those buffers because 
KafkaLZ4BlockOutputStream.close() only writes the block and end mark and closes 
the output stream.  It doesn't actually clear the buffer and compressedBuffer 
in KafkaLZ4BlockOutputStream.  Those stay allocated in RAM until the block is 
acknowledged by the broker, processed in Sender:handleProduceResponse(), and 
the batch is deallocated.  This memory usage therefore increases, possibly 
without bound.  In my test program, the program died with approximately 345 
unprocessed batches per producer (20 producers), despite having 
max.in.flight.requests.per.connection=1.
h2. Steps to Reproduce
 # Create a topic test with plenty of storage
 # Use a connection with a very fast upload pipe and limited download.  This 
allows the outbound data to go out, but acknowledgements to be delayed flowing 
in.
 # Download KafkaSender.java (attached to this ticket)
 # Set line 17 to reference your Kafka broker
 # Run the program with a 1GB Xmx value

h2. Possible solutions

There are a few possible optimizations I can think of:
 # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as 
non-final and null them in the close() method
 # We could declare the MemoryRecordsBuilder.appendStream non-final and null it 
in the closeForRecordAppends() method
 # We could have the ProducerBatch discard the recordsBuilder in 
closeForRecordAppends(), however, this is likely a bad idea because the 
recordsBuilder contains significant metadata that is likely needed after the 
stream is closed.  It is also final.
 # We could try to limit the number of non-acknowledged batches in flight.  
This would bound the maximum memory usage but may negatively impact performance.

 

Fix #1 would only improve the LZ4 algorithm, and not any other algorithms.

Fix #2 would improve all algorithms, compression and otherwise.  Of the 3 
proposed here, it seems the best.  This would also involve having to check 
appendStreamIsClosed in every usage of appendStream within MemoryRecordsBuilder 
to avoid NPEs.

Fix #4 is likely necessary if we want to bound the maximum memory usage of 
Kafka.  Removing the buffers in Fix 1 or 2 will reduce the memory usage by 
~90%, but theoretically there is still no limit.
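
A rough sketch of what fix #2 could look like (illustrative only, not the actual MemoryRecordsBuilder code):

{code}
// Assumes appendStream is made non-final; every later use must then handle null.
public void closeForRecordAppends() {
    if (appendStream != null) {
        try {
            appendStream.close();   // flush any buffered compressed data
        } catch (IOException e) {
            throw new KafkaException(e);
        } finally {
            // Drop the reference so the compression buffers it holds become
            // eligible for GC while the batch waits for broker acknowledgement.
            appendStream = null;
        }
    }
}
{code}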



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2018-01-31 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public  T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be an NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}

  was:
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}

In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public  T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be an NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public  T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be an NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}
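
A minimal sketch of the missing guard (illustrative, not the actual RestServer code): only deserialize the body when a response type was actually requested, so constructType() is never handed a null TypeReference:

{code}
} else if (responseCode >= 200 && responseCode < 300) {
    InputStream is = connection.getInputStream();
    // Skip JSON deserialization when the caller did not ask for a typed response.
    T result = responseFormat == null ? null : JSON_SERDE.readValue(is, responseFormat);
{code}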



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6138) Simplify StreamsBuilder#addGlobalStore

2018-01-31 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6138:
---
Fix Version/s: 1.1.0

> Simplify StreamsBuilder#addGlobalStore
> --
>
> Key: KAFKA-6138
> URL: https://issues.apache.org/jira/browse/KAFKA-6138
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Panuwat Anawatmongkhon
>Priority: Major
>  Labels: beginner, kip, newbie
> Fix For: 1.1.0
>
>
> {{StreamsBuilder#addGlobalStore}} is conceptually a 1:1 copy of 
> {{Topology#addGlobalStore}} that should nevertheless follow DSL design 
> principles. At the moment, {{StreamsBuilder#addGlobalStore}} does not provide a 
> good user experience, as it forces users to specify processor names – processor 
> names are a Processor API detail that should be hidden in the DSL. The current 
> API is the following:
> {noformat}
> public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
>                                                   final String topic,
>                                                   final String sourceName,
>                                                   final Consumed consumed,
>                                                   final String processorName,
>                                                   final ProcessorSupplier stateUpdateSupplier)
> {noformat}
> We should remove the two parameters {{sourceName}} and {{processorName}}. To be 
> backward compatible, the current method must be deprecated and a new method 
> should be added with a reduced number of parameters. 
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%3A+Simplify+StreamsBuilder%23addGlobalStore
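
For illustration, a sketch of what the simplified call would look like once the two parameters are removed (store and topic names are made up; the {{ProcessorSupplier}} is whatever user-defined processor keeps the global store up to date):

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreExample {
    public static StreamsBuilder addGlobalStore(final StreamsBuilder builder,
                                                final ProcessorSupplier<String, String> stateUpdateSupplier) {
        // No sourceName / processorName arguments any more.
        return builder.addGlobalStore(
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("global-store"),
                                        Serdes.String(), Serdes.String()),
            "global-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            stateUpdateSupplier);
    }
}
{code}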



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6138) Simplify StreamsBuilder#addGlobalStore

2018-01-31 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6138.

Resolution: Fixed

> Simplify StreamsBuilder#addGlobalStore
> --
>
> Key: KAFKA-6138
> URL: https://issues.apache.org/jira/browse/KAFKA-6138
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Panuwat Anawatmongkhon
>Priority: Major
>  Labels: beginner, kip, newbie
> Fix For: 1.1.0
>
>
> {{StreamsBuilder#addGlobalStore}} is conceptually a 1:1 copy of 
> {{Topology#addGlobalStore}} that should nevertheless follow DSL design 
> principles. At the moment, {{StreamsBuilder#addGlobalStore}} does not provide a 
> good user experience, as it forces users to specify processor names – processor 
> names are a Processor API detail that should be hidden in the DSL. The current 
> API is the following:
> {noformat}
> public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
>                                                   final String topic,
>                                                   final String sourceName,
>                                                   final Consumed consumed,
>                                                   final String processorName,
>                                                   final ProcessorSupplier stateUpdateSupplier)
> {noformat}
> We should remove the two parameters {{sourceName}} and {{processorName}}. To be 
> backward compatible, the current method must be deprecated and a new method 
> should be added with a reduced number of parameters. 
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%3A+Simplify+StreamsBuilder%23addGlobalStore



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6138) Simplify StreamsBuilder#addGlobalStore

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347672#comment-16347672
 ] 

ASF GitHub Bot commented on KAFKA-6138:
---

mjsax closed pull request #4430: KAFKA-6138 Simplify 
StreamsBuilder#addGlobalStore
URL: https://github.com/apache/kafka/pull/4430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a94b0a74622..6551deeaed2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -453,6 +453,28 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder) {
 return this;
 }
 
+/**
+ * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, 
ProcessorSupplier)} instead
+ */
+@SuppressWarnings("unchecked")
+@Deprecated
+public synchronized StreamsBuilder addGlobalStore(final StoreBuilder 
storeBuilder,
+  final String topic,
+  final String sourceName,
+  final Consumed consumed,
+  final String 
processorName,
+  final ProcessorSupplier 
stateUpdateSupplier) {
+Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+Objects.requireNonNull(consumed, "consumed can't be null");
+internalStreamsBuilder.addGlobalStore(storeBuilder,
+  sourceName,
+  topic,
+  new ConsumedInternal<>(consumed),
+  processorName,
+  stateUpdateSupplier);
+return this;
+}
+
 /**
  * Adds a global {@link StateStore} to the topology.
  * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
@@ -467,10 +489,8 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder) {
  * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
  *
  * @param storeBuilder  user defined {@link StoreBuilder}; can't 
be {@code null}
- * @param sourceNamename of the {@link SourceNode} that will 
be automatically added
  * @param topic the topic to source the data from
  * @param consumed  the instance of {@link Consumed} used to 
define optional parameters; can't be {@code null}
- * @param processorName the name of the {@link ProcessorSupplier}
  * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
  * @return itself
  * @throws TopologyException if the processor of state is already 
registered
@@ -478,18 +498,14 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder) {
 @SuppressWarnings("unchecked")
 public synchronized StreamsBuilder addGlobalStore(final StoreBuilder 
storeBuilder,
   final String topic,
-  final String sourceName,
   final Consumed consumed,
-  final String 
processorName,
   final ProcessorSupplier 
stateUpdateSupplier) {
 Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
 Objects.requireNonNull(consumed, "consumed can't be null");
 internalStreamsBuilder.addGlobalStore(storeBuilder,
-  sourceName,
-  topic,
-  new ConsumedInternal<>(consumed),
-  processorName,
-  stateUpdateSupplier);
+topic,
+new ConsumedInternal<>(consumed),
+stateUpdateSupplier);
 return this;
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d0c50..28787a5f865 100644
--- 

[jira] [Updated] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-6492:

Affects Version/s: 0.10.0.2
   0.10.1.1
   0.10.2.1
   0.11.0.2

> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially, on a follower broker there are two segments: segment 0 and segment 1. 
> Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, so 
> it skips resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that 
> a segment 0 already exists, so it just throws an exception and dies.
> The fix is to have the broker make sure the index files of active segments 
> are always resized properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347670#comment-16347670
 ] 

Jiangjie Qin commented on KAFKA-6492:
-

[~hachikuji] You are right. This issue will affect earlier versions as well. 
I'll submit a PR.

> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially, on a follower broker there are two segments: segment 0 and segment 1. 
> Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, so 
> it skips resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that 
> a segment 0 already exists, so it just throws an exception and dies.
> The fix is to have the broker make sure the index files of active segments 
> are always resized properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347612#comment-16347612
 ] 

ASF GitHub Bot commented on KAFKA-6492:
---

hachikuji opened a new pull request #4498: KAFKA-6492: Fix log truncation to 
empty segment
URL: https://github.com/apache/kafka/pull/4498
 
 
   This patch ensures that truncation to an empty segment forces resizing of 
the index file in order to prevent premature rolling.
   
   I have added unit tests which verify that appends are permitted following 
truncation to an empty segment. Without the fix, this test case reproduces the 
failure in which the rolled segment matches the current active segment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially, on a follower broker there are two segments: segment 0 and segment 1. 
> Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, so 
> it skips resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that 
> a segment 0 already exists, so it just throws an exception and dies.
> The fix is to have the broker make sure the index files of active segments 
> are always resized properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6511) Connect header parser incorrectly parses arrays

2018-01-31 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6511:
-
Fix Version/s: 1.1.0

> Connect header parser incorrectly parses arrays
> ---
>
> Key: KAFKA-6511
> URL: https://issues.apache.org/jira/browse/KAFKA-6511
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Arjun Satish
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 1.1.0
>
>
> An incorrect input like "[1, 2, 3,,,]" is misinterpreted by the Values 
> parser. An example test can be found here: 
> https://github.com/apache/kafka/pull/4319#discussion_r165155768



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6511) Connect header parser incorrectly parses arrays

2018-01-31 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6511:
-
Priority: Blocker  (was: Critical)

> Connect header parser incorrectly parses arrays
> ---
>
> Key: KAFKA-6511
> URL: https://issues.apache.org/jira/browse/KAFKA-6511
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Arjun Satish
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 1.1.0
>
>
> An incorrect input like "[1, 2, 3,,,]" is misinterpreted by the Values 
> parser. An example test can be found here: 
> https://github.com/apache/kafka/pull/4319#discussion_r165155768



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-01-31 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347502#comment-16347502
 ] 

Guozhang Wang commented on KAFKA-5882:
--

Are the exception error messages during the crashes exactly the same in 1.0.0 and 
0.11.0.2? Could you paste both of them here?

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced another issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6511) Connect header parser incorrectly parses arrays

2018-01-31 Thread Arjun Satish (JIRA)
Arjun Satish created KAFKA-6511:
---

 Summary: Connect header parser incorrectly parses arrays
 Key: KAFKA-6511
 URL: https://issues.apache.org/jira/browse/KAFKA-6511
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Arjun Satish
Assignee: Randall Hauch


An incorrect input like "[1, 2, 3,,,]" is misinterpreted by the Values parser. 
An example test can be found here: 
https://github.com/apache/kafka/pull/4319#discussion_r165155768



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2334) Prevent HW from going back during leader failover

2018-01-31 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347426#comment-16347426
 ] 

Colin P. McCabe commented on KAFKA-2334:


[~guozhang]: The fix here is not KIP-101 but KIP-207. KIP-207 was accepted 
but has not been implemented yet. There's some discussion here: 
https://www.mail-archive.com/dev@kafka.apache.org/msg81074.html

> Prevent HW from going back during leader failover 
> --
>
> Key: KAFKA-2334
> URL: https://issues.apache.org/jira/browse/KAFKA-2334
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: reliability
>
> Consider the following scenario:
> 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 as 
> the follower. 
> 1. A producer keeps sending to Kafka with ack=-1.
> 2. A consumer repeatedly issues ListOffset requests to Kafka.
> And the following sequence:
> 0. B1's current log-end-offset (LEO) is 0 and its HW offset is 0; same for B2.
> 1. B1 receives a ProduceRequest of 100 messages, appends them to the local log 
> (LEO becomes 100) and holds the request in purgatory.
> 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and 
> returns the 100 messages.
> 3. B2 appends the received messages to its local log (LEO becomes 100).
> 4. B1 receives another FetchRequest starting at offset 100 from B2, learns 
> that B2's LEO has caught up to 100, hence updates its own HW, satisfies 
> the ProduceRequest in purgatory, and sends the FetchResponse 
> with HW 100 back to B2 ASYNCHRONOUSLY.
> 5. B1 successfully sends the ProduceResponse to the producer and then fails, 
> hence the FetchResponse did not reach B2, whose HW remains 0.
> From the consumer's point of view, it could first see the latest offset of 
> 100 (from B1), then see the latest offset of 0 (from B2), and then see the 
> latest offset gradually catch up to 100.
> This is because we use HW to guard the ListOffset and 
> Fetch-from-ordinary-consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347394#comment-16347394
 ] 

Jason Gustafson commented on KAFKA-6492:


[~becket_qin] Does this only affect 1.0.0? As far as I can tell, it can happen 
on older versions as well. I have recently seen an instance of the failure 
during log rolling on 0.10.1. It was a compacted topic, but I cannot confirm 
whether it is the specific truncation scenario you describe above (I don't have 
all the historic logs).

> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially, on a follower broker there are two segments, segment 0 and 
> segment 1. Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, so 
> it will skip resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that 
> a segment 0 already exists, so it just throws an exception and dies.
> The fix would be to let the broker make sure the index files of active segments 
> are always resized properly.
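
To make the failure mode concrete, here is a minimal, self-contained sketch (plain 
Java, not Kafka's actual Scala core code; all class, method, and field names are 
illustrative) of why skipping the resize matters: the index of an empty segment was 
trimmed to max size 0 when the segment was rolled, so unless truncateTo() 
unconditionally restores the capacity, the index stays "full" and the broker keeps 
trying to roll a segment that already exists.

{code:java}
// Toy model only (illustrative names; Kafka's real LogSegment/OffsetIndex are Scala).
class ToyOffsetIndex {
    private final int configuredMaxEntries;
    private int maxEntries;
    private int entries;

    ToyOffsetIndex(final int configuredMaxEntries, final int entries) {
        this.configuredMaxEntries = configuredMaxEntries;
        this.maxEntries = configuredMaxEntries;
        this.entries = entries;
    }

    // When a segment is rolled, its index is trimmed to the entries it actually holds.
    void trimToValidSize() {
        maxEntries = entries;
    }

    // This is effectively what the roll check looks at before rolling a new segment.
    boolean isFull() {
        return entries >= maxEntries;
    }

    // Buggy behavior: resize only when something is actually removed, so an empty,
    // previously trimmed index keeps max size 0 and stays "full" forever.
    void truncateToBuggy(final int remainingEntries) {
        if (remainingEntries < entries) {
            entries = remainingEntries;
            maxEntries = configuredMaxEntries;
        }
    }

    // Fixed behavior: always restore the capacity, even if nothing was removed.
    void truncateToFixed(final int remainingEntries) {
        entries = Math.min(entries, remainingEntries);
        maxEntries = configuredMaxEntries;
    }

    public static void main(final String[] args) {
        final ToyOffsetIndex buggy = new ToyOffsetIndex(128, 0);  // empty segment 0
        buggy.trimToValidSize();                                  // max size becomes 0
        buggy.truncateToBuggy(0);
        System.out.println("buggy index still full? " + buggy.isFull());  // true -> spurious roll

        final ToyOffsetIndex fixed = new ToyOffsetIndex(128, 0);
        fixed.trimToValidSize();
        fixed.truncateToFixed(0);
        System.out.println("fixed index still full? " + fixed.isFull());  // false
    }
}
{code}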



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5142) KIP-145 - Expose Record Headers in Kafka Connect

2018-01-31 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-5142.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4319
[https://github.com/apache/kafka/pull/4319]

> KIP-145 - Expose Record Headers in Kafka Connect
> 
>
> Key: KAFKA-5142
> URL: https://issues.apache.org/jira/browse/KAFKA-5142
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Michael Andre Pearce
>Assignee: Michael Andre Pearce
>Priority: Major
> Fix For: 1.1.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect
> As KIP-82 introduced Headers into the core Kafka Product, it would be 
> advantageous to expose them in the Kafka Connect Framework.
> Connectors that replicate data between Kafka cluster or between other 
> messaging products and Kafka would want to replicate the headers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2404) Delete config znode when config values are empty

2018-01-31 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347310#comment-16347310
 ] 

Manikumar commented on KAFKA-2404:
--

Looks like this is tricky to implement. We can only remove a config path after 
the empty config list has been read by all the brokers.
We probably do not require this; maybe we can close this issue.

[~junrao],  [~rsivaram] Any thoughts?

> Delete config znode when config values are empty
> 
>
> Key: KAFKA-2404
> URL: https://issues.apache.org/jira/browse/KAFKA-2404
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>Priority: Major
>
> Jun's comment from KAFKA-2205:
> "Currently, if I add client config and then remove it, the clientid still 
> shows up during describe, but with empty config values. We probably should 
> delete the path when there is no overwritten values. Could you do that in a 
> follow up patch?
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type client 
> --describe 
> Configs for client:client1 are"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6503) Connect: Plugin scan is very slow

2018-01-31 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347234#comment-16347234
 ] 

Randall Hauch commented on KAFKA-6503:
--

Specifically, we're not using an executor when scanning:

{code:java}
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls);
Reflections reflections = new Reflections(builder);
{code}

When no executor is used, scanning is done in a single thread. However, we 
are supplying more than a few URLs, so we should be able to parallelize this by 
providing an executor. Reflections actually has a method that will create an 
executor sized to the number of processors:
{code:java}
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls);
builder.useParallelExecutor();
Reflections reflections = new Reflections(builder);
{code}

The challenge is that on Docker with older JRE 8 releases, the JVM won't report 
the correct number of processors. We might just have to deal with that, though.
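
One way around that is to size the pool explicitly rather than trusting 
Runtime.getRuntime().availableProcessors(). A minimal sketch (assuming Reflections 
0.9.x, whose ConfigurationBuilder also offers a useParallelExecutor(int) overload; 
the loader, urls, and threads parameters are placeholders for what the 
DelegatingClassLoader already has in hand):

{code:java}
import java.net.URL;
import java.util.Collection;

import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;

public class ParallelPluginScan {
    // Builds a Reflections instance that scans the given plugin URLs using a
    // fixed-size thread pool, so a misreported processor count inside a
    // container cannot shrink the scan back down to a single thread.
    static Reflections scan(final ClassLoader loader, final Collection<URL> urls, final int threads) {
        final ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.setClassLoaders(new ClassLoader[]{loader});
        builder.addUrls(urls);
        builder.useParallelExecutor(threads);   // explicit pool size instead of availableProcessors()
        return new Reflections(builder);
    }
}
{code}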


> Connect: Plugin scan is very slow
> -
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Critical
>
> Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is 
> very slow - see logs from starting my Kafka-Connect instance at the bottom. 
> It takes almost 4 minutes scanning. I am running Kafka-Connect in docker 
> based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to 
> /usr/share/java. The only thing I have added is a 13MB jar in 
> /usr/share/java/kafka-connect-file-streamer-client containing two connectors 
> and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background and everything was working, it 
> probably would not be a big issue. But it is not. Right after starting the 
> Kafka-Connect instance I try to create a connector via the /connectors 
> endpoint, but it will not succeed before the plugin scanning has finished (4 
> minutes).
> I am not even sure why scanning is necessary. Is it not always true that 
> connectors, converters, etc. are referenced by name? So to see whether one 
> exists, just try to load the class - the classloader will tell if it is 
> available. Hmmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect 
> should be fully functional (or as functional as possible) while scanning is 
> going on.
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
> moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-file-streamer-client 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-elasticsearch 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,611] INFO Added plugin 
> 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,651] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-jdbc 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} 
> 

[jira] [Comment Edited] (KAFKA-6503) Connect: Plugin scan is very slow

2018-01-31 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347021#comment-16347021
 ] 

Randall Hauch edited comment on KAFKA-6503 at 1/31/18 5:15 PM:
---

Very similar to KAFKA-6208, but the latter actually proposes using the 
ServiceLoader mechanism that requires connector, transform, and converter 
implementations to supply an additional file in their JARs. That will take a 
number of releases to do, so this should be our long-term direction.

In the meantime, we should try to speed up or optimize scanning for a short 
term win.


was (Author: rhauch):
Very similar to KAFKA-6208, but the latter actually proposes using the 
ServiceLoader mechanism that requires connector, transform, and converter 
implementations to supply an additional file in their JARs. That will take a 
number of releases to do, so this should be our long-term direction.

In the meantime, we should be able to speed up the scanning by either doing it 
in parallel or by scanning once for a set of interfaces (rather than one scan 
per interface). This is a short term win.

> Connect: Plugin scan is very slow
> -
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Critical
>
> Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is 
> very slow - see logs from starting my Kafka-Connect instance at the bottom. 
> It takes almost 4 minutes scanning. I am running Kafka-Connect in docker 
> based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to 
> /usr/share/java. The only thing I have added is a 13MB jar in 
> /usr/share/java/kafka-connect-file-streamer-client containing two connectors 
> and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background and everything was working, it 
> probably would not be a big issue. But it is not. Right after starting the 
> Kafka-Connect instance I try to create a connector via the /connectors 
> endpoint, but it will not succeed before the plugin scanning has finished (4 
> minutes).
> I am not even sure why scanning is necessary. Is it not always true that 
> connectors, converters, etc. are referenced by name? So to see whether one 
> exists, just try to load the class - the classloader will tell if it is 
> available. Hmmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect 
> should be fully functional (or as functional as possible) while scanning is 
> going on.
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
> moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-file-streamer-client 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-elasticsearch 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,611] INFO Added plugin 
> 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,651] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-jdbc 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Added plugin 
> 

[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347199#comment-16347199
 ] 

Matthias J. Sax commented on KAFKA-6487:


@bdevylde I added you to the list of contributors. You can assign tickets to 
yourself now.

Please assign tickets to yourself before you start working on them. Also update 
the ticket to "work in progress" and "patch available" if possible. If you are 
interested in a ticket that is assigned but does not show any activity, feel 
free to leave a comment and ask if you can take it over. Thanks for 
contributing!

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bart De Vylder
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6487:
--

Assignee: Bart De Vylder

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bart De Vylder
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6510) WARN: Failed to send SSL Close message java.io.IOException: Unexpected status returned by SSLEngine.wrap, expected CLOSED, received OK.

2018-01-31 Thread John Chu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Chu updated KAFKA-6510:

Summary: WARN: Failed to send SSL Close message java.io.IOException: 
Unexpected status returned by SSLEngine.wrap, expected CLOSED, received OK.   
(was: WARN: Fail to send SSL Close message)

> WARN: Failed to send SSL Close message java.io.IOException: Unexpected status 
> returned by SSLEngine.wrap, expected CLOSED, received OK. 
> 
>
> Key: KAFKA-6510
> URL: https://issues.apache.org/jira/browse/KAFKA-6510
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: John Chu
>Priority: Major
>
> I have a thread that periodically lists the topics on the Message Hub. Once in 
> a while, I am getting a "Failed to send SSL Close message" warning.
> Any ideas?
> Noticed there is another similar defect open: 
> https://issues.apache.org/jira/browse/KAFKA-3702
> Not sure if this is the same as that issue.
> Also another post from grokbase: 
> [http://grokbase.com/t/kafka/users/1653bm0a02/warn-failed-to-send-ssl-close-message]
>  
> {code:java}
> KafkaConsumer consumer = new 
> KafkaConsumer<>(getConsumerConfiguration());
> try{
>Map topics = consumer.listTopics();
>return new ArrayList(topics.keySet());
> }
> finally{
> if (consumer != null){
>   consumer.close();
> }
> }
> {code}
> I am getting the warning from *consumer.close*.
> The configuration of the consumer:
>  * sasl.mechanism = PLAIN
>  * security.protocol = SASL_SSL
>  * group.id = consumer1
>  * ssl.enabled.protocol = TLSv1.2
>  * ssl.endpoint.identification.algorithm = HTTPS
>  * ssl.protocol = TLSv1.2
>  * sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule 
> required username="USERNAME" password="PASSWORD";
> {quote}[WARN ] 2018-01-25 20:12:23.204 [ClusterChannelMonitorTaskThread] 
> org.apache.kafka.common.network.SslTransportLayer {} - Failed to send SSL 
> Close message java.io.IOException: Unexpected status returned by 
> SSLEngine.wrap, expected CLOSED, received OK. Will not send close message to 
> peer. at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:158)
>  [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.common.network.Selector.doClose(Selector.java:582) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.common.network.Selector.close(Selector.java:573) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.common.network.Selector.close(Selector.java:539) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.common.network.Selector.close(Selector.java:250) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:505) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439)
>  [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71) 
> [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1613)
>  [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573)
>  [kafka-clients-0.11.0.0.jar:?] at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549)
>  [kafka-clients-0.11.0.0.jar:?] at 
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6510) WARN: Fail to send SSL Close message

2018-01-31 Thread John Chu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Chu updated KAFKA-6510:

Description: 
I have a thread that periodically lists the topics on the Message Hub. Once in 
a while, I am getting a "Failed to send SSL Close message" warning.

Any ideas?

Noticed there is another similar defect open: 
https://issues.apache.org/jira/browse/KAFKA-3702

Not sure if this is the same as that issue.

Also another post from grokbase: 
[http://grokbase.com/t/kafka/users/1653bm0a02/warn-failed-to-send-ssl-close-message]

 
{code:java}
KafkaConsumer consumer = new 
KafkaConsumer<>(getConsumerConfiguration());

try{
   Map topics = consumer.listTopics();
   return new ArrayList(topics.keySet());
}
finally{
if (consumer != null){
  consumer.close();
}
}
{code}
I am getting the warning from *consumer.close*.

The configuration of the consumer:
 * sasl.mechanism = PLAIN
 * security.protocol = SASL_SSL
 * group.id = consumer1
 * ssl.enabled.protocol = TLSv1.2
 * ssl.endpoint.identification.algorithm = HTTPS
 * ssl.protocol = TLSv1.2
 * sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule 
required username="USERNAME" password="PASSWORD";

{quote}[WARN ] 2018-01-25 20:12:23.204 [ClusterChannelMonitorTaskThread] 
org.apache.kafka.common.network.SslTransportLayer {} - Failed to send SSL Close 
message java.io.IOException: Unexpected status returned by SSLEngine.wrap, 
expected CLOSED, received OK. Will not send close message to peer. at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:158)
 [kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.doClose(Selector.java:582) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:573) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:539) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:250) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:505) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439)
 [kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1613) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549) 
[kafka-clients-0.11.0.0.jar:?] at 
{quote}

  was:
I have a thread which once in a while is going to list the topics on the 
Message Hub. But once in a while, I am getting a :Failed to send SSL Close 
message.

Any ideas?

Noticed there is another similar defect open: 
https://issues.apache.org/jira/browse/KAFKA-3702

Not sure if they are the same.
{code:java}
KafkaConsumer consumer = new 
KafkaConsumer<>(getConsumerConfiguration());

try{
   Map topics = consumer.listTopics();
   return new ArrayList(topics.keySet());
}
finally{
if (consumer != null){
  consumer.close();
}
}
{code}
I am getting the warning from *consumer.close*.

The configuration of the consumer:
 * sasl.mechanism = PLAIN
 * security.protocol = SASL_SSL
 * group.id = consumer1
 * ssl.enabled.protocol = TLSv1.2
 * ssl.endpoint.identification.algorithm = HTTPS
 * ssl.protocol = TLSv1.2
 * sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule 
required username="USERNAME" password="PASSWORD";

{quote}[WARN ] 2018-01-25 20:12:23.204 [ClusterChannelMonitorTaskThread] 
org.apache.kafka.common.network.SslTransportLayer {} - Failed to send SSL Close 
message java.io.IOException: Unexpected status returned by SSLEngine.wrap, 
expected CLOSED, received OK. Will not send close message to peer. at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:158)
 [kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.doClose(Selector.java:582) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:573) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:539) 

[jira] [Comment Edited] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347165#comment-16347165
 ] 

Bill Bejeck edited comment on KAFKA-6487 at 1/31/18 4:55 PM:
-

No worries [~bdevylde] I'll re-assign the ticket to you


was (Author: bbejeck):
No worries [~bdevylde] I'll re-assign the ticket to you

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-6487:
--

Assignee: (was: Bill Bejeck)

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347165#comment-16347165
 ] 

Bill Bejeck commented on KAFKA-6487:


No worries [~bdevylde] I'll re-assign the ticket to you

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6510) WARN: Fail to send SSL Close message

2018-01-31 Thread John Chu (JIRA)
John Chu created KAFKA-6510:
---

 Summary: WARN: Fail to send SSL Close message
 Key: KAFKA-6510
 URL: https://issues.apache.org/jira/browse/KAFKA-6510
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: John Chu


I have a thread that periodically lists the topics on the Message Hub. Once in 
a while, I am getting a "Failed to send SSL Close message" warning.

Any ideas?

Noticed there is another similar defect open: 
https://issues.apache.org/jira/browse/KAFKA-3702

Not sure if they are the same.
{code:java}
KafkaConsumer consumer = new 
KafkaConsumer<>(getConsumerConfiguration());

try{
   Map topics = consumer.listTopics();
   return new ArrayList(topics.keySet());
}
finally{
if (consumer != null){
  consumer.close();
}
}
{code}
I am getting the warning from *consumer.close*.

The configuration of the consumer:
 * sasl.mechanism = PLAIN
 * security.protocol = SASL_SSL
 * group.id = consumer1
 * ssl.enabled.protocol = TLSv1.2
 * ssl.endpoint.identification.algorithm = HTTPS
 * ssl.protocol = TLSv1.2
 * sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule 
required username="USERNAME" password="PASSWORD";

{quote}[WARN ] 2018-01-25 20:12:23.204 [ClusterChannelMonitorTaskThread] 
org.apache.kafka.common.network.SslTransportLayer {} - Failed to send SSL Close 
message java.io.IOException: Unexpected status returned by SSLEngine.wrap, 
expected CLOSED, received OK. Will not send close message to peer. at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:158)
 [kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.doClose(Selector.java:582) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:573) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:539) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.common.network.Selector.close(Selector.java:250) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:505) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439)
 [kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1613) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573) 
[kafka-clients-0.11.0.0.jar:?] at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549) 
[kafka-clients-0.11.0.0.jar:?] at 
{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4930) Connect Rest API allows creating connectors with an empty name - KIP-212

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347164#comment-16347164
 ] 

ASF GitHub Bot commented on KAFKA-4930:
---

ewencp closed pull request #2755: KAFKA-4930: Added connector name validator …
URL: https://github.com/apache/kafka/pull/2755
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 30802984cac..bb199dde202 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -982,6 +982,40 @@ public String toString() {
 }
 }
 
+public static class NonEmptyStringWithoutControlChars implements Validator 
{
+
+public static NonEmptyStringWithoutControlChars 
nonEmptyStringWithoutControlChars() {
+return new NonEmptyStringWithoutControlChars();
+}
+
+@Override
+public void ensureValid(String name, Object value) {
+String s = (String) value;
+
+if (s == null) {
+// This can happen during creation of the config object due to 
no default value being defined for the
+// name configuration - a missing name parameter is caught 
when checking for mandatory parameters,
+// thus we can ok a null value here
+return;
+} else if (s.isEmpty()) {
+throw new ConfigException(name, value, "String may not be 
empty");
+}
+
+// Check name string for illegal characters
+ArrayList foundIllegalCharacters = new ArrayList<>();
+
+for (int i = 0; i < s.length(); i++) {
+if (Character.isISOControl(s.codePointAt(i))) {
+foundIllegalCharacters.add(s.codePointAt(i));
+}
+}
+
+if (!foundIllegalCharacters.isEmpty()) {
+throw new ConfigException(name, value, "String may not contain 
control sequences but had the following ASCII chars: " + 
Utils.join(foundIllegalCharacters, ", "));
+}
+}
+}
+
 public static class ConfigKey {
 public final String name;
 public final Type type;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 602147b3114..339c51aa4b1 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -160,6 +160,9 @@ public void testValidators() {
 testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", 
new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
 testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new 
Object[]{"abb"}, new Object[] {null});
 testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", 
"b"}, new Object[] {null, -1, "c"});
+testValidators(Type.STRING, new 
ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname",
+new Object[]{"test", "name", "test/test", "test\u1234", 
"\u1324name\\", "/+%>&):??<&()?-", "+1", "\uD83D\uDE01", "\uF3B1", " test   
\n\r", "\n  hello \t"},
+new Object[]{"nontrailing\nnotallowed", "as\u0001cii control 
char", "tes\rt", "test\btest", "1\t2", ""});
 }
 
 @Test
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index e63d1002a92..aad12c3a4a5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -37,6 +37,7 @@
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static 
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
 
 /**
  * 
@@ -96,7 +97,7 @@ public Object get(String key) {
 
 public static ConfigDef configDef() {
 return new ConfigDef()
-.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, 
COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
+.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, 
nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, 
1, 

[jira] [Commented] (KAFKA-6503) Connect: Plugin scan is very slow

2018-01-31 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347021#comment-16347021
 ] 

Randall Hauch commented on KAFKA-6503:
--

Very similar to KAFKA-6208, but the latter actually proposes using the 
ServiceLoader mechanism that requires connector, transform, and converter 
implementations to supply an additional file in their JARs. That will take a 
number of releases to do, so this should be our long-term direction.

In the meantime, we should be able to speed up the scanning by either doing it 
in parallel or by scanning once for a set of interfaces (rather than one scan 
per interface). This is a short term win.

> Connect: Plugin scan is very slow
> -
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Major
>
> Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is 
> very slow - see logs from starting my Kafka-Connect instance at the bottom. 
> It takes almost 4 minutes scanning. I am running Kafka-Connect in docker 
> based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to 
> /usr/share/java. The only thing I have added is a 13MB jar in 
> /usr/share/java/kafka-connect-file-streamer-client containing two connectors 
> and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background and everything was working, it 
> probably would not be a big issue. But it is not. Right after starting the 
> Kafka-Connect instance I try to create a connector via the /connectors 
> endpoint, but it will not succeed before the plugin scanning has finished (4 
> minutes).
> I am not even sure why scanning is necessary. Is it not always true that 
> connectors, converters, etc. are referenced by name? So to see whether one 
> exists, just try to load the class - the classloader will tell if it is 
> available. Hmmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect 
> should be fully functional (or as functional as possible) while scanning is 
> going on.
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
> moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-file-streamer-client 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-elasticsearch 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,611] INFO Added plugin 
> 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,651] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-jdbc 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,492] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSourceConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,663] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-s3 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 

[jira] [Updated] (KAFKA-6503) Connect: Plugin scan is very slow

2018-01-31 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6503:
-
Priority: Critical  (was: Major)

> Connect: Plugin scan is very slow
> -
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Critical
>
> Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is 
> very slow - see logs from starting my Kafka-Connect instance at the bottom. 
> It takes almost 4 minutes scanning. I am running Kafka-Connect in docker 
> based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to 
> /usr/share/java. The only thing I have added is a 13MB jar in 
> /usr/share/java/kafka-connect-file-streamer-client containing two connectors 
> and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background and everything was working, it 
> probably would not be a big issue. But it is not. Right after starting the 
> Kafka-Connect instance I try to create a connector via the /connectors 
> endpoint, but it will not succeed before the plugin scanning has finished (4 
> minutes).
> I am not even sure why scanning is necessary. Is it not always true that 
> connectors, converters, etc. are referenced by name? So to see whether one 
> exists, just try to load the class - the classloader will tell if it is 
> available. Hmmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect 
> should be fully functional (or as functional as possible) while scanning is 
> going on.
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
> moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-file-streamer-client 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-elasticsearch 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,611] INFO Added plugin 
> 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,651] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-jdbc 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,492] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSourceConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,663] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-s3 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,055] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-s3/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,055] INFO Added plugin 
> 'io.confluent.connect.s3.S3SinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,061] INFO Added plugin 
> 'io.confluent.connect.storage.tools.SchemaSourceConnector' 
> 

[jira] [Commented] (KAFKA-6498) Add RocksDB statistics via Streams metrics

2018-01-31 Thread james chien (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347019#comment-16347019
 ] 

james chien commented on KAFKA-6498:


I am interested in this, but I need to figure out how to do it.

> Add RocksDB statistics via Streams metrics
> --
>
> Key: KAFKA-6498
> URL: https://issues.apache.org/jira/browse/KAFKA-6498
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> RocksDB's own stats can be programmatically exposed via 
> {{Options.statistics()}} and the JNI `Statistics` has indeed implemented many 
> useful settings already. However these stats are not exposed directly via 
> Streams today and hence for any users who wants to get access to them they 
> have to manually interact with the underlying RocksDB directly, not through 
> Streams.
> We should expose such stats via Streams metrics programmatically for users to 
> investigate them without trying to access the rocksDB directly.
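
In the meantime, a possible stop-gap for users (a minimal sketch only, assuming a 
RocksJava version that lets you construct a Statistics object and attach it with 
Options.setStatistics; the class and field names below are illustrative, not part of 
Kafka) is to hook in through the existing RocksDBConfigSetter and poll the statistics 
from the application:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.rocksdb.Statistics;

// Attaches a Statistics handle to every RocksDB store so the application can
// read the counters itself (for example from a scheduled task or its own reporter).
public class StatisticsConfigSetter implements RocksDBConfigSetter {

    // storeName -> Statistics handle, kept for later polling by the application
    public static final Map<String, Statistics> STATS = new ConcurrentHashMap<>();

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final Statistics statistics = new Statistics();
        options.setStatistics(statistics);
        STATS.put(storeName, statistics);
    }
}
{code}

The setter would be registered via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG; 
what this ticket would add on top is surfacing those values through the Streams 
metrics registry itself.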



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6312) Add documentation about kafka-consumer-groups.sh's ability to set/change offsets

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347005#comment-16347005
 ] 

ASF GitHub Bot commented on KAFKA-6312:
---

tankhiwale opened a new pull request #4496: KAFKA-6312: Update website 
documentation for --reset-offsets option, …
URL: https://github.com/apache/kafka/pull/4496
 
 
   …for Kafka consumer groups, introduced in KIP-122
   
   KIP-122 added the ability for kafka-consumer-groups.sh to reset/change 
consumer offsets, at a fine grained level.
   
   There is documentation on it in the kafka-consumer-groups.sh usage text.
   
   There was no such documentation on the kafka.apache.org website. This change 
updates the documentation on the website, so that users can read about the 
functionality without having the tools installed.
   
   @omkreddy, @guozhangwang, @ijuma, kindly review this PR. This is my first 
contribution to any project ever. Please let me know if I have missed 
something.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add documentation about kafka-consumer-groups.sh's ability to set/change 
> offsets
> 
>
> Key: KAFKA-6312
> URL: https://issues.apache.org/jira/browse/KAFKA-6312
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: James Cheng
>Assignee: Mayank Tankhiwale
>Priority: Major
>  Labels: newbie
>
> KIP-122 added the ability for kafka-consumer-groups.sh to reset/change 
> consumer offsets, at a fine grained level.
> There is documentation on it in the kafka-consumer-groups.sh usage text. 
> There is no such documentation on the kafka.apache.org website. We should add 
> some documentation to the website, so that users can read about the 
> functionality without having the tools installed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6509) Add additional tests for validating store restoration completes before Topology is initialized

2018-01-31 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6509:
--

 Summary: Add additional tests for validating store restoration 
completes before Topology is initialized
 Key: KAFKA-6509
 URL: https://issues.apache.org/jira/browse/KAFKA-6509
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: Bill Bejeck
 Fix For: 1.2.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6508) Look into optimizing StreamPartitionAssignor StandbyTask Assignment

2018-01-31 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6508:
--

 Summary: Look into optimizing StreamPartitionAssignor StandbyTask 
Assignment
 Key: KAFKA-6508
 URL: https://issues.apache.org/jira/browse/KAFKA-6508
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bill Bejeck
 Fix For: 1.2.0


Currently, we have two lists of tasks in the StreamsPartitionAssignor, active 
and standby; we should look into maintaining a single list of tasks to ensure 
a balance of active and standby tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6312) Add documentation about kafka-consumer-groups.sh's ability to set/change offsets

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346938#comment-16346938
 ] 

ASF GitHub Bot commented on KAFKA-6312:
---

tankhiwale closed pull request #123: KAFKA-6312: Update website documentation 
for --reset-offsets option, …
URL: https://github.com/apache/kafka-site/pull/123
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/10/ops.html b/10/ops.html
index 92077df..59d7e17 100644
--- a/10/ops.html
+++ b/10/ops.html
@@ -183,6 +183,15 @@ Managing C
   test-foo   0  1   3   2  
consumer-1-a5d61779-4d04-4c50-a6d6-fb35d942642d   /127.0.0.1
 consumer-1
   
 
+  To reset offsets of a consumer group to latest offset, first make sure the 
instances are inactive, then:
+
+  
+   bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 
--reset-offsets --group consumergroup1 --topic topic1 --to-latest
+
+  TOPIC  PARTITION  NEW-OFFSET
+  topic1 0  0
+  
+
   If you are using the old high-level consumer and storing the group metadata 
in ZooKeeper (i.e. offsets.storage=zookeeper), pass
   --zookeeper instead of bootstrap-server:
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add documentation about kafka-consumer-groups.sh's ability to set/change 
> offsets
> 
>
> Key: KAFKA-6312
> URL: https://issues.apache.org/jira/browse/KAFKA-6312
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: James Cheng
>Assignee: Mayank Tankhiwale
>Priority: Major
>  Labels: newbie
>
> KIP-122 added the ability for kafka-consumer-groups.sh to reset/change 
> consumer offsets, at a fine grained level.
> There is documentation on it in the kafka-consumer-groups.sh usage text. 
> There is no such documentation on the kafka.apache.org website. We should add 
> some documentation to the website, so that users can read about the 
> functionality without having the tools installed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6312) Add documentation about kafka-consumer-groups.sh's ability to set/change offsets

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346889#comment-16346889
 ] 

ASF GitHub Bot commented on KAFKA-6312:
---

tankhiwale opened a new pull request #123: KAFKA-6312: Update website 
documentation for --reset-offsets option, …
URL: https://github.com/apache/kafka-site/pull/123
 
 
   …for Kafka consumer groups, introduced in KIP-122
   
   KIP-122 added the ability for kafka-consumer-groups.sh to reset/change 
consumer offsets, at a fine grained level.
   
   There is documentation on it in the kafka-consumer-groups.sh usage text.
   
   There was no such documentation on the kafka.apache.org website. This change 
updates the documentation on the website, so that users can read about the 
functionality without having the tools installed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add documentation about kafka-consumer-groups.sh's ability to set/change 
> offsets
> 
>
> Key: KAFKA-6312
> URL: https://issues.apache.org/jira/browse/KAFKA-6312
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: James Cheng
>Assignee: Mayank Tankhiwale
>Priority: Major
>  Labels: newbie
>
> KIP-122 added the ability for kafka-consumer-groups.sh to reset/change 
> consumer offsets, at a fine grained level.
> There is documentation on it in the kafka-consumer-groups.sh usage text. 
> There is no such documentation on the kafka.apache.org website. We should add 
> some documentation to the website, so that users can read about the 
> functionality without having the tools installed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6507) NPE in KafkaStatusBackingStore

2018-01-31 Thread Itay Cohai (JIRA)
Itay Cohai created KAFKA-6507:
-

 Summary: NPE in KafkaStatusBackingStore
 Key: KAFKA-6507
 URL: https://issues.apache.org/jira/browse/KAFKA-6507
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.11.0.1
 Environment: We are using Kafka 0.10.0.1 with Kafka connect 0.11.0.1. 
Reporter: Itay Cohai


Found the following NPE in our kafka connect logs:

[2018-01-30 13:15:34,391] ERROR Unexpected exception in Thread[KafkaBasedLog 
Work Thread - itay_test-connect-status,5,main] 
(org.apache.kafka.connect.util.KafkaBasedLog:334)

java.lang.NullPointerException

    at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore.read(KafkaStatusBackingStore.java:441)

    at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore$1.onCompletion(KafkaStatusBackingStore.java:148)

    at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore$1.onCompletion(KafkaStatusBackingStore.java:145)

    at 
org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:258)

    at 
org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)

    at 
org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)

 

If I look at the source, it looks like the key comes up NULL from the status 
topic, which is strange.

void read(ConsumerRecord<String, byte[]> record) {
    String key = record.key();

    // This line throws the NPE when the key is null:
    if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
        readConnectorStatus(key, record.value());
    } else if (key.startsWith(TASK_STATUS_PREFIX)) {
        readTaskStatus(key, record.value());
    } else {
        log.warn("Discarding record with invalid key {}", key);
    }
}
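
For illustration only (not the actual fix), a minimal guard along these lines 
would avoid the NPE by treating a null key like any other invalid record:

{code:java}
void read(ConsumerRecord<String, byte[]> record) {
    String key = record.key();
    // Sketch: skip records whose key is null instead of dereferencing it.
    if (key == null) {
        log.warn("Discarding record with null key");
        return;
    }
    if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
        readConnectorStatus(key, record.value());
    } else if (key.startsWith(TASK_STATUS_PREFIX)) {
        readTaskStatus(key, record.value());
    } else {
        log.warn("Discarding record with invalid key {}", key);
    }
}
{code}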






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Bart De Vylder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346831#comment-16346831
 ] 

Bart De Vylder commented on KAFKA-6487:
---

[~bbejeck] i started working on this issue before I saw this ticket. I made a 
PR for it here: [https://github.com/apache/kafka/pull/4495]

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346824#comment-16346824
 ] 

ASF GitHub Bot commented on KAFKA-6487:
---

bartdevylder opened a new pull request #4495: KAFKA-6487: 
ChangeLoggingKeyValueBytesStore does not propagate delete
URL: https://github.com/apache/kafka/pull/4495
 
 
   The `ChangeLoggingKeyValueBytesStore` used to write null to its underlying 
store instead of propagating the delete, which has two drawbacks:
   - an iterator will see null values
   - unbounded memory growth of the underlying in-memory keyvalue store
   
   The fix will just propagate the delete instead of performing put(key, null). 
   
   The changes to the tests:
   - an extra test that the key is really gone after a delete, by calling 
`approximateEntries` on the underlying store. This number is exact because we 
know the underlying store in the test is of type `InMemoryKeyValueStore`
   - an extra test to check that a delete is logged as a tombstone (null value) 
for the key (the existing test would also succeed if the key were simply absent)
   
   While updating the corresponding tests of the `ChangeLoggingKeyValueStore` I 
noticed the class is no longer used anywhere, so I removed it from the source 
code for clarity.
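   
   As a rough sketch of the direction of the change (not necessarily the exact 
merged code; `inner` and `changeLogger` stand for the wrapped store and the 
changelogger described above), the delete path now forwards the delete and 
still logs a tombstone:
   
   {code:java}
   @Override
   public byte[] delete(final Bytes key) {
       // Propagate the delete to the wrapped store instead of put(key, null),
       // so iterators no longer see null values and an in-memory inner store
       // does not grow without bound.
       final byte[] oldValue = inner.delete(key);
       // Still record a tombstone in the changelog topic for restoration.
       changeLogger.logChange(key, null);
       return oldValue;
   }
   {code}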
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6505) Add simple raw "offset-commit-failures", "offset-commits" and "offset-commit-successes" count metric

2018-01-31 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346777#comment-16346777
 ] 

Per Steffensen commented on KAFKA-6505:
---

This ticket was labeled "needs-kip". I guess adding metrics does not require a 
KIP, but a change in strategy from "exposing advanced metrics" to "exposing 
simple raw metrics" will?

Should I open such a KIP?

If we just make this ticket about introducing the following metrics, will it 
still require a KIP?
 * offset-commit-attempts (number of offset commits attempted (successful or 
failing) since startup)
 * offset-commit-failures (number of offset commits that failed since startup)
 * offset-commit-duration (total time in ms used doing offset-commits 
(successful or failing) since startup)
 * offset-commit-failures-duration (total time in ms used doing failing 
offset-commits since startup)

Note that number of successful offset-commits can be calculated by 
offset-commit-attempts minus offset-commit-failures, and that the time spent on 
those can be calculated by offset-commit-duration minus 
offset-commit-failures-duration
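
To make the proposal concrete, here is a minimal sketch of how such raw 
counters could be registered with Kafka's common metrics API (the group name 
and the integration point inside Connect are placeholders, not the actual 
wiring):
{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Total;

public class OffsetCommitCounters {
    private final Sensor attempts;
    private final Sensor failures;
    private final Sensor duration;
    private final Sensor failureDuration;

    public OffsetCommitCounters(Metrics metrics, String group) {
        attempts = counter(metrics, group, "offset-commit-attempts",
                "Offset commits attempted (successful or failing) since startup");
        failures = counter(metrics, group, "offset-commit-failures",
                "Offset commits that failed since startup");
        duration = counter(metrics, group, "offset-commit-duration",
                "Total time in ms spent on offset commits since startup");
        failureDuration = counter(metrics, group, "offset-commit-failures-duration",
                "Total time in ms spent on failing offset commits since startup");
    }

    private static Sensor counter(Metrics metrics, String group, String name, String description) {
        // Total simply sums the recorded values, giving a raw "since startup" counter.
        Sensor sensor = metrics.sensor(name);
        sensor.add(metrics.metricName(name, group, description), new Total());
        return sensor;
    }

    // Call once per offset-commit attempt.
    public void record(boolean success, long durationMs) {
        attempts.record(1.0);
        duration.record(durationMs);
        if (!success) {
            failures.record(1.0);
            failureDuration.record(durationMs);
        }
    }
}
{code}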

For the KIP:

I believe that what people are doing today is setting up a continuous poll of 
metrics (e.g. using Prometheus), and that "advanced" metrics are computed 
outside Kafka/Kafka-Connect/Kafka-clients based on the collected "raw" metrics.

E.g. set up Prometheus to pull metrics from a Kafka Connect instance every 30 
seconds. Example of what has been polled (stored in Prometheus):
||Poll timestamp||offset-commit-attempts||offset-commit-failures||offset-commit-duration||offset-commit-failures-duration||Note||
|1/1-2018 10:00:00|10|1|1000|900|At this point the application has been running for a while, doing 10 offset commits, of which 9 succeeded and 1 failed. The 9 successful took a total of 100 ms, while the failing one took 900 ms|
|1/1-2018 10:00:30|11|1|1050|900|Within the 30 secs one successful offset-commit was run, spending 50 ms|
|1/1-2018 10:01:00|13|2|1500|1300|Within the 30 secs two offset-commits were run - one successful (50 ms) and one failing (400 ms)|
|1/1-2018 10:01:30|13|2|1500|1300|Within the 30 secs no offset-commits were run|
|1/1-2018 10:02:00|14|2|1530|1300|Within the 30 secs one successful offset-commit was run, spending 30 ms|

Now if you use PromQL, e.g. through Grafana (which make very nice presentations 
of the data) you can do some of the more advanced stuff
{code:java}
idelta(offset_commit_failures[1m]){code}
Will show a graph (or whatever) of the number of failing offset-commits "here 
and now" over time: 0 at 10:00:30, 1 at 10:01:00, 0 at 10:01:30 and 0 at 
10:02:00
{code:java}
100 * (delta(offset_commit_failures[1m]) / 
delta(offset_commit_attempts[1m])){code}
Will show a graph (or whatever) of the percentage of offset-commits that failed 
over the "last" (relative to the time in the graph) minute. E.g. at 10:01:30 it 
will show 100 * ((2-1) / (13-11)) = 100 * (1 / 2) = 50. You can easily make it 
"over the last 5 minutes" or whatever.
{code:java}
delta(offset_commit_duration[5m]) / 1000{code}
Will show a graph (or whatever) of the number of seconds spent doing 
offset-commits (failing or succeeding) over the last 5 minutes (300 secs), 
converting from the millisecond-based counter.
{code:java}
delta(offset_commit_duration[5m]) / delta(offset_commit_attempts[5m]){code}
Will show a graph (or whatever) of the average number of milliseconds used 
doing offset-commits (failing or succeeding) over the last 5 minutes.
{code:java}
100 * (delta(offset_commit_failures_duration[5m]) / 
delta(offset_commit_duration[5m])){code}
Will show a graph (or whatever) of the percentage of time used doing 
offset-commits that was used doing failing offset-commits, within the last 5 
minutes.

You can do a lot of this kind of thing at presentation time based on raw 
numbers - I have only shown a small fraction of what you can express in PromQL. 
If you do the advanced calculations in the metrics themselves, you limit the 
flexibility of the end administrator by taking decisions for him about what he 
wants to see.

> Add simple raw "offset-commit-failures", "offset-commits" and 
> "offset-commit-successes" count metric
> 
>
> Key: KAFKA-6505
> URL: https://issues.apache.org/jira/browse/KAFKA-6505
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Minor
>  Labels: needs-kip
>
> MBean 
> "kafka.connect:type=connector-task-metrics,connector=,task=x" 
> has several attributes. Most of them seem to be avg/max/pct over the entire 
> lifetime of the process. They are not very useful when monitoring a system, 
> where you typically want to see when there have been problems and if there 
> are problems right 

[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346626#comment-16346626
 ] 

ASF GitHub Bot commented on KAFKA-6378:
---

andybryant opened a new pull request #4494: KAFKA-6378 KStream-GlobalKTable 
null KeyValueMapper handling
URL: https://github.com/apache/kafka/pull/4494
 
 
   For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate 
no match
   
   For KStream-GlobalKTable joins, a `KeyValueMapper` is used to derive the 
lookup key from the stream record into the `GlobalKTable`. For some stream 
records there may be no valid reference into the table. This patch allows 
developers to return `null` to indicate there is no possible match. This is 
possible in this case since `null` is never a valid key for a `GlobalKTable`.
   Without this patch, providing a `null` value caused the stream to crash on 
Kafka 1.0.
   
   I added unit tests for KStream-GlobalKTable left and inner joins, since they 
were missing. I also covered the additional scenario where the `KeyValueMapper` 
returns `null` to ensure it is handled correctly; a small usage sketch follows.
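   
   For illustration, a minimal sketch of the now-supported pattern (topic 
names, key extraction and joiner below are hypothetical):
   
   {code:java}
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.GlobalKTable;
   import org.apache.kafka.streams.kstream.KStream;
   
   public class NullableForeignKeyJoin {
       public static void build(StreamsBuilder builder) {
           // Hypothetical topics: stream values carry an optional foreign key into the table.
           KStream<String, String> stream = builder.stream("orders");
           GlobalKTable<String, String> table = builder.globalTable("customers");
   
           KStream<String, String> joined = stream.leftJoin(
                   table,
                   // Returning null from the KeyValueMapper now means "no match" instead
                   // of crashing with an NPE; the ValueJoiner then sees a null table value.
                   (key, value) -> (value == null || value.isEmpty()) ? null : value,
                   (value, tableValue) -> value + "/" + tableValue);
   
           joined.to("orders-enriched");
       }
   }
   {code}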
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Priority: Major
> Fix For: 1.0.1
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> 

[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask

2018-01-31 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346563#comment-16346563
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 1/31/18 10:40 AM:
-

Answers:
 # For v. 1.0.0 it is exactly as stated in the JIRA description. The problem 
report originates in 1.0.0.
 # I had downgraded the code and it was stable for a long time, but yesterday I 
noticed that it also crashes, though not as easily as on 1.0.0. I described 
above how I reproduced the crashes in my environment. The main point is to run 
the streaming application for a longer period of time, perhaps depending on the 
retention time of the topics (my configuration is 2 days).


was (Author: habdank):
Answers:
 # v. 1.0.0 it is exatly what is in JIRA. The problem specification originates 
in 1.0.0.
 # I had down graded the code, and was stable for along time, but yesterday I 
recognize, that it is also crashing, but not that easy as 1.0.0. I had descibed 
above how I reproduced in my environment the crashes. The main point is to run 
streamming application longer peroiod of time, perhaps depending on retention 
time of the topics (my configuration is 2 days).

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error anything except an NPE is 
> thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-01-31 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346563#comment-16346563
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-5882:
---

Answers:
 # For v. 1.0.0 it is exactly as stated in the JIRA description. The problem 
report originates in 1.0.0.
 # I had downgraded the code and it was stable for a long time, but yesterday I 
noticed that it also crashes, though not as easily as on 1.0.0. I described 
above how I reproduced the crashes in my environment. The main point is to run 
the streaming application for a longer period of time, perhaps depending on the 
retention time of the topics (my configuration is 2 days).

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error anything except an NPE is 
> thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-31 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6378:
--
Fix Version/s: (was: 1.1.0)
   1.0.1

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Priority: Major
> Fix For: 1.0.1
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-31 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-6378.
---
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4424
[https://github.com/apache/kafka/pull/4424]

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Priority: Major
> Fix For: 1.1.0
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)