[jira] [Commented] (KAFKA-9259) suppress() for windowed-Serdes does not work with default serdes

2020-01-03 Thread Omkar Mestry (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007928#comment-17007928
 ] 

Omkar Mestry commented on KAFKA-9259:
-

[~vvcephei] I went through the code and found that in KStreamWindowAggregate, in 
the process function, the following code wraps the key with the Windowed 
type:

tupleForwarder.maybeForward(
 new Windowed<>(key, entry.getValue()),
 newAgg,
 sendOldValues ? oldAgg : null,
 newTimestamp);

> suppress() for windowed-Serdes does not work with default serdes
> 
>
> Key: KAFKA-9259
> URL: https://issues.apache.org/jira/browse/KAFKA-9259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Major
>  Labels: newbie
>
> The suppress() operator either inherits serdes from its upstream operator or 
> falls back to default serdes from the config.
> If the upstream operator is a windowed aggregation, the window-aggregation 
> operator wraps the user passed-in serde with a window serde and pushes it 
> into suppress() – however, if default serdes are used, the window-aggregation 
> operator cannot push anything into suppress(). At runtime, it just creates a 
> default serde and wraps it accordingly. For this case, suppress() also falls 
> back to default serdes; however, it does not wrap the serde, and thus a 
> ClassCastException is thrown when the serde is used later.
> suppress() is already aware of whether the upstream aggregation is time/session 
> windowed or not, and it should use this information to wrap default serdes 
> accordingly.
> The current workaround for windowed-suppress is to overwrite the default 
> serde upstream to suppress(), such that suppress() inherits serdes and does 
> not fall back to default serdes.
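
For illustration, a minimal sketch of that workaround, assuming a String-keyed 
count over 5-minute windows with the usual Kafka Streams imports (topic, store 
and window sizes are illustrative only): setting explicit serdes on the windowed 
aggregation lets suppress() inherit wrapped serdes instead of falling back to 
the unwrapped defaults.

{code:java}
// Workaround sketch: explicit serdes on the windowed aggregation so that
// suppress() inherits them rather than using (unwrapped) default serdes.
KTable<Windowed<String>, Long> counts = builder
    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts")
        .withKeySerde(Serdes.String())      // explicit serdes here ...
        .withValueSerde(Serdes.Long()))     // ... are inherited by suppress()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
{code}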



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7737) Consolidate InitProducerId API

2020-01-03 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007878#comment-17007878
 ] 

Jason Gustafson commented on KAFKA-7737:


[~viktorsomogyi] Do you mind if I pick this up? I have a WIP PR here: 
https://github.com/hachikuji/kafka/commit/32e276382720333bc4348d20bb9819f64f393553.

> Consolidate InitProducerId API
> --
>
> Key: KAFKA-7737
> URL: https://issues.apache.org/jira/browse/KAFKA-7737
> Project: Kafka
>  Issue Type: Task
>  Components: producer 
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Minor
>  Labels: exactly-once
>
> We have two separate paths in the producer for the InitProducerId API: one 
> for the transactional producer and one for the idempotent producer. It would 
> be nice to find a way to consolidate these.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9330) Calling AdminClient.close in the AdminClient completion callback causes deadlock

2020-01-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007863#comment-17007863
 ] 

ASF GitHub Bot commented on KAFKA-9330:
---

soondenana commented on pull request #7866: KAFKA-9330: Skip `join` when 
`AdminClient.close` is called in callback thread
URL: https://github.com/apache/kafka/pull/7866
 
 
   The close method calls `Thread.join` to wait for the AdminClient thread to
   die, but if close is called in an API completion callback, `join` waits
   forever, because the thread calling `join` is the same thread it is
   waiting on.
   
   This change checks for this condition and skips the join. The thread
   then returns to the main loop, where it checks for the close condition
   and exits.
   
   Added a new unit test to exercise this condition. The test fails with a
   timeout if the thread is joined in the callback, and passes otherwise.
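   
   A minimal sketch of that guard (illustrative only, not the actual
   AdminClient code; the field and helper names below are assumptions):
   
{code:java}
// Skip Thread.join() when close() runs on the AdminClient's own thread.
public void close(Duration timeout) {
    initiateClose(timeout);                        // assumed helper: tell the main loop to exit
    Thread clientThread = this.adminClientThread;  // assumed field: the AdminClient I/O thread
    if (Thread.currentThread() != clientThread) {
        try {
            clientThread.join(timeout.toMillis()); // safe: we are not that thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    // Otherwise we are inside a completion callback on the client thread itself;
    // joining would deadlock, so return and let the main loop observe the close
    // flag and exit on its own.
}
{code}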
   
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Calling AdminClient.close in the AdminClient completion callback causes 
> deadlock
> 
>
> Key: KAFKA-9330
> URL: https://issues.apache.org/jira/browse/KAFKA-9330
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
>
> The close method calls `Thread.join` to wait for the AdminClient thread to die, 
> but that never happens when the thread calling join is the AdminClient thread 
> itself: the thread blocks forever in a deadlock, waiting for itself to die. 
> `AdminClient.close` should check whether the thread calling close is the 
> AdminClient thread itself, and if so skip the join. The thread will then check 
> for the close condition in the main loop and exit.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9330) Calling AdminClient.close in the AdminClient completion callback causes deadlock

2020-01-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007862#comment-17007862
 ] 

ASF GitHub Bot commented on KAFKA-9330:
---

hachikuji commented on pull request #7866: KAFKA-9330: Skip `join` when 
`AdminClient.close` is called in callback thread
URL: https://github.com/apache/kafka/pull/7866
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Calling AdminClient.close in the AdminClient completion callback causes 
> deadlock
> 
>
> Key: KAFKA-9330
> URL: https://issues.apache.org/jira/browse/KAFKA-9330
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
>
> The close method calls `Thread.join` to wait for the AdminClient thread to die, 
> but that never happens when the thread calling join is the AdminClient thread 
> itself: the thread blocks forever in a deadlock, waiting for itself to die. 
> `AdminClient.close` should check whether the thread calling close is the 
> AdminClient thread itself, and if so skip the join. The thread will then check 
> for the close condition in the main loop and exit.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-01-03 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007860#comment-17007860
 ] 

Richard Yu commented on KAFKA-8770:
---

[~vvcephei] It seems that loading all the prior results would be wasteful. From 
what I can tell, we could pursue a caching strategy here, i.e. use 
{{CachingKeyValueStore}} or the like, if we want to implement this feature. 

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both for downstream 
> Streams operations and for external systems that consume the results, which 
> currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.
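
A minimal sketch of the approach outlined in the ticket (load the prior result 
and compare before forwarding), expressed with the public Transformer API; the 
store name, generics and placement are illustrative only, whereas the actual 
proposal would live inside the stateful operators themselves:

{code:java}
// Emit-on-change sketch: load the prior result, compare, and forward only on change.
public class EmitOnChangeTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {
    private KeyValueStore<K, V> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        // "prior-results" is an assumed store name, added to the topology beforehand
        store = (KeyValueStore<K, V>) context.getStateStore("prior-results");
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V newValue) {
        final V prior = store.get(key);
        if (Objects.equals(prior, newValue)) {
            return null;                  // no-op update: emit nothing downstream
        }
        store.put(key, newValue);         // result changed: persist and forward
        return KeyValue.pair(key, newValue);
    }

    @Override
    public void close() {}
}
{code}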



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9335) java.lang.IllegalArgumentException: Number of partitions must be at least 1.

2020-01-03 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007853#comment-17007853
 ] 

Boyang Chen commented on KAFKA-9335:


Using your example, I could successfully reproduce the symptom. In fact, this 
is not upgrade-related but a 2.4-specific correctness bug.

The example topology chains 3 sub-topologies with a dependency A -> B -> C, all 
connected by repartition topics.

We gather the repartition topic metadata first, creating a mapping 
[A-output-rep, B-output-rep] in which no partition counts are defined yet.

Next, the lookup of C's input topic (= B-output-rep) fails, so we fall back to 
looking up B's input topic (= A-output-rep). Since A-output-rep is also a 
repartition topic whose partition count has not been initialized either, the 
passed-down assignment is invalid: we still use numPartitions = 0 after the if 
block.


*To summarize, we need a chain of 3 sub-topologies where both inputs of the 
non-source sub-topologies are repartition topics, and the sink sub-topology 
gets initialized first.*

The explanation is a bit tricky, but thanks so much for the report, and we 
will propose a fix shortly.
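
For reference, a hypothetical topology of that shape – three sub-topologies 
A -> B -> C chained by repartition topics, since each key-changing groupBy 
forces one (topic and store names are illustrative; the reporter's gist remains 
the authoritative reproducer):

{code:java}
// Three sub-topologies chained by repartition topics (illustrative names only).
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
    // sub-topology A -> first repartition topic (groupBy changes the key)
    .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
    .count(Materialized.as("count-a"))
    .toStream()
    // sub-topology B -> second repartition topic
    .groupBy((k, v) -> k, Grouped.with(Serdes.String(), Serdes.Long()))
    .count(Materialized.as("count-b"))
    .toStream()
    // sub-topology C
    .to("output", Produced.with(Serdes.String(), Serdes.Long()));
{code}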

> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> 
>
> Key: KAFKA-9335
> URL: https://issues.apache.org/jira/browse/KAFKA-9335
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Nitay Kufert
>Priority: Major
>  Labels: bug
>
> Hey,
> When trying to upgrade our Kafka streams client to 2.4.0 (from 2.3.1) we 
> encountered the following exception: 
> {code:java}
> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> {code}
> It's important to notice that the exact same code works just fine at 2.3.1.
>  
> I have created a "toy" example which reproduces this exception:
> [https://gist.github.com/nitayk/50da33b7bcce19ad0a7f8244d309cb8f]
> and I would love to get some insight regarding why it's happening / ways to 
> get around it
>  
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9293) NPE in DumpLogSegments with --offsets-decoder

2020-01-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9293.

Fix Version/s: 2.4.1
   Resolution: Fixed

> NPE in DumpLogSegments with --offsets-decoder
> -
>
> Key: KAFKA-9293
> URL: https://issues.apache.org/jira/browse/KAFKA-9293
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.1
>
>
> {code}
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.kafka.common.utils.Utils.toArray(Utils.java:230)
> at 
> kafka.tools.DumpLogSegments$OffsetsMessageParser.$anonfun$parseGroupMetadata$2(DumpLogSegments.scala:287)
> at 
> kafka.tools.DumpLogSegments$OffsetsMessageParser.parseGroupMetadata(DumpLogSegments.scala:284)
> at 
> kafka.tools.DumpLogSegments$OffsetsMessageParser.parse(DumpLogSegments.scala:317)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$2(DumpLogSegments.scala:372)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$2$adapted(DumpLogSegments.scala:343)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$1(DumpLogSegments.scala:343)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$1$adapted(DumpLogSegments.scala:340)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at kafka.tools.DumpLogSegments$.dumpLog(DumpLogSegments.scala:340)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$main$1(DumpLogSegments.scala:60)
> at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:51)
> at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}
> The problem is that "userData" is nullable, but the dump log tool doesn't 
> check for null.
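
A hedged sketch of the null guard (the surrounding variable and accessor names 
are assumptions for illustration, not the actual DumpLogSegments code):

{code:java}
// Tolerate a null userData buffer instead of passing it straight to Utils.toArray.
ByteBuffer userData = memberMetadata.userData();   // may legitimately be null
byte[] userDataBytes = (userData == null) ? new byte[0] : Utils.toArray(userData);
{code}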



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9293) NPE in DumpLogSegments with --offsets-decoder

2020-01-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007762#comment-17007762
 ] 

ASF GitHub Bot commented on KAFKA-9293:
---

hachikuji commented on pull request #7820: KAFKA-9293; Fix NPE in 
DumpLogSegments offsets parser and display tombstone keys
URL: https://github.com/apache/kafka/pull/7820
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NPE in DumpLogSegments with --offsets-decoder
> -
>
> Key: KAFKA-9293
> URL: https://issues.apache.org/jira/browse/KAFKA-9293
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> {code}
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.kafka.common.utils.Utils.toArray(Utils.java:230)
> at 
> kafka.tools.DumpLogSegments$OffsetsMessageParser.$anonfun$parseGroupMetadata$2(DumpLogSegments.scala:287)
> at 
> kafka.tools.DumpLogSegments$OffsetsMessageParser.parseGroupMetadata(DumpLogSegments.scala:284)
> at 
> kafka.tools.DumpLogSegments$OffsetsMessageParser.parse(DumpLogSegments.scala:317)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$2(DumpLogSegments.scala:372)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$2$adapted(DumpLogSegments.scala:343)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$1(DumpLogSegments.scala:343)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$dumpLog$1$adapted(DumpLogSegments.scala:340)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at kafka.tools.DumpLogSegments$.dumpLog(DumpLogSegments.scala:340)
> at 
> kafka.tools.DumpLogSegments$.$anonfun$main$1(DumpLogSegments.scala:60)
> at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:51)
> at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}
> The problem is that "userData" is nullable, but the dump log tool doesn't 
> check for null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9210) kafka stream loss data

2020-01-03 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007756#comment-17007756
 ] 

Sophie Blee-Goldman commented on KAFKA-9210:


Great! I think we can close this ticket as a duplicate then? 

> kafka stream loss data
> --
>
> Key: KAFKA-9210
> URL: https://issues.apache.org/jira/browse/KAFKA-9210
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: panpan.liu
>Priority: Major
> Attachments: app.log, screenshot-1.png
>
>
> kafka broker: 2.0.1
> kafka stream client: 2.1.0
>  # two applications run at the same time
>  # after some days, I stop one application (in k8s)
>  # The following log occurred, and I checked the data and found that the value 
> is less than what I expected.
> {quote}Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1]Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog 
> flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 
> 05:50:49.816|WARN 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread
>  [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. Deleting 
> StreamTasks stores to recreate from 
> scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
> out of range with no configured reset policy for partitions: 
> \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19
>  05:50:49.817|INFO 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread
>  [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 
> ProcessorTopology: KSTREAM-SOURCE-70: topics: 
> [flash-app-xmc-worker-share-store-minute-repartition] children: 
> [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: 
> [worker-share-store-minute] children: [KTABLE-TOSTREAM-71] 
> KTABLE-TOSTREAM-71: children: [KSTREAM-SINK-72] 
> KSTREAM-SINK-72: topic: 
> StaticTopicNameExtractor(xmc-worker-share-minute)Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog 
> flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 
> 05:50:49.842|WARN 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread
>  [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. Deleting 
> StreamTasks stores to recreate from 
> scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
> out of range with no configured reset policy for partitions: 
> \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19
>  05:50:49.842|INFO 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread
>  [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 
> ProcessorTopology: KSTREAM-SOURCE-70: topics: 
> [flash-app-xmc-worker-share-store-minute-repartition] children: 
> [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: 
> [worker-share-store-minute] children: [KTABLE-TOSTREAM-71] 
> KTABLE-TOSTREAM-71: children: [KSTREAM-SINK-72] 
> KSTREAM-SINK-72: topic: 
> StaticTopicNameExtractor(xmc-worker-share-minute)Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog 
> flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 
> 05:50:49.905|WARN 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread
>  [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. Deleting 
> StreamTasks stores to recreate from 
> scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
> out of range with no configured reset policy for partitions: 
> \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19
>  05:50:49.906|INFO 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread
>  [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 
> ProcessorTopology: KSTREAM-SOURCE-70: topics: 
> [flash-app-xmc-worker-share-store-minute-repartition] children: 
> [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: 
> [worker-share-store-minute] 
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9364) Fix misleading consumer logs on throttling

2020-01-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007717#comment-17007717
 ] 

ASF GitHub Bot commented on KAFKA-9364:
---

cmccabe commented on pull request #7894: KAFKA-9364: Fix misleading consumer 
logs on throttling
URL: https://github.com/apache/kafka/pull/7894
 
 
   When the consumer's fetch request is throttled by the KIP-219 mechanism,
   it receives an empty fetch response.  The consumer should not log this
   as an error.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix misleading consumer logs on throttling
> --
>
> Key: KAFKA-9364
> URL: https://issues.apache.org/jira/browse/KAFKA-9364
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Minor
>
> Fix misleading consumer logs on throttling



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9364) Fix misleading consumer logs on throttling

2020-01-03 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-9364:
---

 Summary: Fix misleading consumer logs on throttling
 Key: KAFKA-9364
 URL: https://issues.apache.org/jira/browse/KAFKA-9364
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe


Fix misleading consumer logs on throttling



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2020-01-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007697#comment-17007697
 ] 

Matthias J. Sax commented on KAFKA-7994:


Feel free to update the ticket – but we should create new tickets instead – 
this feature shipped in 2.4, and for tracking purposes we should not reopen it.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic behavior 
> if we stop processing right before a late record that would be dropped if we 
> continued processing, but is not dropped after the rebalance/restart. Let's 
> look at an example with a grace period of 5ms for a tumbling window of 5ms, 
> and the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1 and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time then advances differently from 
> a global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. That way, on restart/rebalance we can 
> re-initialize time correctly.
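
As an illustration of that fix idea, a sketch using the plain consumer API 
(variable names like partition, nextOffset and partitionTime are assumptions; 
the real change would live inside Streams' commit path):

{code:java}
// Store the observed partition-time in the committed offset metadata ...
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(partition, new OffsetAndMetadata(nextOffset, Long.toString(partitionTime)));
consumer.commitSync(offsets);

// ... and read it back after a restart/rebalance instead of resetting to UNKNOWN (-1).
OffsetAndMetadata committed =
    consumer.committed(Collections.singleton(partition)).get(partition);
long restoredPartitionTime =
    (committed == null || committed.metadata() == null || committed.metadata().isEmpty())
        ? -1L
        : Long.parseLong(committed.metadata());
{code}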



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9352) unbalanced assignment of topic-partition to tasks

2020-01-03 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9352:
--
Issue Type: New Feature  (was: Improvement)

> unbalanced assignment of topic-partition to tasks
> -
>
> Key: KAFKA-9352
> URL: https://issues.apache.org/jira/browse/KAFKA-9352
> Project: Kafka
>  Issue Type: New Feature
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot 
> 2019-12-19 at 8.22.17 AM.png
>
>
> Originally, when MirrorMaker replicates a group of topics, the assignment 
> between topic-partitions and tasks is pretty static, e.g. partitions from the 
> same topic tend to be grouped together as much as possible on the same task. 
> For example, take 3 tasks to mirror 3 topics with 8, 2 and 2
> partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, 
> partition 5'.
> The original assignment will look like:
> t1 -> [t0p0, t0p1, t0p2, t0p3]
> t2 -> [t0p4, t0p5, t0p6, t0p7]
> t3 -> [t1p0, t1p1, t2p0, t2p1]
> The potential issue with the above assignment is: if topic 0 has more traffic 
> than the other topics (topic 1, topic 2), t1 and t2 will carry more traffic 
> than t3. When the tasks are mapped to the MirrorMaker instances (workers) and 
> launched, this creates an unbalanced load on the workers. Please see the 
> picture below as an unbalanced example with 2 MirrorMaker instances:
> !Screen Shot 2019-12-19 at 12.16.02 PM.png!
> Given that each mirrored topic has different traffic and a different number of 
> partitions, to balance the load across all MirrorMaker instances (workers), 
> 'roundrobin' helps to evenly assign all topic-partitions to the tasks; the 
> tasks are then further distributed to workers by calling 
> 'ConnectorUtils.groupPartitions()'. For example, with the same 3 tasks 
> mirroring 3 topics with 8, 2 and 2 partitions respectively:
> t1 -> [t0p0, t0p3, t0p6, t1p1]
> t2 -> [t0p1, t0p4, t0p7, t2p0]
> t3 -> [t0p2, t0p5, t1p0, t2p1]
> The improvement of this new assignment over the original one is: the 
> partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, 
> which creates a relatively even load on all workers after the tasks are 
> mapped to the workers and launched.
> Please see the picture below as a balanced example of 4 mirrormaker instances:
> !Screen Shot 2019-12-19 at 8.22.17 AM.png!
> PR link is: https://github.com/apache/kafka/pull/7880
>  
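
A small sketch of the round-robin placement described above (illustrative only; 
the actual change is in the linked PR):

{code:java}
// Spread all topic-partitions across tasks in round-robin order instead of
// grouping partitions of the same topic onto the same task.
static List<List<TopicPartition>> roundRobinAssign(List<TopicPartition> partitions, int numTasks) {
    List<List<TopicPartition>> taskAssignments = new ArrayList<>();
    for (int i = 0; i < numTasks; i++) {
        taskAssignments.add(new ArrayList<>());
    }
    for (int i = 0; i < partitions.size(); i++) {
        taskAssignments.get(i % numTasks).add(partitions.get(i));
    }
    return taskAssignments;   // e.g. t1 -> [t0p0, t0p3, t0p6, t1p1], ... as in the description
}
{code}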



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007666#comment-17007666
 ] 

Guozhang Wang commented on KAFKA-9312:
--

I think it is indeed an issue that we overlooked. As mentioned in KAFKA-9301, 

{code}
When ProducerBatch split happens, ProducerBatch.produceFuture becomes 
completed, even though records in a batch will be resent to a broker.
{code}

We should not complete the future when a split happens; instead, we should only 
complete it when we either get a non-retriable error (or retries are exhausted / 
the request times out) or a successful response.

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007664#comment-17007664
 ] 

Jonathan Santilli commented on KAFKA-9312:
--

Checking a little further, it seems like yes, it finishes, but if the batch was 
split, the _future_ gets chained:

 
{code:java}
@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
this.result.await(); // If Finish here
if (nextRecordMetadata != null)
return nextRecordMetadata.get();
return valueOrError();
}

...

/**
 * This method is used when we have to split a large batch in smaller ones. A 
chained metadata will allow the
 * future that has already returned to the users to wait on the newly created 
split batches even after the
 * old big batch has been deemed as done.
 */
void chain(FutureRecordMetadata futureRecordMetadata) {
if (nextRecordMetadata == null)
nextRecordMetadata = futureRecordMetadata;
else
nextRecordMetadata.chain(futureRecordMetadata);
}

And ProducerBatch#tryAppendForSplit calls thunk.future.chain(future);{code}
So I think it is OK; I will create a test case to verify it.
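
For reference, a hedged client-side sketch of why the per-record future is the 
reliable thing to wait on (it chains through split batches, whereas flush() 
alone may not); `records` and `producer` are assumed to exist:

{code:java}
// Await the per-record futures returned by send(); FutureRecordMetadata chains
// through any split batches, unlike the pre-split batch futures that flush() awaits.
List<Future<RecordMetadata>> pending = new ArrayList<>();
for (ProducerRecord<String, String> record : records) {
    pending.add(producer.send(record));
}
producer.flush();
for (Future<RecordMetadata> future : pending) {
    future.get();   // blocks until the record's final (possibly re-sent) batch completes
}
{code}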

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 5:39 PM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send() -> FutureRecordMetadata#get(){code}
will also wait until the request gets completed, but it does not guarantee that 
the record has been sent, since the batch could get split as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I would interpret it as: if the `get()` method returns 
successfully, the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send() -> ProduceRequestResult#get(){code}
will also wait until the request gets completed, but it does not warranty the 
record has been sent since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9340) Potential race condition in AbstractConfig

2020-01-03 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007646#comment-17007646
 ] 

M. Manna commented on KAFKA-9340:
-

Perhaps someone else can confirm this too? 

I am not sure why synchronization is required here. Config provider usage is 
independent, or at least it should be. If it is independent, shouldn't we 
simply remove all synchronization from here?

Also, the used/unused vars are mostly for checking what has been used, and for 
log items. Perhaps the intention is to ensure that the correct information is 
logged? In that case, would a concurrent variant, e.g. ConcurrentSkipListSet, 
be better?

> Potential race condition in AbstractConfig
> --
>
> Key: KAFKA-9340
> URL: https://issues.apache.org/jira/browse/KAFKA-9340
> Project: Kafka
>  Issue Type: Bug
>Reporter: Roman Leventov
>Priority: Minor
>
> It's not clear why the {{used}} field in {{AbstractConfig}} should be a 
> synchronized set, but if it does need to be synchronized, there is a race 
> condition in this line: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L214]
> {{keys.removeAll(used);}}
>  
> Possible fixes:
>  1. Document (e. g. in a comment) why {{used}} should be synchronized, and 
> replace line 214 with synchronized (used) \{ keys.removeAll(used); }
>  2. Remove unnecessary synchronization of {{used}}
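
A small sketch of fix option 1 (illustrative, outside of AbstractConfig): 
AbstractSet.removeAll may iterate over the argument collection when it is the 
smaller one, so that iteration has to hold the synchronized set's monitor; 
`allConfigKeys` below is an assumed source of keys.

{code:java}
Set<String> used = Collections.synchronizedSet(new HashSet<>());
Set<String> keys = new HashSet<>(allConfigKeys);

synchronized (used) {
    keys.removeAll(used);   // removeAll may iterate `used`; holding its monitor prevents the race
}
{code}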



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9355) RocksDB statistics are removed from JMX when EOS enabled and empty local state dir

2020-01-03 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006983#comment-17006983
 ] 

Boyang Chen edited comment on KAFKA-9355 at 1/3/20 5:34 PM:


Thanks for the report. Do all RocksDB metrics vanish, or only some of them? 


was (Author: bchen225242):
Thanks for the report. Does all RocksDB metrics vanish or some of them? 

cc [~cadonna]

> RocksDB statistics are removed from JMX when EOS enabled and empty local 
> state dir
> --
>
> Key: KAFKA-9355
> URL: https://issues.apache.org/jira/browse/KAFKA-9355
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.4.0
>Reporter: Stanislav Savulchik
>Priority: Major
> Attachments: metric-removal.log
>
>
> *Steps to Reproduce*
> Set processing.guarantee = exactly_once and remove local state dir in order 
> to force state restoration from changelog topics that have to be non empty.
> *Expected Behavior*
> There are registered MBeans like 
> kafka.streams:type=stream-state-metrics,client-id=-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,task-id=0_0,rocksdb-state-id=
>  for persistent RocksDB KeyValueStore-s after streams task state restoration.
> *Actual Behavior*
> There are no registered MBeans like above after streams task state 
> restoration.
> *Details*
> I managed to inject custom MetricsReporter in order to log metricChange and 
> metricRemoval calls. According to the logs at some point the missing metrics 
> are removed and never restored later. Here is an excerpt for 
> number-open-files metric:
> {noformat}
> 16:33:40.403 DEBUG c.m.r.LoggingMetricsReporter - metricChange MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 INFO  o.a.k.s.p.i.StoreChangelogReader - stream-thread 
> [morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1] No 
> checkpoint found for task 0_0 state store buffered-event changelog 
> morpheus.conversion-buffered-event-changelog-0 with EOS turned on. 
> Reinitializing the task and restore its state from the beginning.
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing metrics recorder for store 
> buffered-event of task 0_0 from metrics recording trigger
> 16:33
> 16:33:40.611 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> ...
> (no more calls to metricChange for the removed number-open-files 
> metric){noformat}
> Also a complete log is attached [^metric-removal.log]
> Metric removal happens along this call stack:
> {noformat}
> 19:27:35.509 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-9b76f302-7149-47de-b17b-362d642e05d5-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> java.lang.Exception: null
>at 
> casino.morpheus.reporter.LoggingMetricsReporter.metricRemoval(LoggingMetricsReporter.scala:24)
>at org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:534)
>at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:448)
>at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.removeAllStoreLevelSensors(StreamsMetricsImpl.java:440)
>at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.close(MeteredKeyValueStore.java:345)
>

[jira] [Commented] (KAFKA-9335) java.lang.IllegalArgumentException: Number of partitions must be at least 1.

2020-01-03 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007641#comment-17007641
 ] 

Boyang Chen commented on KAFKA-9335:


Sounds good, I will try to reproduce it today.

> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> 
>
> Key: KAFKA-9335
> URL: https://issues.apache.org/jira/browse/KAFKA-9335
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Nitay Kufert
>Priority: Major
>  Labels: bug
>
> Hey,
> When trying to upgrade our Kafka streams client to 2.4.0 (from 2.3.1) we 
> encountered the following exception: 
> {code:java}
> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> {code}
> It's important to notice that the exact same code works just fine at 2.3.1.
>  
> I have created a "toy" example which reproduces this exception:
> [https://gist.github.com/nitayk/50da33b7bcce19ad0a7f8244d309cb8f]
> and I would love to get some insight regarding why it's happening / ways to 
> get around it
>  
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Lucas Bradstreet (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007635#comment-17007635
 ] 

Lucas Bradstreet commented on KAFKA-9312:
-

[~pachilo] are you saying that in the existing code, the future returned will 
be completed even if the batch has only been split? I had noticed the same 
thing and wanted to look into that further, as it definitely seems like a bug 
to me.

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9363) Admin Script "kafka-topics" doesn't confirm on successful creation

2020-01-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007630#comment-17007630
 ] 

ASF GitHub Bot commented on KAFKA-9363:
---

mmanna-sapfgl commented on pull request #7893: KAFKA-9363 Confirms successful 
topic creation for JAdminClient
URL: https://github.com/apache/kafka/pull/7893
 
 
   When the admin client script is used with --zookeeper, it prints a 
confirmation message. The same doesn't occur when using the Java AdminClient. 
Since KIP-500 is going to replace ZK in the future, it would be nice to have 
this confirmation message retained.
   
   No unit testing is required, as this message is simply a confirmation on the 
console upon a successful return from the AdminClient API call.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Admin Script "kafka-topics" doesn't confirm on successful creation
> --
>
> Key: KAFKA-9363
> URL: https://issues.apache.org/jira/browse/KAFKA-9363
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: M. Manna
>Assignee: M. Manna
>Priority: Minor
>
> When a topic is created from admin console, no confirmation is provided if 
> --bootstrap-server is chosen. 
>  
> How to reproduce:
> 1) Get 2.4.0 distro
> 2) Download and extract code
> 3) Run "kafka-topics --create --topic "dummy" --partitions 1 
> --replication-factor 1 --bootstrap-server localhost:9092"
> 4) Observe that no confirmation, e.g. "Successfully created dummy", was 
> provided.
> We should, at least, provide a confirmation, or restore the confirmation 
> message that was previously printed when the --zookeeper argument was used. 
> Currently we must use the --describe flag to do a follow-up, but a 
> confirmation message is a nice addition.
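
A minimal sketch of such a confirmation with the Java AdminClient (illustrative 
only, not necessarily how the linked PR implements it; `props` is assumed to 
hold bootstrap.servers):

{code:java}
// Create the topic and print a confirmation once the call has succeeded.
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("dummy", 1, (short) 1);          // name, partitions, replication factor
    admin.createTopics(Collections.singleton(topic)).all().get();  // throws on failure
    System.out.println("Created topic \"dummy\".");                // confirmation on success
}
{code}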
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9363) Admin Script "kafka-topics" doesn't confirm on successful creation

2020-01-03 Thread M. Manna (Jira)
M. Manna created KAFKA-9363:
---

 Summary: Admin Script "kafka-topics" doesn't confirm on successful 
creation
 Key: KAFKA-9363
 URL: https://issues.apache.org/jira/browse/KAFKA-9363
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 2.4.0
Reporter: M. Manna
Assignee: M. Manna


When a topic is created from admin console, no confirmation is provided if 
--bootstrap-server is chosen. 

 

How to reproduce:

1) Get 2.4.0 distro

2) Download and extract code

3) Run "kafka-topics --create --topic "dummy" --partitions 1 
--replication-factor 1 --bootstrap-server localhost:9092"

4) Observe that no confirmation, e.g. "Successfully created dummy", was provided.


We should, at least, provide a confirmation, or restore the confirmation message 
that was previously printed when the --zookeeper argument was used. Currently we 
must use the --describe flag to do a follow-up, but a confirmation message is a 
nice addition.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9362) Missing GC logs when running Kafka with Java 11 with OpenJ9 VM

2020-01-03 Thread Alexandre Vermeerbergen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007525#comment-17007525
 ] 

Alexandre Vermeerbergen commented on KAFKA-9362:


My bad, the new version of the lines should be as below (&& with a negated J9 
check instead of ||, to make sure we use the JEP 158 option only for Java >= 9 
on non-OpenJ9 JVMs):

 

JAVA_VM_ISJ9=$($JAVA -version 2>&1|grep J9)
 if [[ "$JAVA_MAJOR_VERSION" -ge "9" && -z "$JAVA_VM_ISJ9" ]] ; then
 
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
 else
 KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
 fi

 

 

> Missing GC logs when running Kafka with Java 11 with OpenJ9 VM
> --
>
> Key: KAFKA-9362
> URL: https://issues.apache.org/jira/browse/KAFKA-9362
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1, 2.2.2, 2.4.0, 2.3.1
> Environment: AdoptOpenJDK11.0.5 (LTS release) on Linux, Windows, etc.
>Reporter: Alexandre Vermeerbergen
>Priority: Major
>  Labels: OpenJ9
> Fix For: 2.1.2, 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> When running Kafka server with a Java 11 JRE based on OpenJ9 JVM (instead of 
> usual HotSpot), we get this message in output:
> $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
> JVMJ9VM085W Malformed option: 
> '-Xlog:gc*:file=/home/data/ave/Java/kafka_2.12-2.4.0/bin/../logs/zookeeper-gc.log:time'
> [2020-01-03 12:46:35,575] INFO Reading configuration from: 
> config/zookeeper.properties 
> (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
> [2020-01-03 12:46:35,576] WARN config/zookeeper.properties is relative. 
> Prepend ./ to indicate that you're sure! 
> (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
> [2020-01-03 12:46:35,577] INFO clientPortAddress is 0.0.0.0/0.0.0.0:2181 
> (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
> ...
> and likewise when running '{{bin}}{{/kafka-server-start}}{{.sh 
> config}}{{/server}}{{.properties}}'
> While this does not prevent Kafka server (and its Zookeeper server) from 
> running, it has the effect that GC logs are not written, making diagnostics 
> harder than when running Kafka with a Java 11 based on HotSpot JVM.
> This is because OpenJ9 does not implement JEP 158, and thus even Java 11 
> versions based on OpenJ9 still rely on the same GC logging options as Java 8.
> I believe that it is easy to fix this issue by making a change to 
> [https://github.com/apache/kafka/blob/trunk/bin/kafka-run-class.sh,] lines 
> 295-299:
>  if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
>  
> KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
>  else
>  KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc 
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
>  fi
> which could be replaced by:
>  JAVA_VM_ISJ9=$($JAVA -version 2>&1|grep J9)
>  if [[ "$JAVA_MAJOR_VERSION" -ge "9" || -n "$JAVA_VM_ISJ9" ]] ; then
>  
> KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
>  else
>  KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc 
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
>  fi
> Please let me know if that sounds good to you.
> Please note that IBM AIX Java is also based on OpenJ9, so fixing this issue 
> would also be nice for AIX users!
> Kind regards,
> Alexandre
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9362) Missing GC logs when running Kafka with Java 11 with OpenJ9 VM

2020-01-03 Thread Alexandre Vermeerbergen (Jira)
Alexandre Vermeerbergen created KAFKA-9362:
--

 Summary: Missing GC logs when running Kafka with Java 11 with 
OpenJ9 VM
 Key: KAFKA-9362
 URL: https://issues.apache.org/jira/browse/KAFKA-9362
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.1, 2.4.0, 2.2.2, 2.1.1
 Environment: AdoptOpenJDK11.0.5 (LTS release) on Linux, Windows, etc.

Reporter: Alexandre Vermeerbergen
 Fix For: 2.1.2, 2.2.3, 2.5.0, 2.3.2, 2.4.1


When running the Kafka server with a Java 11 JRE based on the OpenJ9 JVM (instead of 
the usual HotSpot), we get this message in the output:

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
JVMJ9VM085W Malformed option: 
'-Xlog:gc*:file=/home/data/ave/Java/kafka_2.12-2.4.0/bin/../logs/zookeeper-gc.log:time'
[2020-01-03 12:46:35,575] INFO Reading configuration from: 
config/zookeeper.properties 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2020-01-03 12:46:35,576] WARN config/zookeeper.properties is relative. Prepend 
./ to indicate that you're sure! 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2020-01-03 12:46:35,577] INFO clientPortAddress is 0.0.0.0/0.0.0.0:2181 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...



and likewise when running {{bin/kafka-server-start.sh config/server.properties}}

While this does not prevent Kafka server (and its Zookeeper server) from 
running, it has the effect that GC logs are not written, making diagnostics 
harder than when running Kafka with a Java 11 JRE based on the HotSpot JVM.

This is because OpenJ9 does not implement JEP 158, and thus even Java 11 
versions based on OpenJ9 still rely on the same GC logging options as Java 8.

I believe that it is easy to fix this issue by making a change to 
[https://github.com/apache/kafka/blob/trunk/bin/kafka-run-class.sh], lines 295-299:

 if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
 
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
 else
 KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
 fi

which could be replaced by:

 JAVA_VM_ISJ9=$($JAVA -version 2>&1|grep J9)
 if [[ "$JAVA_MAJOR_VERSION" -ge "9" || -n "$JAVA_VM_ISJ9" ]] ; then
   KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
 else
   KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
 fi

Please let me know if that sounds good to you.

Please note that IBM AIX Java is also based on OpenJ9, so fixing this issue 
would also be nice for AIX users!


Kind regards,
Alexandre

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9335) java.lang.IllegalArgumentException: Number of partitions must be at least 1.

2020-01-03 Thread Nitay Kufert (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007394#comment-17007394
 ] 

Nitay Kufert commented on KAFKA-9335:
-

1. Yes
2. I tried with 12 but I believe it wouldn't matter
3. Pretty sure you don't need any data
4. Tested it with 2.3.1 and it works; I didn't try running it with 2.2 or 2.3 

> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> 
>
> Key: KAFKA-9335
> URL: https://issues.apache.org/jira/browse/KAFKA-9335
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Nitay Kufert
>Priority: Major
>  Labels: bug
>
> Hey,
> When trying to upgrade our Kafka streams client to 2.4.0 (from 2.3.1) we 
> encountered the following exception: 
> {code:java}
> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> {code}
> It's important to note that the exact same code works just fine on 2.3.1.
>  
> I have created a "toy" example which reproduces this exception:
> [https://gist.github.com/nitayk/50da33b7bcce19ad0a7f8244d309cb8f]
> and I would love to get some insight regarding why it's happening / ways to 
> get around it
>  
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:19 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send() -> ProduceRequestResult#get(){code}
will also wait until the request gets completed, but it does not guarantee that the 
record has been sent, since the batch could get split as well.

[According to the documentation|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]:

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I interpret it to mean that if the `get()` method returns 
successfully, the record was sent successfully.

 

What do you think [~lucasbradstreet]?
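
For reference, here is a minimal, hypothetical sketch of the send().get() pattern discussed 
above (the broker address and topic name are made up); whether a successful get() really 
implies the record was sent once the batch gets split is exactly the question raised here:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendGetExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns a Future<RecordMetadata>; get() blocks until the request completes
            // and either returns the metadata or throws the exception raised while sending.
            RecordMetadata metadata =
                producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
            System.out.printf("Acknowledged at %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}
{code}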

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not warranty the 
record has been sent since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.
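
For illustration, a minimal, hypothetical sketch (topic and broker made up) of the usage 
pattern that relies on the flush() guarantee described above: after flush() returns, every 
record sent before the call should have either succeeded or failed, so every callback should 
already have fired. Per this report, that assumption can break when a batch is split after a 
MESSAGE_TOO_LARGE error.
{code:java}
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushGuaranteeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        AtomicInteger completed = new AtomicInteger();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            int sent = 100;
            for (int i = 0; i < sent; i++) {
                // The callback fires once the record is acknowledged or has failed.
                producer.send(new ProducerRecord<>("test-topic", Integer.toString(i), "value-" + i),
                    (metadata, exception) -> completed.incrementAndGet());
            }
            producer.flush();
            // With the documented guarantee, every callback has fired by this point.
            System.out.println("Completed after flush: " + completed.get() + "/" + sent);
        }
    }
}
{code}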



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:18 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to 
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not guarantee that the 
record has been sent, since the batch could get split as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I interpret it to mean that if the `get()` method returns 
successfully, the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to KafkaProducer#send()#get() will also wait until 
the request gets completed, but it does not warranty the record has been sent 
since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:18 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not guarantee that the 
record has been sent, since the batch could get split as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I interpret it to mean that if the `get()` method returns 
successfully, the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to 
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not warranty the 
record has been sent since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:18 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to KafkaProducer#send()#get() will also wait until 
the request gets completed, but it does not guarantee that the record has been sent, 
since the batch could get split as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I interpret it to mean that if the `get()` method returns 
successfully, the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to`KafkaProducer#send()#get()` will also wait until 
the request gets completed, but it does not warranty the record has been sent 
since it could get splitted as well.

[According to the 
documentation:|[https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007335#comment-17007335
 ] 

Jonathan Santilli commented on KAFKA-9312:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to `KafkaProducer#send()#get()` will also wait until 
the request gets completed, but it does not guarantee that the record has been sent, 
since the batch could get split as well.

[According to the documentation|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]:

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I interpret it to mean that if the `get()` method returns 
successfully, the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)