[jira] [Commented] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass

2018-11-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676229#comment-16676229
 ] 

ASF GitHub Bot commented on KAFKA-7559:
---

ijuma closed pull request #5883: KAFKA-7559: Correct standalone system tests to 
use the correct external file
URL: https://github.com/apache/kafka/pull/5883
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 2d8ac2d3c9e..f01ff0a835e 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -47,7 +47,7 @@ class ConnectStandaloneFileTest(Test):
 
     OFFSETS_FILE = "/mnt/connect.offsets"
 
-    TOPIC = "${file:/mnt/connect/connect-file-external.properties:topic.external}"
+    TOPIC = "${file:/mnt/connect/connect-external-configs.properties:topic.external}"
     TOPIC_TEST = "test"
 
     FIRST_INPUT_LIST = ["foo", "bar", "baz"]
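
As background on the `${file:...:topic.external}` placeholder the diff corrects, here is a hedged sketch (not part of the PR) of how Kafka's FileConfigProvider resolves such a reference; the path and key simply mirror the test's new values:

{code:java}
import java.util.Collections;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.FileConfigProvider;

public class ExternalTopicLookup {
    public static void main(String[] args) {
        // FileConfigProvider backs placeholders of the form ${file:<path>:<key>}.
        FileConfigProvider provider = new FileConfigProvider();
        provider.configure(Collections.emptyMap());

        // Read the "topic.external" key from the external properties file that
        // the corrected test now points at.
        ConfigData data = provider.get(
            "/mnt/connect/connect-external-configs.properties",
            Collections.singleton("topic.external"));
        System.out.println(data.data().get("topic.external"));

        provider.close();
    }
}
{code}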


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConnectStandaloneFileTest system tests do not pass
> --
>
> Key: KAFKA-7559
> URL: https://issues.apache.org/jira/browse/KAFKA-7559
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Randall Hauch
>Priority: Major
>
> Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under 
> `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with 
> error messages similar to:
> "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in 
> condition mode: LISTEN"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4453) add request prioritization

2018-11-05 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-4453:
-

Assignee: Mayuresh Gharat  (was: Lucas Wang)

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> \[3\] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
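
A minimal sketch of the prioritization idea described above (illustrative only, not Kafka's actual request channel implementation): controller requests get their own queue that handler threads always drain first, so UpdateMetadataRequest/LeaderAndIsrRequest never wait behind a Produce/Fetch backlog.

{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrioritizedRequestQueue<R> {
    private final LinkedBlockingQueue<R> controllerQueue = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<R> dataQueue = new LinkedBlockingQueue<>();

    void addControllerRequest(R request) { controllerQueue.add(request); }
    void addDataRequest(R request) { dataQueue.add(request); }

    // Handler threads call this: controller requests always win; data-plane
    // requests are only taken when no controller request is pending.
    R poll() throws InterruptedException {
        while (true) {
            R controllerRequest = controllerQueue.poll();
            if (controllerRequest != null) {
                return controllerRequest;
            }
            R dataRequest = dataQueue.poll(300, TimeUnit.MILLISECONDS);
            if (dataRequest != null) {
                return dataRequest;
            }
            // Loop back so a controller request that arrived in the meantime
            // is picked up within at most 300 ms.
        }
    }
}
{code}

This is roughly the direction later explored in KIP-291 (separating controller requests from the data plane).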



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-11-05 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676115#comment-16676115
 ] 

Lee Dongjin commented on KAFKA-7549:


[~ijuma]  [~hachikuji] Ping!

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}
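
A hedged sketch of the expected behavior (illustrative only, not the broker's actual validation path): the version check should surface as an error code in the produce response instead of an exception that closes the connection. It assumes, per KIP-110, that version 7 is the first ProduceRequest version permitted to carry zstd.

{code:java}
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;

final class ZstdProduceValidation {
    // Returns the error the broker should put in the ProduceResponse rather
    // than throwing while parsing the request (which drops the connection).
    static Errors validate(short produceRequestVersion, CompressionType compressionType) {
        if (compressionType == CompressionType.ZSTD && produceRequestVersion < 7) {
            return Errors.UNSUPPORTED_COMPRESSION_TYPE;
        }
        return Errors.NONE;
    }
}
{code}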



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7596) Add observer interface to record request and response

2018-11-05 Thread Lincong Li (JIRA)
Lincong Li created KAFKA-7596:
-

 Summary: Add observer interface to record request and response
 Key: KAFKA-7596
 URL: https://issues.apache.org/jira/browse/KAFKA-7596
 Project: Kafka
  Issue Type: Improvement
Reporter: Lincong Li
Assignee: Lincong Li


The interface could be used in the KafkaApis class to record each 
request-response pair. The motivation for introducing this observer is to enable 
or improve a Kafka audit system. Details are discussed in 
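
One possible shape for such an interface (purely a sketch; the actual contract would be defined by the KIP under discussion, and the RequestInfo/ResponseInfo carrier types below are hypothetical placeholders):

{code:java}
public interface RequestResponseObserver {

    // Hypothetical carrier types; a real API would expose the request header,
    // principal, API key, and response error details from the broker.
    final class RequestInfo {
        public final String apiName;
        public final String clientId;
        public RequestInfo(String apiName, String clientId) {
            this.apiName = apiName;
            this.clientId = clientId;
        }
    }

    final class ResponseInfo {
        public final short errorCode;
        public ResponseInfo(short errorCode) {
            this.errorCode = errorCode;
        }
    }

    /** Called once for every request/response pair handled by KafkaApis. */
    void observe(RequestInfo request, ResponseInfo response);

    /** Called on broker shutdown so the observer can release its resources. */
    void close();
}
{code}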



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable

2018-11-05 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675920#comment-16675920
 ] 

John Roesler commented on KAFKA-7595:
-

[~damianguy], Can you confirm my reasoning above? I don't want to confuse the 
issue with my own misunderstanding...

> Kafka Streams: KTrable to KTable join introduces duplicates in downstream 
> KTable
> 
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Vik Gamov
>Priority: Major
>
> When performing a KTable to KTable join after aggregation, there are duplicates in 
> the resulting KTable.
> 1. caching disabled, no materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicate
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 
> * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> 1000);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-11-05 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675916#comment-16675916
 ] 

Matthias J. Sax commented on KAFKA-7510:


[~MrKafka] I added you to the list of contributors and assigned this ticket to 
you. You can now also self-assign tickets.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Assignee: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private <K, V> void recordSendError(
>         final K key,
>         final V value,
>         final Long timestamp,
>         final String topic,
>         final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }{code}
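
A hedged sketch of the suggested split (not the actual Kafka patch; it reuses the log, logPrefix, and sendException members from the snippet above and drops the retriable-exception hint for brevity): keep the payload out of the ERROR line and emit it only at DEBUG.

{code:java}
private <K, V> void recordSendError(final K key,
                                    final V value,
                                    final Long timestamp,
                                    final String topic,
                                    final Exception exception) {
    // ERROR level: non-sensitive metadata plus the exception/stack trace only.
    log.error("{} Error sending record to topic {} (timestamp {})",
              logPrefix, topic, timestamp, exception);
    // DEBUG level: payload details, only for users who opt in to verbose logs.
    log.debug("{} Failed record for topic {}: key={}, value={}",
              logPrefix, topic, key, value);
    sendException = new StreamsException(
        String.format("%sError sending record to topic %s at timestamp %s",
                      logPrefix, topic, timestamp),
        exception);
}
{code}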



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-11-05 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-7510:
--

Assignee: Mr Kafka

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Assignee: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private <K, V> void recordSendError(
>         final K key,
>         final V value,
>         final Long timestamp,
>         final String topic,
>         final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable

2018-11-05 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675891#comment-16675891
 ] 

John Roesler commented on KAFKA-7595:
-

Not sure if I'm thinking about this properly, but...

For example, you get some event <key, left> on the left and <key, right> on the right, 
and you want to produce <key, join(left, right)>. Let's say you get the left event first.
 # With *caching disabled*, *whether or not the join is materialized*, I'd 
expect to see <key, join(left, null)> followed by <key, join(left, right)>. It's not clear to me 
if you call this a "duplicate".
 # With *caching enabled* and the *join not materialized*, I'd expect to see 
duplicates: <key, join(left, right)> followed by <key, join(left, right)>.
 ** This could happen if <key, left> gets cached on the left and <key, right> gets 
cached on the right. They only trigger the join upon flush, and they trigger it 
independently, so when the left trigger happens, the right value is already 
visible and vice-versa, hence the duplicates.
 ** Even though the caches try to de-duplicate events, they are mutually 
oblivious (they don't know their results will later be used in a join, and they 
each don't know the other exists), so they can't cooperate to de-duplicate the 
results.
 # If the *join is materialized AND caching is enabled*, then there are still 
"technically" duplicate events, but they get de-duplicated by the join's cache.

*So with this in mind, can you clarify which of these you see contradicted?* 

*In particular, I wasn't sure if you are calling the sequence "<key, join(left, null)>, 
<key, join(left, right)>" a "duplicate" or not. I _think_ this result would be expected 
for your cases 1 and 2.*

 

And just a plug for this new feature: if you'd like to achieve de-duplication 
of either sequence "<key, join(left, null)>, <key, join(left, right)>" or 
"<key, join(left, right)>, <key, join(left, right)>" without having to materialize the join, 
as of 2.1 you can use the new "suppress" operator: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
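
A hedged sketch of what using that operator could look like for the reporter's topology (the topic names, key/value types, and time limit are illustrative placeholders, not taken from the demo app):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;

public class SuppressedAverageExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Placeholder input: a changelog of per-movie average ratings.
        KTable<Long, Double> ratingAverage = builder.table("average-ratings-topic");

        ratingAverage
            // Rate-limit updates: emit at most one record per key every 10 seconds,
            // buffering up to 1000 keys, so intermediate join results are absorbed.
            .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10),
                                                BufferConfig.maxRecords(1000)))
            .toStream()
            .to("rated-movies");

        // builder.build() would then be passed to new KafkaStreams(...).
    }
}
{code}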

> Kafka Streams: KTrable to KTable join introduces duplicates in downstream 
> KTable
> 
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Vik Gamov
>Priority: Major
>
> When performing a KTable to KTable join after aggregation, there are duplicates in 
> the resulting KTable.
> 1. caching disabled, no materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicate
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 
> * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> 1000);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable

2018-11-05 Thread Vik Gamov (JIRA)
Vik Gamov created KAFKA-7595:


 Summary: Kafka Streams: KTrable to KTable join introduces 
duplicates in downstream KTable
 Key: KAFKA-7595
 URL: https://issues.apache.org/jira/browse/KAFKA-7595
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0
Reporter: Vik Gamov


When performing a KTable to KTable join after aggregation, there are duplicates in 
the resulting KTable.

1. caching disabled, no materialized => duplicates

{{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}

{{KTable ratingCounts = ratingsById.count();}}
{{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}

{{KTable ratingAverage = ratingSums.join(ratingCounts,}}
{{ (sum, count) -> sum / count.doubleValue());}}

2. caching disabled, materialized => duplicate

{{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
0);}}{{KTable ratingCounts = ratingsById.count();}}
{{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}

{{KTable ratingAverage = ratingSums.join(ratingCounts,}}
{{ (sum, count) -> sum / count.doubleValue(),}}
{{ Materialized.as("average-ratings"));}}


3. caching enabled, materialized => all good

{{// Enable record cache of size 10 MB.}}
{{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 
1024 * 1024L);}}
{{// Set commit interval to 1 second.}}
{{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);}}{{KTable ratingCounts = ratingsById.count();}}
{{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}

{{KTable ratingAverage = ratingSums.join(ratingCounts,}}
{{ (sum, count) -> sum / count.doubleValue(),}}
{{ Materialized.as("average-ratings"));}}

 

Demo app 
[https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-11-05 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675694#comment-16675694
 ] 

Stanislav Kozlovski commented on KAFKA-6812:


Hi [~kamalkang] , no progress has been made on this ticket. My last comment 
stands, I presume, about creating a KIP - it's a public API change.

Feel free to reassign this issue ticket to yourself and start work on it - it 
would be appreciated by the community :)

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> When {{ConsoleProducer}} is run without {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that there was 
> data lost. Maybe it's reasonable to exit after the first error.
>   
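
A hedged sketch of one way the exit status could reflect asynchronous send failures (illustrative Java; the real ConsoleProducer is Scala, and this is not its code):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ExitOnErrorCallback implements Callback {
    private static final AtomicBoolean SEND_FAILED = new AtomicBoolean(false);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Remember that at least one record was lost instead of only logging it.
            SEND_FAILED.set(true);
        }
    }

    // Called by the tool after producer.close(): a non-zero status signals data loss.
    public static int exitStatus() {
        return SEND_FAILED.get() ? 1 : 0;
    }
}
{code}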



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-11-05 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675694#comment-16675694
 ] 

Stanislav Kozlovski edited comment on KAFKA-6812 at 11/5/18 7:50 PM:
-

Hi [~kamalkang] , no progress has been made on this ticket. My last comment 
about creating a KIP stands - it's a public API change and is needed.

Feel free to reassign this issue ticket to yourself and start work on it - it 
would be appreciated by the community :)


was (Author: enether):
Hi [~kamalkang] , no progress has been made on this ticket. My last comment 
stands, I presume, about creating a KIP - it's a public API change.

Feel free to reassign this issue ticket to yourself and start work on it - it 
would be appreciated by the community :)

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> When {{ConsoleProducer}} is run without {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that there was 
> data lost. Maybe it's reasonable to exit after the first error.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-11-05 Thread Kamal Kang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675686#comment-16675686
 ] 

Kamal Kang commented on KAFKA-6812:
---

[~enether], [~andrasbeni] - Just wanted to know what the status of this ticket 
is as we are interested in this change too and can help with the implementation.

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> When {{ConsoleProducer}} is run without {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that there was 
> data lost. Maybe it's reasonable to exit after the first error.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2018-11-05 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675610#comment-16675610
 ] 

Elias Levy commented on KAFKA-3932:
---

[~chienhsw] Alas, I reported the issue two years ago and have not had the 
opportunity to revisit it as I've moved on to other things.  That said, it 
seems like your proposal would have addressed the fairness issue I raised.  

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Elias Levy
>Assignee: CHIENHSING WU
>Priority: Major
>
> The Java consumer fails to consume messages in a round robin fashion.  This can 
> lead to unbalanced consumption.
> In our use case we have a set of consumers that can take a significant amount 
> of time consuming messages off a topic.  For this reason, we are using the 
> pause/poll/resume pattern to ensure the consumer session does not time out.  The 
> topic that is being consumed has been preloaded with messages.  That means 
> there is a significant message lag when the consumer is first started.  To 
> limit how many messages are consumed at a time, the consumer has been 
> configured with max.poll.records=1.
> The initial observation is that the client receives a large batch of 
> messages for the first partition it decides to consume from and will consume 
> all those messages before moving on, rather than returning a message from a 
> different partition for each call to poll.
> We solved this issue by configuring max.partition.fetch.bytes to be small 
> enough that only a single message will be returned by the broker on each 
> fetch, although this would not be feasible if message size were highly 
> variable.
> The behavior of the consumer after this change is to largely consume from a 
> small number of partitions, usually just two, iterating between them, until 
> it exhausts them, before moving to another partition.   This behavior is 
> problematic if the messages have some rough time semantics and need to be 
> processed roughly in time order across all partitions.
> It would be useful if the consumer had a pluggable API that allowed custom 
> logic to select which partition to consume from next, thus enabling the 
> creation of a round robin partition consumer.
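
A hedged sketch of the configuration workaround described above (the group id, fetch size, and bootstrap servers are illustrative placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RoundRobinishConsumerConfig {
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-consumer-group"); // placeholder
        // Cap each poll to a single record.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        // Small enough that only one message fits per partition per fetch; as the
        // reporter notes, this breaks down if message sizes vary widely.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024);
        return props;
    }
}
{code}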



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass

2018-11-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675591#comment-16675591
 ] 

ASF GitHub Bot commented on KAFKA-7559:
---

rhauch opened a new pull request #5883: KAFKA-7559: Correct standalone system 
tests to use the correct external file
URL: https://github.com/apache/kafka/pull/5883
 
 
   This fixes the Connect standalone system tests. This should be backported to 
the `2.0` branch, since that's when the tests were first modified to use the 
external property file.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConnectStandaloneFileTest system tests do not pass
> --
>
> Key: KAFKA-7559
> URL: https://issues.apache.org/jira/browse/KAFKA-7559
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Randall Hauch
>Priority: Major
>
> Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under 
> `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with 
> error messages similar to:
> "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in 
> condition mode: LISTEN"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass

2018-11-05 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-7559:


Assignee: Randall Hauch

> ConnectStandaloneFileTest system tests do not pass
> --
>
> Key: KAFKA-7559
> URL: https://issues.apache.org/jira/browse/KAFKA-7559
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Randall Hauch
>Priority: Major
>
> Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under 
> `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with 
> error messages similar to:
> "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in 
> condition mode: LISTEN"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7591) Changelog retention period doesn't synchronise with window-store size

2018-11-05 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675529#comment-16675529
 ] 

Matthias J. Sax commented on KAFKA-7591:


I updated this to an "improvement" as the described behavior is by design. It's not 
expected that the window size is changed.

> Changelog retention period doesn't synchronise with window-store size
> -
>
> Key: KAFKA-7591
> URL: https://issues.apache.org/jira/browse/KAFKA-7591
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jon Bates
>Priority: Major
>
> When a new windowed state store is created, the associated changelog topic's 
> `retention.ms` value is set to `window-size + 
> CHANGELOG_ADDITIONAL_RETENTION_MS`
> h3. Expected Behaviour
> If the window-size is updated, the changelog topic's `retention.ms` config 
> should be updated to reflect the new size
> h3. Actual Behaviour
> The changelog-topic's `retention.ms` setting is not amended, resulting in 
> possible loss of data upon application restart
>  
> n.b. Although it is easy to update changelog topic config, I logged this as 
> `major` due to the potential for data-loss for any user of Kafka-Streams who 
> may not be intimately aware of the relationship between a windowed store and 
> the changelog config
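
As a hedged illustration of the manual workaround implied above (the changelog topic name and durations are placeholders; this is not part of Streams itself), the changelog topic's `retention.ms` can be raised with the Admin client after enlarging the window:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BumpChangelogRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource changelog = new ConfigResource(
                ConfigResource.Type.TOPIC, "my-app-my-window-store-changelog");
            long newWindowSizeMs = 24 * 60 * 60 * 1000L;       // enlarged window size
            long additionalRetentionMs = 24 * 60 * 60 * 1000L; // extra retention buffer
            Config config = new Config(Collections.singleton(
                new ConfigEntry("retention.ms",
                                String.valueOf(newWindowSizeMs + additionalRetentionMs))));
            AlterConfigsResult result = admin.alterConfigs(
                Collections.singletonMap(changelog, config));
            result.all().get();
        }
    }
}
{code}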



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7591) Changelog retention period doesn't synchronise with window-store size

2018-11-05 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7591:
---
Issue Type: Improvement  (was: Bug)

> Changelog retention period doesn't synchronise with window-store size
> -
>
> Key: KAFKA-7591
> URL: https://issues.apache.org/jira/browse/KAFKA-7591
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jon Bates
>Priority: Major
>
> When a new windowed state store is created, the associated changelog topic's 
> `retention.ms` value is set to `window-size + 
> CHANGELOG_ADDITIONAL_RETENTION_MS`
> h3. Expected Behaviour
> If the window-size is updated, the changelog topic's `retention.ms` config 
> should be updated to reflect the new size
> h3. Actual Behaviour
> The changelog-topic's `retention.ms` setting is not amended, resulting in 
> possible loss of data upon application restart
>  
> n.b. Although it is easy to update changelog topic config, I logged this as 
> `major` due to the potential for data-loss for any user of Kafka-Streams who 
> may not be intimately aware of the relationship between a windowed store and 
> the changelog config



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7594) Make the time out for connect service to be dynamic

2018-11-05 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7594:
--

 Summary: Make the time out for connect service to be dynamic
 Key: KAFKA-7594
 URL: https://issues.apache.org/jira/browse/KAFKA-7594
 Project: Kafka
  Issue Type: Test
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar


Currently, the timeout for the Connect service to start in the ducktape tests 
is hardcoded at 60 seconds. The proposal is to make it configurable via the 
service init. The default would still be 60 seconds. This will allow downstream 
connectors to pass their own timeout values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread

2018-11-05 Thread sachin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675053#comment-16675053
 ] 

sachin edited comment on KAFKA-7280 at 11/5/18 4:50 PM:


Hi [~rsivaram]

I am getting this error also with Kafka 1.1.0.  Is there any workaround that I 
can use till the fix is available in a public build?

 


was (Author: sachinu):
Hi,

I am getting this error also with Kafka 1.1.0.  Is there any workaround that I 
can use till the fix is available in a public build?

 

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe. But we 
> are using it in Kafka consumer without any synchronization even though poll() 
> from heartbeat thread can process responses. Heartbeat thread holds the 
> coordinator lock while processing its poll and responses, making other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header=
> Unknown macro: 
> \{partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null}
> ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 
> bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> (org.apache.kafka.clients.FetchSessionHandler)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> 

[jira] [Created] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-05 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-7593:
-

 Summary: Infinite restart loop when failed to store big config for 
task
 Key: KAFKA-7593
 URL: https://issues.apache.org/jira/browse/KAFKA-7593
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Oleg Kuznetsov


When a config message for the config topic is larger than the Kafka broker 
allows to be stored, the source connector enters an infinite restart loop without 
any error indication.

An exception could be thrown in this case, or big configs could be handled more 
gracefully.
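
A hedged sketch of the "throw an exception" option suggested above (purely illustrative; Connect's real config storage lives elsewhere, and the size limit below is an assumed stand-in for the broker's max.message.bytes):

{code:java}
import java.nio.charset.StandardCharsets;

final class ConfigSizeGuard {
    // Assumed limit; in practice this would come from the config topic's
    // max.message.bytes setting rather than a hardcoded constant.
    private static final int MAX_CONFIG_MESSAGE_BYTES = 1_000_000;

    static void checkSerializedConfig(byte[] serializedConfig) {
        if (serializedConfig.length > MAX_CONFIG_MESSAGE_BYTES) {
            // Fail fast with a clear error instead of silently restarting forever.
            throw new IllegalArgumentException(
                "Serialized task config is " + serializedConfig.length
                + " bytes, which exceeds the config topic limit of "
                + MAX_CONFIG_MESSAGE_BYTES + " bytes");
        }
    }

    public static void main(String[] args) {
        checkSerializedConfig("{\"connector\":\"example\"}".getBytes(StandardCharsets.UTF_8));
    }
}
{code}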



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7590) GETTING HUGE MESSAGE STRUCTURE THROUGH JMS CONNECTOR

2018-11-05 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675326#comment-16675326
 ] 

Ryanne Dolan commented on KAFKA-7590:
-

did you mean to leak that license key?

> GETTING HUGE MESSAGE STRUCTURE THROUGH JMS CONNECTOR
> 
>
> Key: KAFKA-7590
> URL: https://issues.apache.org/jira/browse/KAFKA-7590
> Project: Kafka
>  Issue Type: Test
>  Components: config, KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Chenchu Lakshman kumar
>Priority: Major
>
> Message
>  
> {"schema":{"type":"struct","fields":[
> {"type":"string","optional":false,"doc":"This field stores the value of 
> `Message.getJMSMessageID() 
> `_.","field":"messageID"}
> ,{"type":"string","optional":false,"doc":"This field stores the type of 
> message that was received. This corresponds to the subinterfaces of `Message 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html]>`_. 
> `BytesMessage 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html]>`_ = 
> `bytes`, `MapMessage 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html]>`_ = `map`, 
> `ObjectMessage 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html]>`_ = 
> `object`, `StreamMessage 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html]>`_ = 
> `stream` and `TextMessage 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html]>`_ = 
> `text`. The corresponding field will be populated with the values from the 
> respective Message 
> subinterface.","field":"messageType"},{"type":"int64","optional":false,"doc":"Data
>  from the `getJMSTimestamp() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSTimestamp(])>`_
>  method.","field":"timestamp"},{"type":"int32","optional":false,"doc":"This 
> field stores the value of `Message.getJMSDeliveryMode() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSDeliveryMode(])>`_.","field":"deliveryMode"},{"type":"string","optional":true,"doc":"This
>  field stores the value of `Message.getJMSCorrelationID() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSCorrelationID(])>`_.","field":"correlationID"},{"type":"struct","fields":[
> {"type":"string","optional":false,"doc":"The type of JMS Destination, and 
> either ``queue`` or ``topic``.","field":"destinationType"}
> ,{"type":"string","optional":false,"doc":"The name of the destination. This 
> will be the value of `Queue.getQueueName() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName(])>`_ 
> or `Topic.getTopicName() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName(])>`_.","field":"name"}],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This
>  schema is used to represent a JMS Destination, and is either `queue 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html]>`_ or `topic 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html]>`_.","field":"replyTo"},{"type":"struct","fields":[
> {"type":"string","optional":false,"doc":"The type of JMS Destination, and 
> either ``queue`` or ``topic``.","field":"destinationType"}
> ,{"type":"string","optional":false,"doc":"The name of the destination. This 
> will be the value of `Queue.getQueueName() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName(])>`_ 
> or `Topic.getTopicName() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName(])>`_.","field":"name"}],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This
>  schema is used to represent a JMS Destination, and is either `queue 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html]>`_ or `topic 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html]>`_.","field":"destination"},{"type":"boolean","optional":false,"doc":"This
>  field stores the value of `Message.getJMSRedelivered() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSRedelivered(])>`_.","field":"redelivered"},{"type":"string","optional":true,"doc":"This
>  field stores the value of `Message.getJMSType() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSType(])>`_.","field":"type"},{"type":"int64","optional":false,"doc":"This
>  field stores the value of `Message.getJMSExpiration() 
> <[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSExpiration(])>`_.","field":"expiration"},{"type":"int32","optional":false,"doc":"This
>  field stores the value of `Message.getJMSPriority() 
> 

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-05 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675307#comment-16675307
 ] 

Matthias J. Sax commented on KAFKA-7580:


Not sure. Feel free to pick it up. It's the idea of an open source project :)

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes as success:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users opening/accessing readOnly file directory is not 
> allowed and it rightly throws up an error/exception in the unit test(which is 
> the intention of the unit test and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, root user can access any file in the system.:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as non-root users(simplest).
> 2.) If running the unit test as root user, make the temporary file directory 
> as immutable in the unit test code and then test for exception(needs code 
> changes in the unit tests):-
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/
> root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/grandparent
> mkdir: cannot create 

[jira] [Issue Comment Deleted] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-05 Thread Vitalina Horyukova (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitalina Horyukova updated KAFKA-5503:
--
Comment: was deleted

(was: Hi! I believe it'll help. [PR|https://github.com/apache/kafka/pull/5881].)

> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-05 Thread Vitalina Horyukova (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675197#comment-16675197
 ] 

Vitalina Horyukova commented on KAFKA-5503:
---

Hi! I believe it'll help. [PR|https://github.com/apache/kafka/pull/5881].

> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675189#comment-16675189
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe opened a new pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   …erId
   
   Check running in `Sender.maybeWaitForProducerId`
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.
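
A hedged sketch of the direction the PR describes ("check running in `Sender.maybeWaitForProducerId`"); the class and field names below are illustrative, not the actual Sender internals:

{code:java}
final class ProducerIdWaiter {
    private volatile boolean running = true;
    private volatile Long producerId = null;

    void close() {
        running = false;   // flipped concurrently by close()
    }

    void maybeWaitForProducerId() throws InterruptedException {
        // Keep retrying only while the sender is still running, so a concurrent
        // close() is honored instead of being ignored.
        while (running && producerId == null) {
            Long id = tryFetchProducerIdOnce();
            if (id != null) {
                producerId = id;
            } else {
                Thread.sleep(100);   // retry backoff
            }
        }
    }

    private Long tryFetchProducerIdOnce() {
        // Placeholder for the InitProducerId broker round trip, which may fail
        // or time out and therefore needs to be retried.
        return null;
    }
}
{code}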



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread

2018-11-05 Thread sachin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675053#comment-16675053
 ] 

sachin commented on KAFKA-7280:
---

Hi,

I am getting this error also with Kafka 1.1.0.  Is there any workaround that I 
can use till the fix is available in a public build?

 

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe. But we 
> are using it in Kafka consumer without any synchronization even though poll() 
> from heartbeat thread can process responses. Heartbeat thread holds the 
> coordinator lock while processing its poll and responses, making other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},
> record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 
> bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> (org.apache.kafka.clients.FetchSessionHandler)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), 
> toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 
> 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>  
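
A minimal sketch of the locking pattern the ticket describes, with hypothetical names (this is not Kafka's internal FetchSessionHandler API): every read or update of the shared per-node session-partitions map is serialised on one lock, so the heartbeat thread and the application thread can never iterate and mutate it at the same time.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a per-node fetch session. The map mirrors
// FetchSessionHandler#sessionPartitions: iteration order matters, and
// unsynchronised concurrent mutation is exactly what produces the
// ConcurrentModificationException in the stack trace above.
final class GuardedFetchSession {
    private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

    // Called from the application thread when building the next fetch request.
    synchronized void addPartition(String topicPartition, long fetchOffset) {
        sessionPartitions.put(topicPartition, fetchOffset);
    }

    // Called from whichever thread completes the fetch response (possibly the
    // heartbeat thread); iterates the same map, so it takes the same lock.
    synchronized String describeSession() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : sessionPartitions.entrySet()) {
            sb.append(e.getKey()).append('@').append(e.getValue()).append(' ');
        }
        return sb.toString().trim();
    }
}
{code}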

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-05 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675041#comment-16675041
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

Hi Matthias,

Just wanted to know when we can expect the suggested improvement to the 
failing unit test. Is it possible anytime soon?

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users opening/accessing readOnly file directory is not 
> allowed and it rightly throws up an error/exception in the unit test(which is 
> the intention of the unit test and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, root user can access any file in the system.:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence no "ProcessorStateException" is thrown when the read-only temporary 
> directory is opened, and the unit test rightly fails for the root user.
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as a non-root user (simplest).
> 2.) If the unit test must run as the root user, make the temporary directory 
> immutable in the unit test code and then test for the exception (needs code 
> changes in the unit tests; see the sketch after the commands below):-
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/
> root@p006vm18:/tmp# 
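
A hedged sketch of approach 2, assuming an ext*-style filesystem with the `chattr` binary on the PATH; the helper class and method names are hypothetical illustrations, not the actual test code.

{code:java}
import java.io.File;
import java.io.IOException;

// Hypothetical helper: when the suite runs as root, a plain read-only bit is
// not enough, so mark the directory immutable with `chattr +i` before the
// assertion and clear the flag again during cleanup.
final class ReadOnlyDirHelper {
    static boolean runningAsRoot() {
        return "root".equals(System.getProperty("user.name"));
    }

    static void makeUnwritable(File dir) throws IOException, InterruptedException {
        if (runningAsRoot()) {
            run("chattr", "+i", dir.getAbsolutePath());   // immutable even for root
        } else {
            dir.setReadOnly();                            // existing behaviour for normal users
        }
    }

    static void restore(File dir) throws IOException, InterruptedException {
        if (runningAsRoot()) {
            run("chattr", "-i", dir.getAbsolutePath());   // allow cleanup to delete the dir
        }
    }

    private static void run(String... cmd) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(cmd).inheritIO().start();
        if (p.waitFor() != 0) {
            throw new IOException("command failed: " + String.join(" ", cmd));
        }
    }
}
{code}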

[jira] [Created] (KAFKA-7592) Kafka consumer poll cannot be stopped after shutting down 2 of 3 broker and ZooKeeper nodes

2018-11-05 Thread lvliguo (JIRA)
lvliguo created KAFKA-7592:
--

 Summary: Kafka consumer poll cannot be stopped after shutting down 2 of 3 broker and ZooKeeper nodes
 Key: KAFKA-7592
 URL: https://issues.apache.org/jira/browse/KAFKA-7592
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.2.0
 Environment: jdk1.8.73
Linux version 3.12.49-11-default (geeko@buildhost) (gcc version 4.8.5 (SUSE 
Linux) ) #1 SMP Wed Nov 11 20:52:43 UTC 2015 (8d714a0)
SUSE Linux Enterprise Server 12 SP1 release 12.1
Reporter: lvliguo


[Symptom]: The KafkaConsumer.poll method never returns, so the consumer thread cannot be shut down.

[Steps to reproduce]: Set up a 3-node Kafka cluster (3 brokers and 3 ZooKeeper nodes). First shut down 2 ZooKeeper nodes so that ZooKeeper becomes unavailable, then shut down 2 brokers. Then create a KafkaConsumer and call poll(). The topic has a single partition and a single replica.

[Exception]:

[2018-11-05 15:42:23,318/CST][consumer 
thread://mano-test-02/my_json_group][WARN][RHM_Beta_Service01][rest][Connection 
exception with /10.21.84.172 
disconnected][org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:407)]
java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
 at 
org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:110)
 at 
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
 at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:365)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:332)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:375)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:234)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:358)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:327)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:329)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1056)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1021)
 at 
com.huawei.middleware.dms.store.kafka.DmsKafkaConsumers.pullFromStore(DmsKafkaConsumers.java:239)
 at 
com.huawei.middleware.dms.store.kafka.DmsKafkaConsumerThread.doRun(DmsKafkaConsumerThread.java:96)
 at 
com.huawei.middleware.dms.store.kafka.DmsKafkaConsumerThread.run(DmsKafkaConsumerThread.java:60)

[Thread dump]:

"consumer thread://mano-test-01/my_json_group" #182 prio=5 os_prio=0 
tid=0x7f1d14016000 nid=0x19a80 waiting on condition [0x7f1cac8eb000]
 java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:45)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:372)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:327)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:329)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1056)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1021)
 at 
com.huawei.middleware.dms.store.kafka.DmsKafkaConsumers.pullFromStore(DmsKafkaConsumers.java:239)
 at 
com.huawei.middleware.dms.store.kafka.DmsKafkaConsumerThread.doRun(DmsKafkaConsumerThread.java:96)
 at 
com.huawei.middleware.dms.store.kafka.DmsKafkaConsumerThread.run(DmsKafkaConsumerThread.java:60)
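
Not a fix for this ticket, but KafkaConsumer.wakeup() is the documented way to abort a blocked poll() from another thread (it makes poll() throw WakeupException); whether it helps here depends on where the consumer is actually blocked. A minimal sketch of that shutdown pattern, with placeholder broker, group and topic names:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerShutdownSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_json_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        // wakeup() is the only KafkaConsumer method that is safe to call from
        // another thread; it causes a blocked poll() to throw WakeupException.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown, nothing to do
        } finally {
            consumer.close();
        }
    }
}
{code}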



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7591) Changelog retention period doesn't synchronise with window-store size

2018-11-05 Thread Jon Bates (JIRA)
Jon Bates created KAFKA-7591:


 Summary: Changelog retention period doesn't synchronise with 
window-store size
 Key: KAFKA-7591
 URL: https://issues.apache.org/jira/browse/KAFKA-7591
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Jon Bates


When a new windowed state store is created, the associated changelog topic's 
`retention.ms` value is set to `window-size + CHANGELOG_ADDITIONAL_RETENTION_MS`
h3. Expected Behaviour

If the window-size is updated, the changelog topic's `retention.ms` config 
should be updated to reflect the new size
h3. Actual Behaviour

The changelog-topic's `retention.ms` setting is not amended, resulting in 
possible loss of data upon application restart

 

n.b. Although it is easy to update changelog topic config, I logged this as 
`major` due to the potential for data-loss for any user of Kafka-Streams who 
may not be intimately aware of the relationship between a windowed store and 
the changelog config
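
Until the changelog config is kept in sync automatically, the retention can be bumped by hand. A hedged sketch using the AdminClient: the broker address, topic name and new window size are assumed values, the extra 24 hours matches Streams' default additional changelog retention, and note that the non-incremental alterConfigs call replaces the topic's existing config overrides with exactly the set supplied.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BumpChangelogRetention {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Assumed values: changelog topics are named <application.id>-<store-name>-changelog.
        String changelogTopic = "my-app-my-window-store-changelog";
        long newWindowSizeMs = 24 * 60 * 60 * 1000L;
        long additionalRetentionMs = 24 * 60 * 60 * 1000L; // assumed default extra retention

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, changelogTopic);
            // Caution: alterConfigs overwrites all existing topic-level overrides
            // with the entries given here.
            Config retention = new Config(Collections.singletonList(
                    new ConfigEntry("retention.ms",
                            Long.toString(newWindowSizeMs + additionalRetentionMs))));
            admin.alterConfigs(Collections.singletonMap(topic, retention)).all().get();
        }
    }
}
{code}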



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7590) GETTING HUGE MESSAGE STRUCTURE THROUGH JMS CONNECTOR

2018-11-05 Thread Chenchu Lakshman kumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenchu Lakshman kumar updated KAFKA-7590:
--
Description: 
Message
 

{"schema":{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"This field stores the value of 
`Message.getJMSMessageID() 
`_.","field":"messageID"}

,{"type":"string","optional":false,"doc":"This field stores the type of message 
that was received. This corresponds to the subinterfaces of `Message 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html]>`_. `BytesMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html]>`_ = 
`bytes`, `MapMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html]>`_ = `map`, 
`ObjectMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html]>`_ = 
`object`, `StreamMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html]>`_ = 
`stream` and `TextMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html]>`_ = `text`. 
The corresponding field will be populated with the values from the respective 
Message 
subinterface.","field":"messageType"},{"type":"int64","optional":false,"doc":"Data
 from the `getJMSTimestamp() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSTimestamp(])>`_
 method.","field":"timestamp"},{"type":"int32","optional":false,"doc":"This 
field stores the value of `Message.getJMSDeliveryMode() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSDeliveryMode(])>`_.","field":"deliveryMode"},{"type":"string","optional":true,"doc":"This
 field stores the value of `Message.getJMSCorrelationID() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSCorrelationID(])>`_.","field":"correlationID"},{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"The type of JMS Destination, and 
either ``queue`` or ``topic``.","field":"destinationType"}

,{"type":"string","optional":false,"doc":"The name of the destination. This 
will be the value of `Queue.getQueueName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName(])>`_ 
or `Topic.getTopicName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName(])>`_.","field":"name"}],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This
 schema is used to represent a JMS Destination, and is either `queue 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html]>`_ or `topic 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html]>`_.","field":"replyTo"},{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"The type of JMS Destination, and 
either ``queue`` or ``topic``.","field":"destinationType"}

,{"type":"string","optional":false,"doc":"The name of the destination. This 
will be the value of `Queue.getQueueName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName(])>`_ 
or `Topic.getTopicName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName(])>`_.","field":"name"}],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This
 schema is used to represent a JMS Destination, and is either `queue 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html]>`_ or `topic 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html]>`_.","field":"destination"},{"type":"boolean","optional":false,"doc":"This
 field stores the value of `Message.getJMSRedelivered() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSRedelivered(])>`_.","field":"redelivered"},{"type":"string","optional":true,"doc":"This
 field stores the value of `Message.getJMSType() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSType(])>`_.","field":"type"},{"type":"int64","optional":false,"doc":"This
 field stores the value of `Message.getJMSExpiration() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSExpiration(])>`_.","field":"expiration"},{"type":"int32","optional":false,"doc":"This
 field stores the value of `Message.getJMSPriority() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSPriority(])>`_.","field":"priority"},{"type":"map","keys":

{"type":"string","optional":false}

,"values":{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"The java type of the property on the 
Message. One of ``boolean``, ``byte``, ``short``, ``integer``, ``long``, 
``float``, ``double``, or ``string``.","field":"propertyType"}

,\{"type":"boolean","optional":true,"doc":"The value stored as a boolean. Null 
unless ``propertyType`` is set to 
``boolean``.","field":"boolean"},\{"type":"int8","optional":true,"doc":"The 
value stored as a byte. Null unless ``propertyType`` is set to 

[jira] [Commented] (KAFKA-7380) Global thread restore blocks KafkaStreams#start()

2018-11-05 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674804#comment-16674804
 ] 

Patrik Kleindl commented on KAFKA-7380:
---

The link above has a trailing period and doesn't work; the working URL is: 
[https://github.com/apache/kafka/pull/5879]

 

> Global thread restore blocks KafkaStreams#start()
> -
>
> Key: KAFKA-7380
> URL: https://issues.apache.org/jira/browse/KAFKA-7380
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Minor
>
> KafkaStreams#start() is documented to be non-blocking and should return 
> immediately. However, if global stores are used, `start()` blocks until all 
> global stores are restored.
> We should change this and start all threads at the same time: every 
> StreamThread would go into a `wait()` state immediately on startup, and only 
> resume after the global thread restore finishes and `wakeup()` is called on all StreamThreads. 
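
A hedged sketch of that proposed startup sequence, using a CountDownLatch in place of wait()/wakeup(); class and method names are hypothetical illustrations, not the actual Streams internals.

{code:java}
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration: start() returns immediately, stream threads start
// but park on a latch, and the global thread releases them once restoration of
// the global stores has finished.
final class NonBlockingStartSketch {
    private final CountDownLatch globalStoresRestored = new CountDownLatch(1);

    void start() {
        Thread globalThread = new Thread(() -> {
            restoreGlobalStores();            // long-running bulk restore
            globalStoresRestored.countDown(); // stands in for wakeup()-ing the stream threads
        }, "global-thread");

        Thread streamThread = new Thread(() -> {
            try {
                globalStoresRestored.await(); // stands in for the proposed wait() on startup
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            processRecords();
        }, "stream-thread-1");

        globalThread.start();
        streamThread.start();
        // start() returns here without waiting for the restore to complete.
    }

    private void restoreGlobalStores() { /* placeholder */ }

    private void processRecords() { /* placeholder */ }
}
{code}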



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)