[jira] [Commented] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test
[ https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431677#comment-16431677 ]

Ted Yu commented on KAFKA-6735:
-------------------------------

findbugs may take some time. Skipping findbugs is not for CI; it is for running tests locally.

> Document how to skip findbugs / checkstyle when running unit test
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6735
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6735
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Ted Yu
>            Assignee: Ewen Cheslack-Postava
>            Priority: Minor
>
> Even when running a single unit test, the findbugs dependency results in some
> time spent before the test is actually run.
> We should document how the findbugs dependency can be skipped in such a scenario:
> {code}
> -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
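For local runs, the `-x` flags above are appended to the usual Gradle test invocation; a sketch of what that might look like (the module and test class below are illustrative placeholders, not from the ticket):

```shell
# Run one unit test while skipping the static-analysis tasks.
# Module and test-class names are placeholder examples.
./gradlew clients:test --tests org.apache.kafka.clients.SomeTest \
    -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest
```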
[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread
[ https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333251#comment-16333251 ]

Ted Yu edited comment on KAFKA-6303 at 4/10/18 4:05 AM:
--------------------------------------------------------

+1

was (Author: yuzhih...@gmail.com):
lgtm

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6303
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6303
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>            Reporter: Ted Yu
>            Assignee: siva santhalingam
>            Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modification to newChannels should be protected by a synchronized block.
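The fix the reviewers approve of amounts to guarding every access to the shared list with a common lock. A minimal sketch of the pattern, with hypothetical class and method names rather than the actual NioEchoServer code:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the locking discussed above: a list that an acceptor
// thread fills and another thread drains, with both sides synchronizing on
// the same lock. Names are illustrative, not the actual NioEchoServer fields.
class GuardedChannelList<T> {
    private final List<T> newChannels = new ArrayList<>();

    // Called from the acceptor thread after accept().
    void add(T channel) {
        synchronized (newChannels) {
            newChannels.add(channel);
        }
    }

    // Called from the serving thread; atomically drains pending channels.
    List<T> takeAll() {
        synchronized (newChannels) {
            List<T> drained = new ArrayList<>(newChannels);
            newChannels.clear();
            return drained;
        }
    }
}
```

Without the synchronized block around `add`, the serving thread may observe the list in an inconsistent state while the acceptor thread is mutating it.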
[jira] [Assigned] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test
[ https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava reassigned KAFKA-6735:
--------------------------------------------

    Assignee: Ewen Cheslack-Postava

> Document how to skip findbugs / checkstyle when running unit test
> -----------------------------------------------------------------
[jira] [Commented] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test
[ https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431672#comment-16431672 ]

Ewen Cheslack-Postava commented on KAFKA-6735:
----------------------------------------------

Why should we document outs for these? They should never result in merging PRs more quickly, as seems to be the goal here: any failure of these checks would show up as a CI failure, and a committer would never merge a PR whose CI builds failed.

> Document how to skip findbugs / checkstyle when running unit test
> -----------------------------------------------------------------
[jira] [Commented] (KAFKA-6769) Upgrade jetty library version
[ https://issues.apache.org/jira/browse/KAFKA-6769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431588#comment-16431588 ]

Ismael Juma commented on KAFKA-6769:
------------------------------------

Newer Jetty versions require Java 8, which is why we have not upgraded yet.

> Upgrade jetty library version
> -----------------------------
>
>                 Key: KAFKA-6769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6769
>             Project: Kafka
>          Issue Type: Task
>          Components: core, security
>    Affects Versions: 1.1.0
>            Reporter: Di Shang
>            Priority: Critical
>
> jetty 9.2 has reached end of life as of Jan 2018:
> http://www.eclipse.org/jetty/documentation/current/what-jetty-version.html#d0e203
> Current version used in Kafka 1.1.0: 9.2.24.v20180105
> For security reasons, please upgrade to a later version.
[jira] [Created] (KAFKA-6769) Upgrade jetty library version
Di Shang created KAFKA-6769:
-------------------------------

             Summary: Upgrade jetty library version
                 Key: KAFKA-6769
                 URL: https://issues.apache.org/jira/browse/KAFKA-6769
             Project: Kafka
          Issue Type: Task
          Components: core, security
    Affects Versions: 1.1.0
            Reporter: Di Shang


jetty 9.2 has reached end of life as of Jan 2018:
http://www.eclipse.org/jetty/documentation/current/what-jetty-version.html#d0e203

Current version used in Kafka 1.1.0: 9.2.24.v20180105

For security reasons, please upgrade to a later version.
[jira] [Resolved] (KAFKA-6768) Producer may hang in close with pending transaction
[ https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-6768.
------------------------------------
    Resolution: Fixed

> Producer may hang in close with pending transaction
> ---------------------------------------------------
>
>                 Key: KAFKA-6768
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6768
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 1.1.1
>
> There is an edge case for transactional producers which will cause close() to
> hang indefinitely (unless used with a timeout). Say, for example, that the
> producer is trying to send an AddPartitionsToTxn request to the broker. Upon
> shutdown, the Sender's running flag will be set to false and we will begin
> graceful shutdown. Graceful shutdown will not complete, however, until we can
> send the AddPartitionsToTxn request. But the latter is blocked by the fact
> that the running flag is disabled. So no progress can be made and shutdown
> cannot complete.
[jira] [Commented] (KAFKA-6768) Producer may hang in close with pending transaction
[ https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431441#comment-16431441 ]

ASF GitHub Bot commented on KAFKA-6768:
---------------------------------------

hachikuji closed pull request #4842: KAFKA-6768; Transactional producer may hang in close with pending requests
URL: https://github.com/apache/kafka/pull/4842

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 426b273b885..0514c995635 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) {
             return false;

         AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder();
-        while (running) {
+        while (!forceClose) {
             Node targetNode = null;
             try {
                 if (nextRequestHandler.needsCoordinator()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6fcf4805967..558ec721096 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -132,6 +132,26 @@ public void setup() {
         client.setNode(brokerNode);
     }

+    @Test
+    public void testSenderShutdownWithPendingAddPartitions() throws Exception {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+
+        sender.initiateClose();
+        sender.run();
+
+        assertTrue(sendFuture.isDone());
+    }
+
     @Test
     public void testEndTxnNotSentIfIncompleteBatches() {
         long pid = 13131L;

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Producer may hang in close with pending transaction
> ---------------------------------------------------
[jira] [Commented] (KAFKA-6768) Producer may hang in close with pending transaction
[ https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431183#comment-16431183 ]

ASF GitHub Bot commented on KAFKA-6768:
---------------------------------------

hachikuji opened a new pull request #4842: KAFKA-6768; Transactional producer may hang in close with pending requests
URL: https://github.com/apache/kafka/pull/4842

This patch fixes an edge case in producer shutdown which prevents `close()` from completing due to a pending request which will never be sent due to shutdown initiation. I have added a test case which reproduces the scenario.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> Producer may hang in close with pending transaction
> ---------------------------------------------------
[jira] [Commented] (KAFKA-6768) Producer may hang in close with pending transaction
[ https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431168#comment-16431168 ]

Jason Gustafson commented on KAFKA-6768:
----------------------------------------

Initially I thought this affected 1.1.0, but the regression was only in trunk.

> Producer may hang in close with pending transaction
> ---------------------------------------------------
[jira] [Updated] (KAFKA-6768) Producer may hang in close with pending transaction
[ https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-6768:
-----------------------------------
    Affects Version/s:     (was: 1.1.0)

> Producer may hang in close with pending transaction
> ---------------------------------------------------
[jira] [Created] (KAFKA-6768) Producer may hang in close with pending transaction
Jason Gustafson created KAFKA-6768:
--------------------------------------

             Summary: Producer may hang in close with pending transaction
                 Key: KAFKA-6768
                 URL: https://issues.apache.org/jira/browse/KAFKA-6768
             Project: Kafka
          Issue Type: Bug
          Components: producer
    Affects Versions: 1.1.0
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson
             Fix For: 1.1.1


There is an edge case for transactional producers which will cause close() to hang indefinitely (unless used with a timeout). Say, for example, that the producer is trying to send an AddPartitionsToTxn request to the broker. Upon shutdown, the Sender's running flag will be set to false and we will begin graceful shutdown. Graceful shutdown will not complete, however, until we can send the AddPartitionsToTxn request. But the latter is blocked by the fact that the running flag is disabled. So no progress can be made and shutdown cannot complete.
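The deadlock described in this issue, and the shape of the fix merged in PR #4842, can be modeled in a few lines. This is a simplified, hypothetical sketch, not the real Sender code: `initiateClose()` clears `running`, but pending transactional requests still have to go out, so a send loop gated on `running` can never finish, while one gated on `forceClose` drains the queue and lets graceful shutdown complete.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the shutdown edge case (names are illustrative).
class MiniSender {
    private volatile boolean running = true;
    private volatile boolean forceClose = false;
    private final Queue<String> pending = new ArrayDeque<>();
    private int sent = 0;

    void enqueue(String request) { pending.add(request); }
    void initiateClose() { running = false; }          // graceful shutdown begins
    void initiateForceClose() { forceClose = true; }   // hard shutdown
    int sentCount() { return sent; }

    // Buggy form: gating on `running` sends nothing once close has begun.
    void drainGatedOnRunning() {
        while (running && !pending.isEmpty()) { pending.poll(); sent++; }
    }

    // Fixed form (mirroring the patch): only a forced close stops the drain.
    void drainGatedOnForceClose() {
        while (!forceClose && !pending.isEmpty()) { pending.poll(); sent++; }
    }
}
```

In the buggy form, a request enqueued before `initiateClose()` is never sent, which is exactly why graceful shutdown could not complete.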
[jira] [Assigned] (KAFKA-6709) broker failed to handle request due to OOM
[ https://issues.apache.org/jira/browse/KAFKA-6709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dhruvil Shah reassigned KAFKA-6709:
-----------------------------------
    Assignee: Dhruvil Shah

> broker failed to handle request due to OOM
> ------------------------------------------
>
>                 Key: KAFKA-6709
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6709
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.0.1
>            Reporter: Zou Tao
>            Assignee: Dhruvil Shah
>            Priority: Critical
>         Attachments: kafkaServer-gc.log.0.current.zip, kafkaServer.out.tgz, normal-kafkaServer-gc.log.0.current.zip, server.properties
>
> I have updated to release 1.0.1.
> I set up a cluster with four brokers; you can find the server.properties in the attachment.
> There are about 150 topics and about 4000 partitions in total; ReplicationFactor is 2.
> Connectors are used to write/read data to/from the brokers; the connector version is 0.10.1.
> The average message size is 500 B, at around 6 messages per second.
> One of the brokers keeps reporting OOM and can't handle requests:
> [2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request
> {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[{partition=16,fetch_offset=51198,max_bytes=60728640},{partition=12,fetch_offset=50984,max_bytes=60728640}]}]}
> (kafka.server.KafkaApis)
> java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>         at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
>         at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523)
>         at scala.Option.map(Option.scala:146)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513)
>         at scala.Option.flatMap(Option.scala:171)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
>         at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041)
>         at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
>         at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593)
>         at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:592)
>         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
>         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:99)
> and then lots of ISR shrinking (this broker is 1001):
> [2018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
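As a back-of-envelope check of the heap pressure in the quoted request: the trace shows the allocation happening inside `downConvert`, which buffers the converted records on the heap. The numbers come from the fetch request above; the helper itself is only illustrative.

```java
// The failing fetch advertises max_bytes=60728640 for each of two
// partitions; down-conversion buffers the converted data on the heap, so a
// single response may need roughly partitions * max_bytes of memory.
class FetchMath {
    static long worstCaseFetchBytes(int partitions, long maxBytesPerPartition) {
        return partitions * maxBytesPerPartition;
    }
}
```

With the quoted values this is 2 * 60728640 = 121457280 bytes, about 116 MiB for one in-flight request; a handful of concurrent down-converting fetches can easily exceed the configured heap.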
[jira] [Created] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists
Steven Schlansker created KAFKA-6767:
----------------------------------------

             Summary: OffsetCheckpoint write assumes parent directory exists
                 Key: KAFKA-6767
                 URL: https://issues.apache.org/jira/browse/KAFKA-6767
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.1.0
            Reporter: Steven Schlansker


We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an instance dies, it is created from scratch rather than reusing the existing RocksDB). We routinely see:

{code:java}
2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
{code}

Inspecting the state store directory, I can indeed see that {{chat/0_11}} does not exist (although many other partitions do).

Looking at the OffsetCheckpoint write method, it seems to try to open a new checkpoint file without first ensuring that the parent directory exists:

{code:java}
    public void write(final Map<TopicPartition, Long> offsets) throws IOException {
        // if there is no offsets, skip writing the file to save disk IOs
        if (offsets.isEmpty()) {
            return;
        }

        synchronized (lock) {
            // write to temp file and then swap with the existing file
            final File temp = new File(file.getAbsolutePath() + ".tmp");
{code}

Either the OffsetCheckpoint class should initialize the directories if needed, or some precondition of it being called should ensure that is the case.
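One possible shape of the first option (a sketch under the report's assumptions, not the actual Kafka patch): create the parent directory before opening the temp file, then keep the write-then-rename swap the issue quotes.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Illustrative fix for the precondition discussed above: ensure the
// checkpoint's parent directory exists before opening the temp file.
class CheckpointWriter {
    static void writeBytes(File checkpointFile, byte[] payload) throws IOException {
        File dir = checkpointFile.getParentFile();
        if (dir != null && !dir.exists() && !dir.mkdirs()) {
            throw new IOException("Could not create checkpoint directory " + dir);
        }
        // Write to a temp file and then swap with the existing file,
        // mirroring the pattern in OffsetCheckpoint.write().
        File temp = new File(checkpointFile.getAbsolutePath() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(temp)) {
            out.write(payload);
            out.getFD().sync();
        }
        if (!temp.renameTo(checkpointFile)) {
            throw new IOException("Could not rename " + temp + " to " + checkpointFile);
        }
    }
}
```

With this precondition in place, a freshly provisioned (ephemeral) state directory no longer causes the FileNotFoundException above.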
[jira] [Commented] (KAFKA-4650) Improve test coverage org.apache.kafka.streams.kstream.internals
[ https://issues.apache.org/jira/browse/KAFKA-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430988#comment-16430988 ]

Matthias J. Sax commented on KAFKA-4650:
----------------------------------------

I agree that smaller PRs are easier to handle. Not sure if we need multiple tickets -- feel free to do a "partial PR" that tackles some of the mentioned classes.

> Improve test coverage org.apache.kafka.streams.kstream.internals
> ----------------------------------------------------------------
>
>                 Key: KAFKA-4650
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4650
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Damian Guy
>            Priority: Minor
>              Labels: newbie
>
> Lots of classes have little or no coverage at all, i.e.:
> {{KTableAggregate.KTableAggregateValueGetter}}
> {{KTableKTableRightJoin.KTableKTableRightJoinValueGetterSupplier}}
> {{KStreamAggregate.KStreamAggregateValueGetter}}
> {{KStreamReduce.KStreamReduceValueGetter}}
> {{KStreamWindowReduce.new KTableValueGetterSupplier}}
> {{KTableAggregate.new KTableValueGetterSupplier}}
> {{KTableRepartitionMap.new KTableValueGetterSupplier}}
> {{KTableKTableRightJoin.KTableKTableRightJoinValueGetter}}
> {{KTableKTableLeftJoinValueGetter}}
> {{KStreamWindowReduce.KStreamWindowReduceValueGetter}}
> {{TimeWindow}}
> {{ChangedSerializer}}
> {{UnlimitedWindow}}
> {{WindowedDeserializer}}
> {{KStreamSessionWindowAggregate.KTableSessionWindowValueGetter}}
> {{KTableRepartitionMap}}
[jira] [Assigned] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
[ https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-6749:
----------------------------------------
    Assignee: Jagadesh Adireddi

> TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6749
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6749
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Frederic Arno
>            Assignee: Jagadesh Adireddi
>            Priority: Minor
>              Labels: newbie
>
> Stream processing topologies which are configured to use the {{EXACTLY_ONCE}} processing guarantee cannot be tested with the {{TopologyTestDriver}}. Tests usually crash with {{java.lang.IllegalStateException: MockProducer hasn't been initialized for transactions}} within the second call to {{TopologyTestDriver.pipeInput()}}; the first call works fine. Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass.
> This is a problem because it is expected that proper processor topologies can be successfully tested using {{TopologyTestDriver}}; however, {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during tests. To a developer, this usually means that there is something wrong with their processor topologies.
> Kafka developers can reproduce this by adding:
> {code:java}
> put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> {code}
> to line 88 of TopologyTestDriverTest:
> streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
> Originally [reported on the ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E].
[jira] [Commented] (KAFKA-6766) Kafka offset moved backward
[ https://issues.apache.org/jira/browse/KAFKA-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430634#comment-16430634 ]

Janmejay Baral commented on KAFKA-6766:
---------------------------------------

I am using Kafka version 0.11.0.0.

> Kafka offset moved backward
> ---------------------------
>
>                 Key: KAFKA-6766
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6766
>             Project: Kafka
>          Issue Type: Bug
>          Components: offset manager
>         Environment: 3-node Kafka cluster
>            Reporter: Janmejay Baral
>            Priority: Major
>
> * While putting load on a Kafka topic, I encountered a problem where server.log says "[2018-04-09 19:45:36,771] INFO [Group Metadata Manager on Broker 1]: Group ingestionconsumers transitioned to Dead in generation 0 (kafka.coordinator.group.GroupMetadataManager)".
> * Once I found that the offsets had gone backward, I found this in the server log.
[jira] [Created] (KAFKA-6766) Kafka offset moved backward
Janmejay Baral created KAFKA-6766:
-------------------------------------

             Summary: Kafka offset moved backward
                 Key: KAFKA-6766
                 URL: https://issues.apache.org/jira/browse/KAFKA-6766
             Project: Kafka
          Issue Type: Bug
          Components: offset manager
         Environment: 3-node Kafka cluster
            Reporter: Janmejay Baral


* While putting load on a Kafka topic, I encountered a problem where server.log says "[2018-04-09 19:45:36,771] INFO [Group Metadata Manager on Broker 1]: Group ingestionconsumers transitioned to Dead in generation 0 (kafka.coordinator.group.GroupMetadataManager)".
* Once I found that the offsets had gone backward, I found this in the server log.
[jira] [Commented] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430366#comment-16430366 ]

Veera commented on KAFKA-6052:
------------------------------

Hi Jason,

Could you please confirm the release date of either 1.1.1 or 1.2.0?

Many thanks.

> Windows: Consumers not polling when isolation.level=read_committed
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6052
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6052
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, producer
>    Affects Versions: 0.11.0.0, 1.0.1
>         Environment: Windows 10. All processes running in embedded mode.
>            Reporter: Ansel Zandegran
>            Assignee: Vahid Hashemian
>            Priority: Major
>              Labels: transactions, windows
>             Fix For: 1.2.0, 1.1.1
>
>         Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, logFile.log
>
> *The same code is running fine in Linux.* I am trying to send a transactional record with exactly-once semantics. These are my producer, consumer and broker setups.
> {code:java}
> public void sendWithTTemp(String topic, EHEvent event) {
>   Properties props = new Properties();
>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
>   //props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
>   props.put(ProducerConfig.ACKS_CONFIG, "all");
>   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>   props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>   props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>   props.put("transactional.id", "TID" + transactionId.incrementAndGet());
>   props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
>
>   Producer<String, String> producer =
>       new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
>
>   Logger.log(this, "Initializing transaction...");
>   producer.initTransactions();
>   Logger.log(this, "Initializing done.");
>   try {
>     Logger.log(this, "Begin transaction...");
>     producer.beginTransaction();
>     Logger.log(this, "Begin transaction done.");
>     Logger.log(this, "Sending events...");
>     producer.send(new ProducerRecord<>(topic, event.getKey().toString(), event.getValue().toString()));
>     Logger.log(this, "Sending events done.");
>     Logger.log(this, "Committing...");
>     producer.commitTransaction();
>     Logger.log(this, "Committing done.");
>   } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
>     producer.close();
>     e.printStackTrace();
>   } catch (KafkaException e) {
>     producer.abortTransaction();
>     e.printStackTrace();
>   }
>   producer.close();
> }
> {code}
> *In the consumer* I have set isolation.level=read_committed.
> *In the 3 brokers* I'm running with the following properties:
> {code:java}
> Properties props = new Properties();
> props.setProperty("broker.id", "" + i);
> props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
> props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
> props.setProperty("num.partitions", "1");
> props.setProperty("zookeeper.connect", "localhost:2181");
> props.setProperty("zookeeper.connection.timeout.ms", "6000");
> props.setProperty("min.insync.replicas", "2");
> props.setProperty("offsets.topic.replication.factor", "2");
> props.setProperty("offsets.topic.num.partitions", "1");
> props.setProperty("transaction.state.log.num.partitions", "2");
> props.setProperty("transaction.state.log.replication.factor", "2");
> props.setProperty("transaction.state.log.min.isr", "2");
> {code}
> I am not getting any records in the consumer. When I set isolation.level=read_uncommitted, I get the records. I assume that the records are not getting committed. What could be the problem? Log attached.
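The report shows the producer and broker sides but only mentions the consumer setting in passing. For completeness, a minimal consumer-side configuration matching the scenario; only the isolation.level line is the one under discussion, and the group id and deserializers here are illustrative assumptions.

```java
import java.util.Properties;

// Consumer settings for the read_committed scenario above. Plain string
// keys are used so the sketch does not depend on Kafka client classes.
class ReadCommittedConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.setProperty("group.id", "test-group");            // illustrative
        props.setProperty("isolation.level", "read_committed"); // only committed records are returned
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Under read_committed, records after the last stable offset (i.e. belonging to open transactions) are withheld, which is why an uncommitted transaction makes poll() appear to return nothing.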
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16430348#comment-16430348 ] Ben commented on KAFKA-1194: Note we also see this issue on Windows, but in slightly different circumstances. If we try to delete a topic, the broker crashes entirely with an error about not being able to delete files while they are in use. You then can't restart the broker until you first delete all the log files manually. > The kafka broker cannot delete the old log files after the configured time > -- > > Key: KAFKA-1194 > URL: https://issues.apache.org/jira/browse/KAFKA-1194 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0 > Environment: Windows >Reporter: Tao Qin >Priority: Critical > Labels: features, patch, windows > Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, > kafka-1194-v2.patch, screenshot-1.png > > Original Estimate: 72h > Remaining Estimate: 72h > > We tested it in a Windows environment, and set log.retention.hours to 24 > hours. > # The minimum age of a log file to be eligible for deletion > log.retention.hours=24 > After several days, the kafka broker still cannot delete the old log files. 
> And we get the following exceptions: > [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task > 'kafka-log-retention' (kafka.utils.KafkaScheduler) > kafka.common.KafkaStorageException: Failed to change the log file suffix from > to .deleted for log segment 1516723 > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249) > at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638) > at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) > at scala.collection.immutable.List.foreach(List.scala:76) > at kafka.log.Log.deleteOldSegments(Log.scala:418) > at > kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743) > at scala.collection.Iterator$class.foreach(Iterator.scala:772) > at > scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) > at > scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:314) > at > kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143) > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > I think this error happens because kafka tries to rename the log file while it > is still open. So we should close the file before renaming it. > The index file uses a special data structure, the MappedByteBuffer. The Javadoc > describes it as: > A mapped byte buffer and the file mapping that it represents remain valid > until the buffer itself is garbage-collected. > Fortunately, I found a forceUnmap function in the kafka code, and perhaps it can > be used to free the MappedByteBuffer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
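The "close before rename" ordering the reporter proposes can be sketched with plain JDK classes. This is an illustration of the ordering, not Kafka's actual segment-deletion code; the file names are made up:

```java
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RenameAfterClose {
    // Map the file into memory (as Kafka does for index files), release the
    // channel, and only then rename to the ".deleted" suffix. On Windows an
    // open channel blocks both rename and delete, which is the failure mode
    // in the stack trace above.
    static Path mapThenRename(Path segment) throws Exception {
        try (FileChannel ch = FileChannel.open(segment, StandardOpenOption.READ)) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(segment));
            buf.get(0); // touch the mapping
        } // channel closed here, before the rename is attempted
        // Note: on Windows the MappedByteBuffer itself can still pin the file
        // until it is unmapped, which is why the reporter points at forceUnmap.
        Path deleted = segment.resolveSibling(segment.getFileName() + ".deleted");
        return Files.move(segment, deleted);
    }

    public static void main(String[] args) throws Exception {
        Path segment = Files.createTempFile("segment", ".index");
        Files.write(segment, new byte[] {1, 2, 3, 4});
        Path deleted = mapThenRename(segment);
        System.out.println("renamed ok: " + Files.exists(deleted));
    }
}
```

Closing the channel alone is not sufficient on Windows because the mapping outlives the channel until garbage collection, so an explicit unmap (the forceUnmap the reporter found) is the remaining piece.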
[jira] [Created] (KAFKA-6765) Intermittent test failure in CustomQuotaCallbackTest
Rajini Sivaram created KAFKA-6765: - Summary: Intermittent test failure in CustomQuotaCallbackTest Key: KAFKA-6765 URL: https://issues.apache.org/jira/browse/KAFKA-6765 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.2.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.2.0 Exception stack trace: {quote} java.lang.NullPointerException at org.apache.kafka.common.metrics.stats.SampledStat.purgeObsoleteSamples(SampledStat.java:104) at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:74) at org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:68) at kafka.api.QuotaTestClients$.metricValue(BaseQuotaTest.scala:163) at kafka.api.QuotaTestClients.produceUntilThrottled(BaseQuotaTest.scala:193) at kafka.api.CustomQuotaCallbackTest$GroupedUser.produceConsume(CustomQuotaCallbackTest.scala:272) at kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:146) {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
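The NPE in purgeObsoleteSamples during a concurrent produce loop suggests the sample list is read by one thread while another mutates it. A minimal, hypothetical illustration of the usual guard (this is not Kafka's actual SampledStat code) is to serialize every access to the shared list on one lock:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a stat whose samples are purged by one thread while
// another records or measures. Synchronizing all three paths on the same
// monitor prevents a reader from observing a half-purged list — the kind of
// race that can surface as a NullPointerException in unsynchronized code.
class SafeSampledStat {
    private final List<Double> samples = new ArrayList<>();

    synchronized void record(double value) {
        samples.add(value);
    }

    synchronized void purgeObsolete() {
        // Drop everything but the newest sample, as a stand-in for the
        // time-window-based expiry the real class performs.
        while (samples.size() > 1) samples.remove(0);
    }

    synchronized double measure() {
        double sum = 0;
        for (Double s : samples) sum += s; // never sees concurrent removal
        return sum;
    }
}
```

Each method holds the object's monitor for its full duration, so purgeObsolete and measure can never interleave mid-iteration.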
[jira] [Commented] (KAFKA-4650) Improve test coverage org.apache.kafka.streams.kstream.internals
[ https://issues.apache.org/jira/browse/KAFKA-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16430327#comment-16430327 ] Jimin Hsieh commented on KAFKA-4650: Is it possible to split this issue into multiple smaller issues? It would be much easier for contributors to work on and committers to review. > Improve test coverage org.apache.kafka.streams.kstream.internals > > > Key: KAFKA-4650 > URL: https://issues.apache.org/jira/browse/KAFKA-4650 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Priority: Minor > Labels: newbie > > Lots of classes have little or no coverage at all, i.e., > {{KTableAggregate.KTableAggregateValueGetter}} > {{KTableKTableRightJoin.KTableKTableRightJoinValueGetterSupplier}} > {{KStreamAggregate.KStreamAggregateValueGetter}} > {{KStreamReduce.KStreamReduceValueGetter}} > {{KStreamWindowReduce.new KTableValueGetterSupplier}} > {{KTableAggregate.new KTableValueGetterSupplier}} > {{KTableRepartitionMap.new KTableValueGetterSupplier}} > {{KTableKTableRightJoin.KTableKTableRightJoinValueGetter}} > {{KTableKTableLeftJoinValueGetter}} > {{KStreamWindowReduce.KStreamWindowReduceValueGetter}} > {{TimeWindow}} > {{ChangedSerializer}} > {{UnlimitedWindow}} > {{WindowedDeserializer}} > {{KStreamSessionWindowAggregate.KTableSessionWindowValueGetter}} > {{KTableRepartitionMap}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16430231#comment-16430231 ] Valentino Proietti commented on KAFKA-6742: --- Thank you, [~mjsax]! > TopologyTestDriver error when dealing with stores from GlobalKTable > --- > > Key: KAFKA-6742 > URL: https://issues.apache.org/jira/browse/KAFKA-6742 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Valentino Proietti >Assignee: Valentino Proietti >Priority: Minor > > This junit test simply fails: > @Test > public void globalTable() { > StreamsBuilder builder = new StreamsBuilder(); > @SuppressWarnings("unused") > final KTable<String, String> localTable = builder > .table("local", > Consumed.with(Serdes.String(), Serdes.String()), > Materialized.as("localStore")); > @SuppressWarnings("unused") > final GlobalKTable<String, String> globalTable = builder > .globalTable("global", > Consumed.with(Serdes.String(), Serdes.String()), > Materialized.as("globalStore")); > // > Properties props = new Properties(); > props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); > props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost"); > TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props); > // > final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore"); > Assert.assertNotNull(localStore); > Assert.assertNotNull(testDriver.getAllStateStores().get("localStore")); > // > final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore"); > Assert.assertNotNull(globalStore); > Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore")); > // > final ConsumerRecordFactory<String, String> crf = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); > testDriver.pipeInput(crf.create("local", "one", "TheOne")); > testDriver.pipeInput(crf.create("global", "one", "TheOne")); > // 
Assert.assertEquals("TheOne", localStore.get("one")); > Assert.assertEquals("TheOne", globalStore.get("one")); > > > To make it work I had to modify the TopologyTestDriver class as follows: > ... > public Map<String, StateStore> getAllStateStores() { > // final Map<String, StateStore> allStores = new HashMap<>(); > // for (final String storeName : internalTopologyBuilder.allStateStoreName()) { > // allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName)); > // } > // return allStores; > // FIXME > final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr(); > final Map<String, StateStore> allStores = new HashMap<>(); > for (final String storeName : internalTopologyBuilder.allStateStoreName()) { > StateStore res = psm.getStore(storeName); > if (res == null) > res = psm.getGlobalStore(storeName); > allStores.put(storeName, res); > } > return allStores; > } > ... > public StateStore getStateStore(final String name) { > // return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name); > // FIXME > final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr(); > StateStore res = psm.getStore(name); > if (res == null) > res = psm.getGlobalStore(name); > return res; > } > > Moreover, I think it would be very useful to make the internal MockProducer public for testing cases where a producer is used alongside the "normal" stream processing, by adding the method: > /** > * @return records sent with this producer are automatically streamed to the topology. > */ > public final Producer<byte[], byte[]> getProducer() { > return producer; > } > > Unfortunately this introduces another problem that could be verified by adding the following lines to the previous junit test: > ... 
> > // > ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); > // just to serialize keys and values > testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value())); > testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value())); > testDriver.advanceWallClockTime(0); > Assert.assertEquals("TheOne", localStore.get("one")); >
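The patch the reporter sketches is essentially a null-fallback lookup: try the task-local store registry first, then fall back to the global one. Stripped of the Kafka types, the pattern looks like this — names here are illustrative, not the actual TopologyTestDriver API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative two-level registry: a lookup checks the local stores first
// and falls back to the global stores only when the name is not found
// locally, mirroring the getStore / getGlobalStore fallback proposed above.
class StoreLookup {
    private final Map<String, String> localStores = new HashMap<>();
    private final Map<String, String> globalStores = new HashMap<>();

    void addLocal(String name, String store)  { localStores.put(name, store); }
    void addGlobal(String name, String store) { globalStores.put(name, store); }

    String getStore(String name) {
        String store = localStores.get(name);
        return store != null ? store : globalStores.get(name); // fallback
    }
}
```

Because local stores shadow global ones of the same name, the fallback never hides a local store, and a name registered only globally (the failing case in this issue) now resolves instead of returning null.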