[jira] [Commented] (KAFKA-8106) Reducing the allocation and copying of ByteBuffer when logValidator do validation.

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845497#comment-16845497
 ] 

ASF GitHub Bot commented on KAFKA-8106:
---

guozhangwang commented on pull request #6785: KAFKA-8106: Skipping ByteBuffer 
allocation of key / value / headers in logValidator
URL: https://github.com/apache/kafka/pull/6785
 
 
   This is based on @Flowermin in https://github.com/apache/kafka/pull/6699 
addressing the comments.
   
   1. Add skipKeyValueIterator() used in LogValidator; based on whether 
`isCompressed` and `skipKeyValue`, there are four possible iterators now.
   2. Add SkipKeyValueDefaultRecord, which extends DefaultRecord (see the sketch below).
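   For illustration only, the core idea of reading past key/value bytes instead of copying 
them into newly allocated ByteBuffers can be sketched as follows (a simplified record 
layout and hypothetical names, not the actual patch):
```java
import java.io.DataInput;
import java.io.IOException;

// Minimal sketch of the skip-instead-of-copy idea, assuming a simplified record layout
// (4-byte key length, key bytes, 4-byte value length, value bytes). The real DefaultRecord
// format uses varints and also carries headers; all names here are illustrative only.
final class SkipKeyValueSketch {

    // Skips the key and value bytes of one record without allocating ByteBuffers for them,
    // returning how many payload bytes were skipped.
    static int skipKeyAndValue(DataInput in) throws IOException {
        int keySize = in.readInt();
        if (keySize > 0) {
            in.skipBytes(keySize);   // skip the key instead of copying it
        }
        int valueSize = in.readInt();
        if (valueSize > 0) {
            in.skipBytes(valueSize); // skip the value instead of copying it
        }
        return Math.max(keySize, 0) + Math.max(valueSize, 0);
    }
}
```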
   
   -
   
   We propose reducing the allocation and copying of ByteBuffer when 
LogValidator performs validation, for compressed messages where the magic value 
in use is above 1 and no format conversion or value overwriting is required. The 
improved code is as follows.
   1. Add a class **SimplifiedDefaultRecord** implementing the Record interface, 
which defines the various attributes of a message.
   2. Add a function **simplifiedreadFrom**() to class **DefaultRecord**. This 
function does not copy data from the DataInput into a newly created ByteBuffer. 
**This reduces the allocation and copying of ByteBuffer** during validation in 
LogValidator and therefore reduces GC frequency. We offer a simple read function 
that reads from **DataInput** without creating a ByteBuffer; of course, this 
cannot avoid decompressing the data.
   3. Add functions **simplifiedIterator**() and 
**simplifiedCompressedIterator**() to class **DefaultRecordBatch**. These two 
functions return iterators over instances of class **SimplifiedDefaultRecord**.
   4. Modify the function **validateMessagesAndAssignOffsetsCompressed**() 
in class LogValidator.
   
   **After modifying the code to reduce the allocation and copying of 
ByteBuffer**, the test performance is greatly improved and CPU usage stays 
below 60%. The following is a comparison of the test performance of the 
different code versions under the same conditions.
   **Result of performance testing**
   Main config of Kafka: single message size: 1024B; topic partitions: 200; linger.ms: 1000ms
   **1. Before the code change (source code):**
   Network inflow rate: 600M/s; CPU: 97%; production: 25,000,000 messages/s
   **2. After the code change (ByteBuffer allocation removed):**
   Network inflow rate: 1G/s; CPU: <60%; production: 41,000,000 messages/s
   **1. Before the code change (source code), GC:**
   ![](https://i.loli.net/2019/05/07/5cd16df163ad3.png)
   **2. After the code change (ByteBuffer allocation removed), GC:**
   ![](https://i.loli.net/2019/05/07/5cd16dae1dbc2.png)
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reducing the allocation and copying of ByteBuffer  when logValidator  do 
> validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
>       We did performance testing of Kafka in the specific scenarios described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. Then we started many producer processes to 
> send large amounts of messages to one of the topics in each test.
> *_Specific Scenario_*
>   
>  *_1.Main config of Kafka_*  
>  # Main config of the Kafka server: num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartition : 50~2000
>  # Size of Single Message : 1024B
>  
>  *_2.Config of KafkaProducer_* 
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3.The best result of performance testing_*  
> ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of 
> production||
> 

[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-21 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845478#comment-16845478
 ] 

Omkar Mestry commented on KAFKA-7245:
-

So we only need to update the tests that call put() with 2 parameters to use 
put() with 3 parameters, right?

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
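
For illustration, the migration in the tests would look roughly like the sketch 
below (the store, key, value, and timestamp are placeholders):
{code:java}
import org.apache.kafka.streams.state.WindowStore;

// Sketch of the test-only migration described above; store and values are placeholders.
class WindowStorePutMigrationExample {

    static void writeRecord(WindowStore<String, Long> store,
                            String key, Long value, long windowStartMs) {
        // Old two-argument form (timestamp taken implicitly) that is to be deprecated:
        // store.put(key, value);

        // Explicit-timestamp form that the tests should migrate to:
        store.put(key, value, windowStartMs);
    }
}
{code}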



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8405) Remove deprecated preferred leader RPC and Command

2019-05-21 Thread Jose Armando Garcia Sancio (JIRA)
Jose Armando Garcia Sancio created KAFKA-8405:
-

 Summary: Remove deprecated preferred leader RPC and Command
 Key: KAFKA-8405
 URL: https://issues.apache.org/jira/browse/KAFKA-8405
 Project: Kafka
  Issue Type: Task
  Components: admin
Affects Versions: 3.0.0
Reporter: Jose Armando Garcia Sancio
 Fix For: 3.0.0


For version 2.4.0, we deprecated:
# AdminClient.electPreferredLeaders
# ElectPreferredLeadersResult
# ElectPreferredLeadersOptions
# PreferredReplicaLeaderElectionCommand.

For version 3.0.0 we should remove all of these symbols and the references to 
them. For the command, that includes:
# bin/kafka-preferred-replica-election.sh
# bin/windows/kafka-preferred-replica-election.bat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-21 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845381#comment-16845381
 ] 

Sophie Blee-Goldman commented on KAFKA-7994:


Hi [~Yohan123], I think we need to be careful in assuming a singular view of 
stream-time across tasks or even within a single task. Rather than it being an 
obstacle that different subtopologies can't "talk" to one another and pass 
along a single stream-time, I think this actually enforces correctness – each 
node has its own sense of time, and it doesn't make sense for them to look 
upstream for the time as seen by a different node. See 
https://github.com/apache/kafka/pull/6278

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic behavior 
> if we stop processing right before a late record that would be dropped if we 
> continued processing, but is not dropped after the rebalance/restart. Let's look 
> at an example with a grace period of 5ms for a tumbling window of 5ms, and 
> the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies to all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 were 
> filtered out, the tasks receiving the processed records would compute the 
> stream time as 5 instead of the correct value of 11. This requires us 
> to also propagate the latest observed partition time downstream. 
> That means the sources located at the head of the topology must forward the 
> partition time to their subtopologies whenever records are sent.
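
As a plain-Java sketch of the lateness check and the example above (illustrative 
only, not the Kafka Streams internals):
{code:java}
// Minimal sketch of the stream-time / grace-period logic described above; not the actual
// Kafka Streams implementation.
class StreamTimeExample {
    static final long GRACE_MS = 5;
    long streamTime = -1; // UNKNOWN after a restart/rebalance, which is the problem described

    // Returns true if the record is processed, false if it is dropped as late.
    boolean process(long recordTimestamp) {
        if (streamTime >= 0 && recordTimestamp < streamTime - GRACE_MS) {
            return false; // late: timestamp < stream-time - grace-period
        }
        streamTime = Math.max(streamTime, recordTimestamp);
        return true;
    }

    public static void main(String[] args) {
        StreamTimeExample t = new StreamTimeExample();
        // r1(0), r2(5), r3(11) are processed and stream-time advances 0, 5, 11;
        // r4(2) is then dropped because 2 < 11 - 5. After a restart, streamTime
        // would be reset to -1 and r4 would be processed instead.
        for (long ts : new long[] {0, 5, 11, 2}) {
            System.out.println("ts=" + ts + " processed=" + t.process(ts));
        }
    }
}
{code}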



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-21 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845338#comment-16845338
 ] 

Richard Yu commented on KAFKA-7994:
---

[~guozhang] [~mjsax] and [~vvcephei] I created a KIP for this issue so we could 
discuss this more in detail. Below is the link.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-472%3A+%5BSTREAMS%5D+Add+partition+time+field+to+RecordContext]

Hope this could facilitate the process! :)

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic behavior 
> if we stop processing right before a late record that would be dropped if we 
> continued processing, but is not dropped after the rebalance/restart. Let's look 
> at an example with a grace period of 5ms for a tumbling window of 5ms, and 
> the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies to all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 were 
> filtered out, the tasks receiving the processed records would compute the 
> stream time as 5 instead of the correct value of 11. This requires us 
> to also propagate the latest observed partition time downstream. 
> That means the sources located at the head of the topology must forward the 
> partition time to their subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8365) Protocol and consumer support for follower fetching

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845326#comment-16845326
 ] 

ASF GitHub Bot commented on KAFKA-8365:
---

hachikuji commented on pull request #6775: Minor: follow up for KAFKA-8365
URL: https://github.com/apache/kafka/pull/6775
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Protocol and consumer support for follower fetching
> ---
>
> Key: KAFKA-8365
> URL: https://issues.apache.org/jira/browse/KAFKA-8365
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 2.3.0
>
>
> Add the consumer client changes and implement the protocol support for 
> [KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2019-05-21 Thread Rens Groothuijsen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-2939:


Assignee: Rens Groothuijsen

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Assignee: Rens Groothuijsen
>Priority: Major
>  Labels: newbie
>
> Today we always log unused configs in the KafkaProducer / KafkaConsumer 
> constructors. However, in some cases, such as Kafka Streams which makes use of 
> these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. So it would be better to make this function call 
> optional to avoid printing unnecessary and confusing WARN entries.
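
For illustration, the situation described above arises when application-specific 
entries (here the made-up key my.partitioner.option) are passed through to the 
client, which then warns that they are unknown; a minimal sketch:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Sketch only: "my.partitioner.option" is a hypothetical key consumed by a custom
// Partitioner, not by the producer itself, so logUnused() reports it as a WARN today.
class UnusedConfigWarningExample {
    static KafkaProducer<String, String> newProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("my.partitioner.option", "42"); // only meaningful to a custom Partitioner
        // Constructing the producer triggers a WARN about the unknown config entry.
        return new KafkaProducer<>(props);
    }
}
{code}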



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7565) NPE in KafkaConsumer

2019-05-21 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7565.
---
   Resolution: Duplicate
Fix Version/s: 2.3.0

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
> Fix For: 2.3.0
>
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find minimal reproducer, but it happens quite often in our system. 
> We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is 
> somehow related.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8052) Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request

2019-05-21 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8052.
---
   Resolution: Fixed
 Reviewer: Jason Gustafson
Fix Version/s: 2.3.0

> Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request 
> 
>
> Key: KAFKA-8052
> URL: https://issues.apache.org/jira/browse/KAFKA-8052
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Bartek Jakub
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.3.0
>
>
> I noticed some weird behavior in my logs. I intermittently see this log entry: 
> {noformat}
> 2019-03-06 14:02:13.024 INFO 1 --- [container-1-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-4, 
> groupId=service-main] Node 2 was unable to process the fetch request with 
> (sessionId=1321134604, epoch=125730): INVALID_FETCH_SESSION_EPOCH.{noformat}
> which happens every ~1 hour. 
>  
> I was wondering if it was my Kafka provider's fault, so I decided to investigate 
> the problem and tried to reproduce the issue locally - with success. My 
> configuration is:
>  * Kafka Clients version - 2.0.1
>  * Kafka - 2.12_2.1.0
>  
> I enabled trace logs for 'org.apache.kafka.clients' and that's what I get:
> {noformat}
> 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Built incremental fetch (sessionId=197970881, 
> epoch=525) for node 1001. Added (), altered (), removed () out of 
> (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, 
> itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, 
> itunes-command-17, itunes-command-16)
> 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] 
> o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending READ_UNCOMMITTED 
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null)
> 2019-03-05 21:04:16.161 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending FETCH 
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=197970881,epoch=525,topics=[],forgotten_topics_data=[]}
>  with correlation id 629 to node 1001
> 2019-03-05 21:04:16.664 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main] Completed receive from node 1001 for FETCH with 
> correlation id 629, received 
> {throttle_time_ms=0,error_code=0,session_id=197970881,responses=[]}
> 2019-03-05 21:04:16.664 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Node 1001 sent an incremental fetch response for 
> session 197970881 with response=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)
> 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Built incremental fetch (sessionId=197970881, 
> epoch=526) for node 1001. Added (), altered (), removed () out of 
> (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, 
> itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, 
> itunes-command-17, itunes-command-16)
> 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] 
> o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending READ_UNCOMMITTED 
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null)
> 2019-03-05 21:04:16.665 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main - F630] Sending FETCH 
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=197970881,epoch=526,topics=[],forgotten_topics_data=[]}
>  with correlation id 630 to node 1001
> 2019-03-05 21:04:17.152 DEBUG 3052 --- [ 

[jira] [Commented] (KAFKA-8052) Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845248#comment-16845248
 ] 

ASF GitHub Bot commented on KAFKA-8052:
---

rajinisivaram commented on pull request #6582: KAFKA-8052; Ensure fetch session 
epoch is updated before new request
URL: https://github.com/apache/kafka/pull/6582
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request 
> 
>
> Key: KAFKA-8052
> URL: https://issues.apache.org/jira/browse/KAFKA-8052
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Bartek Jakub
>Assignee: Rajini Sivaram
>Priority: Major
>
> I noticed some weird behavior in my logs. I intermittently see this log entry: 
> {noformat}
> 2019-03-06 14:02:13.024 INFO 1 --- [container-1-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-4, 
> groupId=service-main] Node 2 was unable to process the fetch request with 
> (sessionId=1321134604, epoch=125730): INVALID_FETCH_SESSION_EPOCH.{noformat}
> which happens every ~1 hour. 
>  
> I was wondering if it was my Kafka provider's fault, so I decided to investigate 
> the problem and tried to reproduce the issue locally - with success. My 
> configuration is:
>  * Kafka Clients version - 2.0.1
>  * Kafka - 2.12_2.1.0
>  
> I enabled trace logs for 'org.apache.kafka.clients' and that's what I get:
> {noformat}
> 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Built incremental fetch (sessionId=197970881, 
> epoch=525) for node 1001. Added (), altered (), removed () out of 
> (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, 
> itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, 
> itunes-command-17, itunes-command-16)
> 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] 
> o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending READ_UNCOMMITTED 
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null)
> 2019-03-05 21:04:16.161 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending FETCH 
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=197970881,epoch=525,topics=[],forgotten_topics_data=[]}
>  with correlation id 629 to node 1001
> 2019-03-05 21:04:16.664 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main] Completed receive from node 1001 for FETCH with 
> correlation id 629, received 
> {throttle_time_ms=0,error_code=0,session_id=197970881,responses=[]}
> 2019-03-05 21:04:16.664 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Node 1001 sent an incremental fetch response for 
> session 197970881 with response=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)
> 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Built incremental fetch (sessionId=197970881, 
> epoch=526) for node 1001. Added (), altered (), removed () out of 
> (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, 
> itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, 
> itunes-command-17, itunes-command-16)
> 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] 
> o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending READ_UNCOMMITTED 
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null)
> 

[jira] [Created] (KAFKA-8404) Authorization header is not passed in Connect when forwarding REST requests

2019-05-21 Thread Robert Yokota (JIRA)
Robert Yokota created KAFKA-8404:


 Summary: Authorization header is not passed in Connect when 
forwarding REST requests
 Key: KAFKA-8404
 URL: https://issues.apache.org/jira/browse/KAFKA-8404
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Robert Yokota
 Fix For: 2.3.0


When Connect forwards a REST request from one worker to another, the 
Authorization header is not forwarded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8199) ClassCastException when trying to groupBy after suppress

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845185#comment-16845185
 ] 

ASF GitHub Bot commented on KAFKA-8199:
---

vvcephei commented on pull request #6781: KAFKA-8199: Implement ValueGetter for 
Suppress
URL: https://github.com/apache/kafka/pull/6781
 
 
   See also https://github.com/apache/kafka/pull/6684
   
   KTable processors must be supplied with a KTableProcessorSupplier, which in 
turn requires implementing a ValueGetter, for use with joins and groupings.
   
   For suppression, a correct view only includes the previously emitted values 
(not the currently buffered ones), so this change also involves pushing the 
Change value type into the suppression buffer's interface, so that it can get 
the prior value upon first buffering (which is also the previously emitted 
value).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ClassCastException when trying to groupBy after suppress
> 
>
> Key: KAFKA-8199
> URL: https://issues.apache.org/jira/browse/KAFKA-8199
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Jose Lopez
>Priority: Major
> Fix For: 2.3.0
>
>
> A topology with a groupBy after a suppress operation results in a 
> ClassCastException
>  The following sample topology
> {noformat}
> Properties properties = new Properties(); 
> properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid"); 
> properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost");
> StreamsBuilder builder = new StreamsBuilder();
>  builder.stream("topic")
> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(30))).count() 
> .suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), 
> BufferConfig.unbounded())) 
> .groupBy((k, v) -> KeyValue.pair(k,v)).count().toStream(); 
> builder.build(properties);
> {noformat}
> results in this exception:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.kafka.streams.kstream.internals.KTableImpl$$Lambda$4/2084435065 
> cannot be cast to 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier{noformat}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845161#comment-16845161
 ] 

Matthias J. Sax commented on KAFKA-7245:


No problem [~panuwat.anawatmongk...@gmail.com]! Thanks for reassigning to 
[~omanges]!

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-21 Thread Panuwat Anawatmongkhon (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845160#comment-16845160
 ] 

Panuwat Anawatmongkhon commented on KAFKA-7245:
---

[~mjsax] [~omanges] I forgot that I had this ticket assigned to me. Sorry. I have 
reassigned it to Omkar Mestry.

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-21 Thread Panuwat Anawatmongkhon (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panuwat Anawatmongkhon reassigned KAFKA-7245:
-

Assignee: Omkar Mestry  (was: Panuwat Anawatmongkhon)

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845126#comment-16845126
 ] 

Matthias J. Sax commented on KAFKA-7245:


Well. [~panuwat.anawatmongk...@gmail.com] wanted to work on this. If he does 
not have interest any longer, we can reassign the ticket of course.

@[~panuwat.anawatmongk...@gmail.com] What is the status? There was no activity 
on this ticket for a long time.

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Panuwat Anawatmongkhon
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8286) KIP-460 Admin Leader Election RPC

2019-05-21 Thread Jose Armando Garcia Sancio (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-8286:
--
Description: 
Tracking issue for implementing KIP-460. Tasks:
 # [Done] Design KIP
 # [Done] Review KIP
 # [Done] Approve KIP
 # [Done] Update RPC to support KIP
 # [Done] Update controller to support KIP
 # [Done] Create CLI command (kafka-leader-election) that implements the KIP
 # [Done] Search and replace any usage of “preferred” in the code
 # Add test for command
 # [Done] Add test for controller functionality
 # [Done] Revisit all of the documentation - generate and audit the new javadocs
 # [Done] Deprecated since... needs to be updated
 # Review PR
 # Merge PR
 # Update the KIP based on the latest implementation

  was:
Tracking issue for implementing KIP-460. Tasks:
 # [Done] Design KIP
 # [Done] Review KIP
 # [Done] Approve KIP
 # [Done] Update RPC to support KIP
 # [Done] Update controller to support KIP
 # [Done] Create CLI command (kafka-leader-election) that implements the KIP
 # [Done] Search and replace any usage of “preferred” in the code
 # Add test for command
 # [Done] Add test for controller functionality
 # Revisit all of the documentation - generate and audit the new javadocs
 # Deprecated since... needs to be updated
 # Review PR
 # Merge PR
 # Update the KIP based on the latest implementation


> KIP-460 Admin Leader Election RPC
> -
>
> Key: KAFKA-8286
> URL: https://issues.apache.org/jira/browse/KAFKA-8286
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin, clients, core
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Tracking issue for implementing KIP-460. Tasks:
>  # [Done] Design KIP
>  # [Done] Review KIP
>  # [Done] Approve KIP
>  # [Done] Update RPC to support KIP
>  # [Done] Update controller to support KIP
>  # [Done] Create CLI command (kafka-leader-election) that implements the KIP
>  # [Done] Search and replace any usage of “preferred” in the code
>  # Add test for command
>  # [Done] Add test for controller functionality
>  # [Done] Revisit all of the documentation - generate and audit the new 
> javadocs
>  # [Done] Deprecated since... needs to be updated
>  # Review PR
>  # Merge PR
>  # Update the KIP based on the latest implementation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845088#comment-16845088
 ] 

ASF GitHub Bot commented on KAFKA-8315:
---

guozhangwang commented on pull request #6664: KAFKA-8315: fix the JoinWindows 
retention deprecation doc
URL: https://github.com/apache/kafka/pull/6664
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated Kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals than the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) The current assumption is that the issue is rooted in the way records are 
> consumed from the topics:
> We have tried to set various options to suppress reads from the source topics 
> but it doesn't seem to make any difference: 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  
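
For illustration, the grace-period workaround mentioned in the description can be 
sketched as below (the 5-minute window size is an arbitrary placeholder; the 
2-year grace mirrors the minute-by-minute example above, with the retention-size 
repercussions the reporter describes):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

// Sketch only: extend the grace period of the join window so older historical
// records are still joined; window size and grace here are placeholders.
class HistoricalJoinWindowSketch {
    static JoinWindows twoYearGraceWindow() {
        return JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ofDays(2 * 365));
    }
}
{code}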



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8195) Unstable Producer After Send Failure in Transaction

2019-05-21 Thread Gary Russell (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845029#comment-16845029
 ] 

Gary Russell commented on KAFKA-8195:
-

{quote}
How did you end up in this situation?
{quote}

Once I got the {{transactional.id}} into the state where it timed out in 
{{initTransactions()}} I couldn't get past it without removing the logs.

> Unstable Producer After Send Failure in Transaction
> ---
>
> Key: KAFKA-8195
> URL: https://issues.apache.org/jira/browse/KAFKA-8195
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1, 2.2.0, 2.3.0
>Reporter: Gary Russell
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> This journey started with [this Stack Overflow question | 
> https://stackoverflow.com/questions/55510898].
> I easily reproduced his issue (see my comments on his question).
> My first step was to take Spring out of the picture and replicate the issue 
> with the native {{Producer}} apis. The following code shows the result; I 
> have attached logs and stack traces.
> There are four methods in the test; the first performs 2 transactions, 
> successfully, on the same {{Producer}} instance.
> The second aborts 2 transactions, successfully, on the same {{Producer}} 
> instance - application level failures after performing a send.
> There are two flavors of the problem:
> The third attempts to send 2 messages, on the same {{Producer}} that are too 
> large for the topic; the first aborts as expected; the second send hangs in 
> {{abortTransaction}} after getting a {{TimeoutException}} when attempting to 
> {{get}} the send metadata. See log {{hang.same.producer.log}} - it also 
> includes the stack trace showing the hang.
> The fourth test is similar to the third but it closes the producer after the 
> first failure; this time, we timeout in {{initTransactions()}}.
> Subsequent executions of this test get the timeout on the first attempt - 
> that {{transactional.id}} seems to be unusable. Removing the logs was one way 
> I found to get past the problem.
> Test code
> {code:java}
>   public ApplicationRunner runner(AdminClient client, 
> DefaultKafkaProducerFactory pf) {
>   return args -> {
>   try {
>   Map configs = new 
> HashMap<>(pf.getConfigurationProperties());
>   int committed = testGoodTx(client, configs);
>   System.out.println("Successes (same producer) 
> committed: " + committed);
>   int rolledBack = testAppFailureTx(client, 
> configs);
>   System.out.println("App failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // first flavor - hung thread in 
> abortTransaction()
> //rolledBack = 
> testSendFailureTxSameProducer(client, configs);
> //System.out.println("Send failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // second flavor - timeout in initTransactions()
>   rolledBack = 
> testSendFailureTxNewProducer(client, configs);
>   System.out.println("Send failures (new 
> producer) rolled back: " + rolledBack);
>   }
>   catch (Exception e) {
>   e.printStackTrace();
>   }
>   };
>   }
>   private int testGoodTx(AdminClient client, Map configs)
>   throws ExecutionException {
>   int commits = 0;
>   NewTopic topic = TopicBuilder.name("so55510898a")
>   .partitions(1)
>   .replicas(1)
>   .build();
>   createTopic(client, topic);
>   configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-");
>   Producer producer = new 
> KafkaProducer<>(configs);
>   try {
>   producer.initTransactions();
>   for (int i = 0; i < 2; i++) {
>   producer.beginTransaction();
>   RecordMetadata recordMetadata = producer.send(
>   new 
> ProducerRecord<>("so55510898a", "foo")).get(10, 
> TimeUnit.SECONDS);
>   System.out.println(recordMetadata);
>   producer.commitTransaction();
>   commits++;
>   }
>   }
>   catch (ProducerFencedException | 

[jira] [Commented] (KAFKA-8265) Connect Client Config Override policy

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845014#comment-16845014
 ] 

ASF GitHub Bot commented on KAFKA-8265:
---

rhauch commented on pull request #6776: KAFKA-8265 : Match client config 
override prefix to match with the KIP
URL: https://github.com/apache/kafka/pull/6776
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect Client Config Override policy
> -
>
> Key: KAFKA-8265
> URL: https://issues.apache.org/jira/browse/KAFKA-8265
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Major
>
> Right now, each source connector and sink connector inherit their client 
> configurations from the worker properties. Within the worker properties, all 
> configurations that have a prefix of "producer." or "consumer." are applied 
> to all source connectors and sink connectors respectively.
> We should allow the "producer." or "consumer." configurations to be overridden in 
> accordance with an override policy determined by the administrator.
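
For reference, the existing worker-level prefixes look roughly like this in a 
worker properties file (values are placeholders); the proposed feature would 
additionally allow per-connector overrides, subject to the configured policy:
{noformat}
# worker.properties (placeholder values)
# Applied to the embedded producer of every source connector:
producer.compression.type=lz4
producer.buffer.memory=33554432
# Applied to the embedded consumer of every sink connector:
consumer.max.poll.records=500
{noformat}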



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-21 Thread Pavel Savov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844958#comment-16844958
 ] 

Pavel Savov commented on KAFKA-8367:


[~guozhang] Yes, the commit you mentioned was included in the build.

[~ableegoldman] I tried with Kafka 2.1.0 and the leak is gone so it seems to 
have been introduced in 2.2. Our 2.0.1 app was the same as the 2.2 one. 

 

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  
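
For reference, the RocksDB settings listed above are typically supplied through a 
RocksDBConfigSetter registered via the rocksdb.config.setter Streams config; a 
minimal sketch (not the reporter's actual code):
{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Minimal sketch applying the settings listed in the report; illustrative only.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(4 * 1024 * 1024L);  // block cache: 4 MiB
        tableConfig.setBlockSize(16 * 1024L);             // block size: 16 KiB
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(2 * 1024 * 1024L);     // write buffer: 2 MiB
        options.setMaxWriteBufferNumber(3);
        options.setMaxOpenFiles(6144);
        options.setManifestPreallocationSize(64 * 1024L); // manifest preallocation: 64 KiB
    }
}
{code}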



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8403) Suppress needs a Materialized variant

2019-05-21 Thread John Roesler (JIRA)
John Roesler created KAFKA-8403:
---

 Summary: Suppress needs a Materialized variant
 Key: KAFKA-8403
 URL: https://issues.apache.org/jira/browse/KAFKA-8403
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


The newly added KTable Suppress operator lacks a Materialized variant, which 
would be useful if you wanted to query the results of the suppression.

Suppression results will eventually match the upstream results, but the 
intermediate distinction may be meaningful for some applications. For example, 
you could want to query only the final results of a windowed aggregation.
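
Today the suppression result cannot be named for interactive queries; a 
hypothetical Materialized-accepting variant might be used roughly as follows 
(the commented-out argument does not exist and is purely illustrative):
{code:java}
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.Windowed;

// Sketch of how a Materialized-accepting suppress() variant might look; the
// commented-out overload below is hypothetical.
class SuppressMaterializedSketch {

    static KTable<Windowed<String>, Long> finalCounts(KTable<Windowed<String>, Long> counts) {
        return counts.suppress(
                Suppressed.untilWindowCloses(BufferConfig.unbounded())
                // , Materialized.as("final-window-counts")   // hypothetical addition
        );
    }
}
{code}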



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8152) Offline partition state not propagated by controller

2019-05-21 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-8152.
--
Resolution: Duplicate

> Offline partition state not propagated by controller
> 
>
> Key: KAFKA-8152
> URL: https://issues.apache.org/jira/browse/KAFKA-8152
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Currently when the controller starts up, only the state of online partitions 
> will be sent to other brokers. Any broker which is started or restarted after 
> the controller will see only a subset of the partitions of any topic which 
> has offline partitions. If all the partitions for a topic are offline, then 
> the broker will not know of the topic at all. As far as I can tell, the bug 
> is the fact that `ReplicaStateMachine.startup` only does an initial state 
> change for replicas which are online.
> This can be reproduced with the following steps:
>  # Startup two brokers
>  # Create a single partition topic with rf=1
>  # Shutdown the broker where the replica landed
>  # Shutdown the other broker
>  # Restart the broker without the replica
>  # Run `kafka-topics --describe --bootstrap-server \{server ip}`
> Note that the metadata inconsistency will only be apparent when using 
> `bootstrap-server` in `kafka-topics.sh`. Using zookeeper, everything will 
> seem normal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3143) inconsistent state in ZK when all replicas are dead

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844948#comment-16844948
 ] 

ASF GitHub Bot commented on KAFKA-3143:
---

omkreddy commented on pull request #5041: KAFKA-3143: Controller should 
transition offline replicas on startup 
URL: https://github.com/apache/kafka/pull/5041
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> inconsistent state in ZK when all replicas are dead
> ---
>
> Key: KAFKA-3143
> URL: https://issues.apache.org/jira/browse/KAFKA-3143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Ismael Juma
>Priority: Major
>  Labels: reliability
> Fix For: 2.3.0
>
>
> This issue can be recreated in the following steps.
> 1. Start 3 brokers, 1, 2 and 3.
> 2. Create a topic with a single partition and 2 replicas, say on broker 1 and 
> 2.
> If we stop both replicas 1 and 2, depending on where the controller is, the 
> leader and isr stored in ZK in the end are different.
> If the controller is on broker 3, what's stored in ZK will be -1 for leader 
> and an empty set for ISR.
> On the other hand, if the controller is on broker 2 and we stop broker 1 
> followed by broker 2, what's stored in ZK will be 2 for leader and 2 for ISR.
> The issue is that in the first case, the controller will call 
> ReplicaStateMachine to transition to OfflineReplica, which will change the 
> leader and isr. However, in the second case, the controller fails over, but 
> we don't transition ReplicaStateMachine to OfflineReplica during controller 
> initialization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3143) inconsistent state in ZK when all replicas are dead

2019-05-21 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3143.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 5041
[https://github.com/apache/kafka/pull/5041]

> inconsistent state in ZK when all replicas are dead
> ---
>
> Key: KAFKA-3143
> URL: https://issues.apache.org/jira/browse/KAFKA-3143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Ismael Juma
>Priority: Major
>  Labels: reliability
> Fix For: 2.3.0
>
>
> This issue can be recreated in the following steps.
> 1. Start 3 brokers, 1, 2 and 3.
> 2. Create a topic with a single partition and 2 replicas, say on broker 1 and 
> 2.
> If we stop both replicas 1 and 2, depending on where the controller is, the 
> leader and isr stored in ZK in the end are different.
> If the controller is on broker 3, what's stored in ZK will be -1 for leader 
> and an empty set for ISR.
> On the other hand, if the controller is on broker 2 and we stop broker 1 
> followed by broker 2, what's stored in ZK will be 2 for leader and 2 for ISR.
> The issue is that in the first case, the controller will call 
> ReplicaStateMachine to transition to OfflineReplica, which will change the 
> leader and isr. However, in the second case, the controller fails over, but 
> we don't transition ReplicaStateMachine to OfflineReplica during controller 
> initialization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8402) bin/kafka-preferred-replica-election.sh fails if generated json is bigger than 1MB

2019-05-21 Thread Vyacheslav Stepanov (JIRA)
Vyacheslav Stepanov created KAFKA-8402:
--

 Summary: bin/kafka-preferred-replica-election.sh fails if 
generated json is bigger than 1MB
 Key: KAFKA-8402
 URL: https://issues.apache.org/jira/browse/KAFKA-8402
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 1.1.1
Reporter: Vyacheslav Stepanov


If I run the script {{bin/kafka-preferred-replica-election.sh}} without 
specifying the list of topics/partitions, it will get all topics/partitions 
from zookeeper, transform them to JSON, and then create a zookeeper node at 
{{/admin/preferred_replica_election}} using this JSON as the data for that 
zookeeper node. If the generated JSON is bigger than 1MB (the default maximum 
size of zookeeper node data), the script will fail without giving a good 
description of the failure. The 1MB limit can be reached if the number of 
topics/partitions is high enough.
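
For a rough sense of scale, a small sketch (topic names and counts below are made up, not from the report) of how quickly the generated JSON can outgrow ZooKeeper's default znode data limit of about 1MB (jute.maxbuffer):

{code:java}
public class ElectionJsonSizeEstimate {
    public static void main(String[] args) {
        // The script serializes every partition as an entry roughly like
        // {"topic":"<name>","partition":<id>} inside a single JSON document.
        StringBuilder json = new StringBuilder("{\"version\":1,\"partitions\":[");
        int topics = 2000;
        int partitionsPerTopic = 50;
        for (int t = 0; t < topics; t++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                json.append("{\"topic\":\"service-events-topic-").append(t)
                    .append("\",\"partition\":").append(p).append("},");
            }
        }
        json.setLength(json.length() - 1); // drop the trailing comma
        json.append("]}");
        // ZooKeeper's default per-znode data limit (jute.maxbuffer) is about 1 MB.
        System.out.println(json.length() + " bytes for " + (topics * partitionsPerTopic)
                + " partitions, vs. a ~1048575 byte default limit");
    }
}
{code}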



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8320) Connect Error handling is using the RetriableException from common package

2019-05-21 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8320.
--
   Resolution: Fixed
Fix Version/s: 2.2.1
   2.1.2
   2.3.0
   2.0.2

Merged and backported through the `2.0` branch.

> Connect Error handling is using the RetriableException from common package
> --
>
> Key: KAFKA-8320
> URL: https://issues.apache.org/jira/browse/KAFKA-8320
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Major
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> When a SourceConnector throws 
> org.apache.kafka.connect.errors.RetriableException during poll, the connect 
> runtime is supposed to ignore the error and retry per 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect].
> When connectors throw the exception, it is not handled gracefully: 
> WorkerSourceTask catches the exception from the wrong package, 
> `org.apache.kafka.common.errors`. It is not clear from the API standpoint 
> which package the connect framework supports. The safest thing would be to 
> support both packages, even though that is less desirable.
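
For illustration only, a minimal sketch (the class and helper names are hypothetical) of a source task throwing the connect-package exception that the runtime is expected to retry on per KIP-298:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.errors.RetriableException;  // connect package, not org.apache.kafka.common.errors
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleSourceTask extends SourceTask {

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            return fetchFromExternalSystem();  // hypothetical helper
        } catch (Exception e) {
            // Throwing the connect-package RetriableException signals the runtime
            // to retry the poll instead of failing the task.
            throw new RetriableException("transient failure, will retry", e);
        }
    }

    @Override
    public void stop() {
    }

    private List<SourceRecord> fetchFromExternalSystem() {
        return Collections.emptyList();
    }
}
{code}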



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3096) Leader is not set to -1 when it is shutdown if followers are down

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844876#comment-16844876
 ] 

ASF GitHub Bot commented on KAFKA-3096:
---

ijuma commented on pull request #765: KAFKA-3096; Leader is not set to -1 when 
it is shutdown if followers are down
URL: https://github.com/apache/kafka/pull/765
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leader is not set to -1 when it is shutdown if followers are down
> -
>
> Key: KAFKA-3096
> URL: https://issues.apache.org/jira/browse/KAFKA-3096
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>  Labels: reliability
>
> Assuming a cluster with 2 brokers with unclean leader election disabled:
> 1. Start brokers 0 and 1
> 2. Perform partition assignment
> 3. Broker 0 is elected leader
> 4. Produce message and wait until metadata is propagated
> 6. Shutdown follower
> 7. Produce message
> 8. Shutdown leader
> 9. Start follower
> 10. Wait for leader election
> Expected: leader is -1
> Actual: leader is 0
> We have a test for this, but a bug in `waitUntilLeaderIsElectedOrChanged` 
> means that `newLeaderOpt` is not being checked.
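
For context, a small sketch (broker address and topic name are placeholders; this uses the AdminClient API rather than the commands implied by the steps above) of creating a topic with unclean leader election disabled, matching the scenario described:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // single partition, replication factor 2, unclean leader election disabled
            NewTopic topic = new NewTopic("test-topic", 1, (short) 2)
                    .configs(Collections.singletonMap("unclean.leader.election.enable", "false"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}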



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844818#comment-16844818
 ] 

ASF GitHub Bot commented on KAFKA-3932:
---

chienhsingwu commented on pull request #5838: KAFKA-3932 - Consumer fails to 
consume in a round robin fashion
URL: https://github.com/apache/kafka/pull/5838
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Elias Levy
>Assignee: CHIENHSING WU
>Priority: Major
>
> The Java consumer fails to consume messages in a round robin fashion.  This 
> can lead to unbalanced consumption.
> In our use case we have a set of consumers that can take a significant amount 
> of time consuming messages off a topic.  For this reason, we are using the 
> pause/poll/resume pattern to ensure the consumer session does not time out.  
> The topic that is being consumed has been preloaded with messages.  That means 
> there is a significant message lag when the consumer is first started.  To 
> limit how many messages are consumed at a time, the consumer has been 
> configured with max.poll.records=1.
> The first observation is that the client receives a large batch of messages 
> for the first partition it decides to consume from and will consume all those 
> messages before moving on, rather than returning a message from a different 
> partition for each call to poll.
> We solved this issue by configuring max.partition.fetch.bytes to be small 
> enough that only a single message will be returned by the broker on each 
> fetch, although this would not be feasible if message sizes were highly 
> variable.
> The behavior of the consumer after this change is to largely consume from a 
> small number of partitions, usually just two, iterating between them until it 
> exhausts them, before moving to another partition.  This behavior is 
> problematic if the messages have some rough time semantics and need to be 
> processed roughly in time order across all partitions.
> It would be useful if the consumer had a pluggable API that allowed custom 
> logic to select which partition to consume from next, thus enabling the 
> creation of a round robin partition consumer.
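
For illustration only, a minimal sketch of the pause/poll/resume pattern and the configs mentioned above (topic name, group id, and fetch sizes are made up; in the real pattern, poll() keeps being called while the slow work is in progress):

{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausePollResumeExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-workers");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");              // one record per poll
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "2048");  // small fetches, as in the workaround
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("work-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    // pause all assigned partitions so that intermediate poll() calls keep
                    // the consumer alive in the group without fetching more records
                    consumer.pause(consumer.assignment());
                    for (ConsumerRecord<String, String> record : records) {
                        processSlowly(record);
                    }
                    consumer.resume(consumer.paused());
                }
            }
        }
    }

    private static void processSlowly(ConsumerRecord<String, String> record) {
        // placeholder for long-running work; in the real pattern, poll() is called
        // periodically while this work is in progress
    }
}
{code}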



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-21 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844764#comment-16844764
 ] 

Omkar Mestry commented on KAFKA-7245:
-

[~mjsax] Please add me to the contributors list, so that I can assign myself 
the tasks.

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Panuwat Anawatmongkhon
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)`; for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
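
For illustration, a minimal sketch of the migration (store name and values are made up) from the two-argument put to the variant that takes an explicit timestamp:

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

public class WindowStorePutExample {

    @SuppressWarnings("unchecked")
    static void updateStore(ProcessorContext context) {
        WindowStore<String, Long> store =
                (WindowStore<String, Long>) context.getStateStore("counts-store");

        // Old style, slated for deprecation: the window start timestamp is taken
        // implicitly from the record being processed.
        store.put("key", 1L);

        // New style: pass the window start timestamp explicitly.
        store.put("key", 1L, context.timestamp());
    }
}
{code}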



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8401) consumer.poll(Duration.ofMillis(100)) blocking

2019-05-21 Thread leishuiyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leishuiyu updated KAFKA-8401:
-
Description: 
# This is the code
{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer extends Thread {

    KafkaConsumer<Integer, String> consumer;

    public Consumer() {
        Properties props = new Properties();
        // 47.105.201.137 is a public network IP
        props.put("bootstrap.servers", "47.105.201.137:9092");  // connection address
        props.put("group.id", "lsy_test");
        // note: the zookeeper.* properties below are ignored by the Java consumer
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        consumer.subscribe(Arrays.asList("flink_order"));
        while (true) {
            ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Integer, String> record : poll) {
                System.out.println(record.key() + "---" + record.value());
            }
        }
    }

    public static void main(String[] args) {
        Consumer sumer = new Consumer();
        sumer.start();
    }
}
{code}

 #  Configured hosts for remote machines
{code:java}
xx.xx.xx.xx centos-7{code}

 # When my code runs on my local machine with 
bootstrap.servers=47.105.201.137:9092, the consumer poll blocks. However, after 
adding 47.105.201.137 centos-7 to /etc/hosts on my Mac and setting 
bootstrap.servers=centos-7:9092, the consumer can poll messages. The earlier 
call consumer.listTopics() succeeds; only polling messages blocks. I feel very 
confused.

  was:
# this is code
{code:java}
//public class Consumer extends Thread {

KafkaConsumer consumer;

public Consumer() {
Properties props = new Properties();
//47.105.201.137 is public network Ip
props.put("bootstrap.servers", "47.105.201.137:9092");  //连接地址
props.put("group.id", "lsy_test");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer(props);
}


@Override
public void run() {
consumer.subscribe(Arrays.asList("flink_order"));
while (true) {
ConsumerRecords poll = 
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : poll) {
System.out.println(record.key() + "---" + record.value());
}
}
}

public static void main(String[] args) {
Consumer sumer = new Consumer();
sumer.start();
}
}

{code}

 #  Configured hosts for remote machines
{code:java}
//xx.xx.xx.xx centos-7{code}

 # When my code runs on my local machine with 
bootstrap.servers=47.105.201.137:9092, the consumer poll blocks. However, after 
adding 47.105.201.137 centos-7 to /etc/hosts on my Mac and setting 
bootstrap.servers=centos-7:9092, the consumer can poll messages. The earlier 
call consumer.listTopics() succeeds; only polling messages blocks. I feel very 
confused.


> consumer.poll(Duration.ofMillis(100)) blocking 
> ---
>
> Key: KAFKA-8401
> URL: https://issues.apache.org/jira/browse/KAFKA-8401
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
> Environment: kafka 1.1.0
> zk   3.4.11
>Reporter: leishuiyu
>Priority: Major
>  Labels: blocking, kafka
>
> # this is code
> {code:java}
> public class Consumer extends Thread {
> KafkaConsumer consumer;
> public Consumer() {
> Properties props = new Properties();
> //47.105.201.137 is public network Ip
> props.put("bootstrap.servers", "47.105.201.137:9092");  //连接地址
> props.put("group.id", "lsy_test");
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
> props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.IntegerDeserializer");
> props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> this.consumer = new KafkaConsumer(props);
> }
> @Override
> public void run() {
> consumer.subscribe(Arrays.asList("flink_order"));
> while (true) {
> ConsumerRecords poll = 
> 

[jira] [Created] (KAFKA-8401) consumer.poll(Duration.ofMillis(100)) blocking

2019-05-21 Thread leishuiyu (JIRA)
leishuiyu created KAFKA-8401:


 Summary: consumer.poll(Duration.ofMillis(100)) blocking 
 Key: KAFKA-8401
 URL: https://issues.apache.org/jira/browse/KAFKA-8401
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.1.0
 Environment: kafka 1.1.0
zk   3.4.11
Reporter: leishuiyu


# this is code
{code:java}
//public class Consumer extends Thread {

KafkaConsumer consumer;

public Consumer() {
Properties props = new Properties();
//47.105.201.137 is public network Ip
props.put("bootstrap.servers", "47.105.201.137:9092");  //连接地址
props.put("group.id", "lsy_test");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer(props);
}


@Override
public void run() {
consumer.subscribe(Arrays.asList("flink_order"));
while (true) {
ConsumerRecords poll = 
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : poll) {
System.out.println(record.key() + "---" + record.value());
}
}
}

public static void main(String[] args) {
Consumer sumer = new Consumer();
sumer.start();
}
}

{code}

 #  Configured hosts for remote machines
{code:java}
//xx.xx.xx.xx centos-7{code}

 # When my code runs on my local machine with 
bootstrap.servers=47.105.201.137:9092, the consumer poll blocks. However, after 
adding 47.105.201.137 centos-7 to /etc/hosts on my Mac and setting 
bootstrap.servers=centos-7:9092, the consumer can poll messages. The earlier 
call consumer.listTopics() succeeds; only polling messages blocks. I feel very 
confused.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7991) Add StreamsUpgradeTest for 2.2 release

2019-05-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844670#comment-16844670
 ] 

Matthias J. Sax commented on KAFKA-7991:


[~vvcephei] Seems this overlaps with 
https://issues.apache.org/jira/browse/KAFKA-8155 ? Should we close this ticket 
as a duplicate? There is already a PR for 8155.

> Add StreamsUpgradeTest for 2.2 release
> --
>
> Key: KAFKA-7991
> URL: https://issues.apache.org/jira/browse/KAFKA-7991
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7983) supporting replication.throttled.replicas in dynamic broker configuration

2019-05-21 Thread Rahul Agarwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844660#comment-16844660
 ] 

Rahul Agarwal commented on KAFKA-7983:
--

I am working on it. Will submit the patch once ready.

> supporting replication.throttled.replicas in dynamic broker configuration
> -
>
> Key: KAFKA-7983
> URL: https://issues.apache.org/jira/browse/KAFKA-7983
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Jun Rao
>Assignee: Rahul Agarwal
>Priority: Major
>
> In 
> [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration#KIP-226-DynamicBrokerConfiguration-DefaultTopicconfigs],
>  we added support for changing broker defaults dynamically. However, it 
> did not support changing leader.replication.throttled.replicas and 
> follower.replication.throttled.replicas. These 2 configs were introduced in 
> [KIP-73|https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas]
>  and control the set of topic partitions on which replication throttling 
> will be engaged. One useful case is to be able to set a default value for 
> both configs to * to allow throttling to be engaged for all topic partitions. 
> Currently, the static default values for both configs are ignored for 
> replication throttling; it would be useful to fix that as well.
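
For context, a small sketch (topic name and broker address are placeholders; assumes the incrementalAlterConfigs API of newer clients) of how the existing topic-level throttled-replicas configs can be set dynamically; the ticket asks for the analogous broker-level default to also take effect:

{code:java}
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ThrottledReplicasExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // "*" engages replication throttling for all replicas of this topic
            Collection<AlterConfigOp> ops = Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("leader.replication.throttled.replicas", "*"),
                            AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("follower.replication.throttled.replicas", "*"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
{code}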



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8195) Unstable Producer After Send Failure in Transaction

2019-05-21 Thread Viktor Somogyi-Vass (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844622#comment-16844622
 ] 

Viktor Somogyi-Vass commented on KAFKA-8195:


Yea, min.insync.replicas is a similar situation.

{quote}However, it does seem punitive in that I had to remove the logs 
first.{quote}

How did you end up in this situation?

> Unstable Producer After Send Failure in Transaction
> ---
>
> Key: KAFKA-8195
> URL: https://issues.apache.org/jira/browse/KAFKA-8195
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1, 2.2.0, 2.3.0
>Reporter: Gary Russell
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> This journey started with [this Stack Overflow question | 
> https://stackoverflow.com/questions/55510898].
> I easily reproduced his issue (see my comments on his question).
> My first step was to take Spring out of the picture and replicate the issue 
> with the native {{Producer}} apis. The following code shows the result; I 
> have attached logs and stack traces.
> There are four methods in the test; the first performs 2 transactions, 
> successfully, on the same {{Producer}} instance.
> The second aborts 2 transactions, successfully, on the same {{Producer}} 
> instance - application level failures after performing a send.
> There are two flavors of the problem:
> The third attempts to send 2 messages, on the same {{Producer}} that are too 
> large for the topic; the first aborts as expected; the second send hangs in 
> {{abortTransaction}} after getting a {{TimeoutException}} when attempting to 
> {{get}} the send metadata. See log {{hang.same.producer.log}} - it also 
> includes the stack trace showing the hang.
> The fourth test is similar to the third but it closes the producer after the 
> first failure; this time, we timeout in {{initTransactions()}}.
> Subsequent executions of this test get the timeout on the first attempt - 
> that {{transactional.id}} seems to be unusable. Removing the logs was one way 
> I found to get past the problem.
> Test code
> {code:java}
>   public ApplicationRunner runner(AdminClient client, 
> DefaultKafkaProducerFactory pf) {
>   return args -> {
>   try {
>   Map configs = new 
> HashMap<>(pf.getConfigurationProperties());
>   int committed = testGoodTx(client, configs);
>   System.out.println("Successes (same producer) 
> committed: " + committed);
>   int rolledBack = testAppFailureTx(client, 
> configs);
>   System.out.println("App failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // first flavor - hung thread in 
> abortTransaction()
> //rolledBack = 
> testSendFailureTxSameProducer(client, configs);
> //System.out.println("Send failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // second flavor - timeout in initTransactions()
>   rolledBack = 
> testSendFailureTxNewProducer(client, configs);
>   System.out.println("Send failures (new 
> producer) rolled back: " + rolledBack);
>   }
>   catch (Exception e) {
>   e.printStackTrace();
>   }
>   };
>   }
>   private int testGoodTx(AdminClient client, Map configs)
>   throws ExecutionException {
>   int commits = 0;
>   NewTopic topic = TopicBuilder.name("so55510898a")
>   .partitions(1)
>   .replicas(1)
>   .build();
>   createTopic(client, topic);
>   configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-");
>   Producer producer = new 
> KafkaProducer<>(configs);
>   try {
>   producer.initTransactions();
>   for (int i = 0; i < 2; i++) {
>   producer.beginTransaction();
>   RecordMetadata recordMetadata = producer.send(
>   new 
> ProducerRecord<>("so55510898a", "foo")).get(10, 
> TimeUnit.SECONDS);
>   System.out.println(recordMetadata);
>   producer.commitTransaction();
>   commits++;
>   }
>   }
>   catch (ProducerFencedException |