[jira] [Commented] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test

2018-04-09 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431677#comment-16431677
 ] 

Ted Yu commented on KAFKA-6735:
---

findbugs may take some time. Skipping findbugs is not for CI; it is for running 
tests locally.
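
For example, a local single-test run that skips those checks could look like 
this (module and test names are illustrative, not from the ticket; the -x 
flags are spelled as given there):

{code}
./gradlew clients:test --tests org.apache.kafka.clients.producer.internals.SenderTest \
  -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest
{code}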

> Document how to skip findbugs / checkstyle when running unit test
> -
>
> Key: KAFKA-6735
> URL: https://issues.apache.org/jira/browse/KAFKA-6735
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> Even when running a single unit test, the findbugs dependency results in some 
> time spent before the test is actually run.
> We should document how the findbugs dependency can be skipped in such a scenario:
> {code}
> -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread

2018-04-09 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333251#comment-16333251
 ] 

Ted Yu edited comment on KAFKA-6303 at 4/10/18 4:05 AM:


+1


was (Author: yuzhih...@gmail.com):
lgtm

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -
>
> Key: KAFKA-6303
> URL: https://issues.apache.org/jira/browse/KAFKA-6303
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = 
> ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modification to newChannels should be protected by synchronized block.
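
A minimal sketch of that suggestion, assuming newChannels is the list the 
acceptor thread shares with the server's main thread (names taken from the 
snippet above):

{code:java}
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
// guard the shared list so the acceptor's add() cannot interleave with
// the main thread draining newChannels
synchronized (newChannels) {
    newChannels.add(socketChannel);
}
{code}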



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test

2018-04-09 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava reassigned KAFKA-6735:


Assignee: Ewen Cheslack-Postava

> Document how to skip findbugs / checkstyle when running unit test
> -
>
> Key: KAFKA-6735
> URL: https://issues.apache.org/jira/browse/KAFKA-6735
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> Even when running a single unit test, the findbugs dependency results in some 
> time spent before the test is actually run.
> We should document how the findbugs dependency can be skipped in such a scenario:
> {code}
> -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6735) Document how to skip findbugs / checkstyle when running unit test

2018-04-09 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431672#comment-16431672
 ] 

Ewen Cheslack-Postava commented on KAFKA-6735:
--

Why should we document ways to skip these checks? They should never result in 
PRs being merged more quickly, which seems to be the goal here: any failure the 
checks would have caught shows up as a CI failure, and a committer would never 
merge a PR whose CI builds failed.

 

> Document how to skip findbugs / checkstyle when running unit test
> -
>
> Key: KAFKA-6735
> URL: https://issues.apache.org/jira/browse/KAFKA-6735
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> Even when running a single unit test, the findbugs dependency results in some 
> time spent before the test is actually run.
> We should document how the findbugs dependency can be skipped in such a scenario:
> {code}
> -x findbugsMain -x findbugsTest -x checkStyleMain -x checkStyleTest
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6769) Upgrade jetty library version

2018-04-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431588#comment-16431588
 ] 

Ismael Juma commented on KAFKA-6769:


Newer Jetty versions require Java 8 and that's why we have not upgraded yet.

> Upgrade jetty library version
> -
>
> Key: KAFKA-6769
> URL: https://issues.apache.org/jira/browse/KAFKA-6769
> Project: Kafka
>  Issue Type: Task
>  Components: core, security
>Affects Versions: 1.1.0
>Reporter: Di Shang
>Priority: Critical
>
> Jetty 9.2 reached end of life in January 2018:
> [http://www.eclipse.org/jetty/documentation/current/what-jetty-version.html#d0e203]
> Current version used in Kafka 1.1.0: 9.2.24.v20180105
> For security reasons, please upgrade to a later version. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6769) Upgrade jetty library version

2018-04-09 Thread Di Shang (JIRA)
Di Shang created KAFKA-6769:
---

 Summary: Upgrade jetty library version
 Key: KAFKA-6769
 URL: https://issues.apache.org/jira/browse/KAFKA-6769
 Project: Kafka
  Issue Type: Task
  Components: core, security
Affects Versions: 1.1.0
Reporter: Di Shang


Jetty 9.2 reached end of life in January 2018:

[http://www.eclipse.org/jetty/documentation/current/what-jetty-version.html#d0e203]

Current version used in Kafka 1.1.0: 9.2.24.v20180105

For security reasons, please upgrade to a later version. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6768) Producer may hang in close with pending transaction

2018-04-09 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6768.

Resolution: Fixed

> Producer may hang in close with pending transaction
> ---
>
> Key: KAFKA-6768
> URL: https://issues.apache.org/jira/browse/KAFKA-6768
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.1
>
>
> There is an edge case for transactional producers which will cause close() to 
> hang indefinitely (unless used with a timeout). Say, for example, that the 
> producer is trying to send an AddPartitionsToTxn request to the broker. Upon 
> shutdown, the Sender's running flag will be set to false and we will begin 
> graceful shutdown. Graceful shutdown will not complete, however, until we can 
> send the AddPartitionsToTxn request. But the latter is blocked by the fact 
> that the running flag is disabled. So no progress can be made and shutdown 
> cannot complete.
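
Until the fix is picked up, a bounded close avoids the indefinite hang described 
above; a minimal sketch using the public producer API:

{code:java}
// give close() a deadline instead of waiting forever; after the timeout
// the producer shuts down even with a transactional request still pending
producer.close(5, TimeUnit.SECONDS);
{code}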



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6768) Producer may hang in close with pending transaction

2018-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431441#comment-16431441
 ] 

ASF GitHub Bot commented on KAFKA-6768:
---

hachikuji closed pull request #4842: KAFKA-6768; Transactional producer may 
hang in close with pending requests
URL: https://github.com/apache/kafka/pull/4842
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 426b273b885..0514c995635 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) {
 return false;
 
 AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
-while (running) {
+while (!forceClose) {
 Node targetNode = null;
 try {
 if (nextRequestHandler.needsCoordinator()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6fcf4805967..558ec721096 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -132,6 +132,26 @@ public void setup() {
 client.setNode(brokerNode);
 }
 
+@Test
+public void testSenderShutdownWithPendingAddPartitions() throws Exception {
+long pid = 13131L;
+short epoch = 1;
+doInitTransactions(pid, epoch);
+transactionManager.beginTransaction();
+
+transactionManager.maybeAddPartitionToTransaction(tp0);
+FutureRecordMetadata sendFuture = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(),
+"value".getBytes(), Record.EMPTY_HEADERS, null, 
MAX_BLOCK_TIMEOUT).future;
+
+prepareAddPartitionsToTxn(tp0, Errors.NONE);
+prepareProduceResponse(Errors.NONE, pid, epoch);
+
+sender.initiateClose();
+sender.run();
+
+assertTrue(sendFuture.isDone());
+}
+
 @Test
 public void testEndTxnNotSentIfIncompleteBatches() {
 long pid = 13131L;


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Producer may hang in close with pending transaction
> ---
>
> Key: KAFKA-6768
> URL: https://issues.apache.org/jira/browse/KAFKA-6768
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.1
>
>
> There is an edge case for transactional producers which will cause close() to 
> hang indefinitely (unless used with a timeout). Say, for example, that the 
> producer is trying to send an AddPartitionsToTxn request to the broker. Upon 
> shutdown, the Sender's running flag will be set to false and we will begin 
> graceful shutdown. Graceful shutdown will not complete, however, until we can 
> send the AddPartitionsToTxn request. But the latter is blocked by the fact 
> that the running flag is disabled. So no progress can be made and shutdown 
> cannot complete.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6768) Producer may hang in close with pending transaction

2018-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431183#comment-16431183
 ] 

ASF GitHub Bot commented on KAFKA-6768:
---

hachikuji opened a new pull request #4842: KAFKA-6768; Transactional producer 
may hang in close with pending requests
URL: https://github.com/apache/kafka/pull/4842
 
 
   This patch fixes an edge case in producer shutdown which prevents `close()` 
from completing because of a pending request that will never be sent once 
shutdown has been initiated. I have added a test case which reproduces the 
scenario.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Producer may hang in close with pending transaction
> ---
>
> Key: KAFKA-6768
> URL: https://issues.apache.org/jira/browse/KAFKA-6768
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.1
>
>
> There is an edge case for transactional producers which will cause close() to 
> hang indefinitely (unless used with a timeout). Say, for example, that the 
> producer is trying to send an AddPartitionsToTxn request to the broker. Upon 
> shutdown, the Sender's running flag will be set to false and we will begin 
> graceful shutdown. Graceful shutdown will not complete, however, until we can 
> send the AddPartitionsToTxn request. But the latter is blocked by the fact 
> that the running flag is disabled. So no progress can be made and shutdown 
> cannot complete.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6768) Producer may hang in close with pending transaction

2018-04-09 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431168#comment-16431168
 ] 

Jason Gustafson commented on KAFKA-6768:


Initially I thought this affected 1.1.0, but the regression was only in trunk. 

> Producer may hang in close with pending transaction
> ---
>
> Key: KAFKA-6768
> URL: https://issues.apache.org/jira/browse/KAFKA-6768
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.1
>
>
> There is an edge case for transactional producers which will cause close() to 
> hang indefinitely (unless used with a timeout). Say, for example, that the 
> producer is trying to send an AddPartitionsToTxn request to the broker. Upon 
> shutdown, the Sender's running flag will be set to false and we will begin 
> graceful shutdown. Graceful shutdown will not complete, however, until we can 
> send the AddPartitionsToTxn request. But the latter is blocked by the fact 
> that the running flag is disabled. So no progress can be made and shutdown 
> cannot complete.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6768) Producer may hang in close with pending transaction

2018-04-09 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6768:
---
Affects Version/s: (was: 1.1.0)

> Producer may hang in close with pending transaction
> ---
>
> Key: KAFKA-6768
> URL: https://issues.apache.org/jira/browse/KAFKA-6768
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.1
>
>
> There is an edge case for transactional producers which will cause close() to 
> hang indefinitely (unless used with a timeout). Say, for example, that the 
> producer is trying to send an AddPartitionsToTxn request to the broker. Upon 
> shutdown, the Sender's running flag will be set to false and we will begin 
> graceful shutdown. Graceful shutdown will not complete, however, until we can 
> send the AddPartitionsToTxn request. But the latter is blocked by the fact 
> that the running flag is disabled. So no progress can be made and shutdown 
> cannot complete.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6768) Producer may hang in close with pending transaction

2018-04-09 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6768:
--

 Summary: Producer may hang in close with pending transaction
 Key: KAFKA-6768
 URL: https://issues.apache.org/jira/browse/KAFKA-6768
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 1.1.0
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 1.1.1


There is an edge case for transactional producers which will cause close() to 
hang indefinitely (unless used with a timeout). Say, for example, that the 
producer is trying to send an AddPartitionsToTxn request to the broker. Upon 
shutdown, the Sender's running flag will be set to false and we will begin 
graceful shutdown. Graceful shutdown will not complete, however, until we can 
send the AddPartitionsToTxn request. But the latter is blocked by the fact that 
the running flag is disabled. So no progress can be made and shutdown cannot 
complete.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6709) broker failed to handle request due to OOM

2018-04-09 Thread Dhruvil Shah (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-6709:
---

Assignee: Dhruvil Shah

> broker failed to handle request due to OOM
> --
>
> Key: KAFKA-6709
> URL: https://issues.apache.org/jira/browse/KAFKA-6709
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.1
>Reporter: Zou Tao
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: kafkaServer-gc.log.0.current.zip, kafkaServer.out.tgz, 
> normal-kafkaServer-gc.log.0.current.zip, server.properties
>
>
> I have updated to release 1.0.1.
> I set up a cluster with four brokers.
>  You can find the server.properties in the attachment.
>  There are about 150 topics and about 4000 partitions in total, with a 
> replication factor of 2.
>  Connectors are used to write/read data to/from the brokers.
>  The connector version is 0.10.1.
>  The average message size is 500 B, and there are around 6 messages per second.
>  One of the brokers keeps reporting OOM and can't handle requests like:
> [2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request 
> {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[
> {partition=16,fetch_offset=51198,max_bytes=60728640}
> ,\{partition=12,fetch_offset=50984,max_bytes=60728640}]}]} 
> (kafka.server.KafkaApis)
>  java.lang.OutOfMemoryError: Java heap space
>      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>      at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>      at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
>      at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523)
>      at scala.Option.map(Option.scala:146)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513)
>      at scala.Option.flatMap(Option.scala:171)
>      at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560)
>      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>      at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
>      at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041)
>      at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
>      at 
> kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
>      at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593)
>      at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
>      at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:592)
>      at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
>      at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
>      at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
>      at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
>      at kafka.server.KafkaApis.handle(KafkaApis.scala:99)
> and then lots of shrink ISR ( this broker is 1001).
> [2018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 
> broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
>  
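
The stack trace above shows the OOM inside FileRecords.downConvert for fetches 
requesting ~60 MB per partition, so one mitigation sketch (an assumption, not a 
confirmed fix) is to shrink the old 0.10.1 consumers' per-partition fetch size 
and thereby bound the broker's down-conversion buffers:

{code}
# consumer-side (0.10.1): cap each partition's fetch, and thus the
# broker-side down-conversion allocation, at 1 MB instead of ~60 MB
max.partition.fetch.bytes=1048576
{code}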

[jira] [Created] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

2018-04-09 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-6767:


 Summary: OffsetCheckpoint write assumes parent directory exists
 Key: KAFKA-6767
 URL: https://issues.apache.org/jira/browse/KAFKA-6767
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Steven Schlansker


We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an 
instance dies it is created from scratch, rather than reusing the existing 
RocksDB.)

We routinely see:
{code:java}
2018-04-09T19:14:35.004Z WARN <> 
[chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset 
checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
java.io.FileNotFoundException: 
/mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
Inspecting the state store directory, I can indeed see that {{chat/0_11}} does 
not exist (although many other partitions do).

 

Looking at the OffsetCheckpoint write method, it seems to try to open a new 
checkpoint file without first ensuring that the parent directory exists.

 
{code:java}
    public void write(final Map<TopicPartition, Long> offsets) throws IOException {
        // if there is no offsets, skip writing the file to save disk IOs
        if (offsets.isEmpty()) {
            return;
        }

        synchronized (lock) {
            // write to temp file and then swap with the existing file
            final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
 

Either the OffsetCheckpoint class should initialize the directories if needed, 
or some precondition of it being called should ensure that is the case.
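
A minimal sketch of the first option, reusing the file and lock fields from the 
snippet above:

{code:java}
synchronized (lock) {
    // create any missing parent directories first; createDirectories is
    // a no-op when the directory already exists
    Files.createDirectories(file.toPath().getParent());
    // then write to the temp file and swap with the existing file, as before
    final File temp = new File(file.getAbsolutePath() + ".tmp");
}
{code}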



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4650) Improve test coverage org.apache.kafka.streams.kstream.internals

2018-04-09 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430988#comment-16430988
 ] 

Matthias J. Sax commented on KAFKA-4650:


I agree that smaller PRs are easier to handle. Not sure if we need multiple 
tickets -- feel free to do a "partial PR" that tackles some of the mentioned 
classes.

> Improve test coverage org.apache.kafka.streams.kstream.internals
> 
>
> Key: KAFKA-4650
> URL: https://issues.apache.org/jira/browse/KAFKA-4650
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Priority: Minor
>  Labels: newbie
>
> Lots of classes have little or no coverage at all, e.g., 
> {{KTableAggregate.KTableAggregateValueGetter}}
> {{KTableKTableRightJoin.KTableKTableRightJoinValueGetterSupplier}}
> {{KStreamAggregate.KStreamAggregateValueGetter}}
> {{KStreamReduce.KStreamReduceValueGetter}}
> {{KStreamWindowReduce.new KTableValueGetterSupplier}}
> {{KTableAggregate.new KTableValueGetterSupplier}}
> {{KTableRepartitionMap.new KTableValueGetterSupplier}}
> {{KTableKTableRightJoin.KTableKTableRightJoinValueGetter}}
> {{KTableKTableLeftJoinValueGetter}}
> {{KStreamWindowReduce.KStreamWindowReduceValueGetter}}
> {{TimeWindow}}
> {{ChangedSerializer}}
> {{UnlimitedWindow}}
> {{WindowedDeserializer}}
> {{KStreamSessionWindowAggregate.KTableSessionWindowValueGetter}}
> {{KTableRepartitionMap}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE

2018-04-09 Thread Jagadesh Adireddi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadesh Adireddi reassigned KAFKA-6749:


Assignee: Jagadesh Adireddi

> TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
> --
>
> Key: KAFKA-6749
> URL: https://issues.apache.org/jira/browse/KAFKA-6749
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Frederic Arno
>Assignee: Jagadesh Adireddi
>Priority: Minor
>  Labels: newbie
>
> Stream processing topologies which are configured to use {{EXACTLY_ONCE}} 
> processing guarantee cannot be tested with the {{TopologyTestDriver}}. Tests 
> usually crash with {{java.lang.IllegalStateException: MockProducer hasn't 
> been initialized for transactions}} within the second call to 
> {{TopologyTestDriver.pipeInput()}}; the first call works fine.
> Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass.
> This is a problem because it is expected that proper processor topologies can 
> be successfully tested using {{TopologyTestDriver}}; however, 
> {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during 
> tests. To a developer, this usually means that there is something wrong with 
> their processor topologies.
> Kafka developers can reproduce this by adding:
> {code:java}
> put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE);{code}
> to line 88 of TopologyTestDriverTest: 
> streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
> Originally [reported on the 
> ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E].
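
A minimal standalone repro of the same report (topology and names here are 
illustrative):

{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-repro");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input").to("output");

final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
final ConsumerRecordFactory<String, String> factory =
    new ConsumerRecordFactory<>("input", new StringSerializer(), new StringSerializer());
driver.pipeInput(factory.create("k1", "v1"));  // first call works
driver.pipeInput(factory.create("k2", "v2"));  // second call throws the IllegalStateException
{code}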



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6766) Kafka offset moved backward

2018-04-09 Thread Janmejay Baral (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430634#comment-16430634
 ] 

Janmejay Baral commented on KAFKA-6766:
---

I am using Kafka version 0.11.0.0

> Kafka offset moved backward
> ---
>
> Key: KAFKA-6766
> URL: https://issues.apache.org/jira/browse/KAFKA-6766
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
> Environment: 3-node Kafka cluster
>Reporter: Janmejay Baral
>Priority: Major
>
> * While putting load on a Kafka topic, we encountered a problem where the 
> server.log says "[2018-04-09 19:45:36,771] INFO [Group Metadata Manager on 
> Broker 1]: Group ingestionconsumers transitioned to Dead in generation 0 
> (kafka.coordinator.group.GroupMetadataManager)".
>  * After noticing that the offsets had gone backward, I found this in the 
> server log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6766) Kafka offset moved backward

2018-04-09 Thread Janmejay Baral (JIRA)
Janmejay Baral created KAFKA-6766:
-

 Summary: Kafka offset moved backward
 Key: KAFKA-6766
 URL: https://issues.apache.org/jira/browse/KAFKA-6766
 Project: Kafka
  Issue Type: Bug
  Components: offset manager
 Environment: 3-node Kafka cluster
Reporter: Janmejay Baral


* While putting load on a Kafka topic, we encountered a problem where the 
server.log says "[2018-04-09 19:45:36,771] INFO [Group Metadata Manager on 
Broker 1]: Group ingestionconsumers transitioned to Dead in generation 0 
(kafka.coordinator.group.GroupMetadataManager)".
 * After noticing that the offsets had gone backward, I found this in the server 
log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed

2018-04-09 Thread Veera (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430366#comment-16430366
 ] 

Veera commented on KAFKA-6052:
--

Hi Jason

Could you please confirm the release date of either 1.1.1 or 1.2.0? Many 
thanks.

> Windows: Consumers not polling when isolation.level=read_committed 
> ---
>
> Key: KAFKA-6052
> URL: https://issues.apache.org/jira/browse/KAFKA-6052
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, producer 
>Affects Versions: 0.11.0.0, 1.0.1
> Environment: Windows 10. All processes running in embedded mode.
>Reporter: Ansel Zandegran
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: transactions, windows
> Fix For: 1.2.0, 1.1.1
>
> Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional 
> record with exactly-once semantics. These are my producer, consumer, and 
> broker setups. 
> public void sendWithTTemp(String topic, EHEvent event) {
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>   "localhost:9092,localhost:9093,localhost:9094");
> //props.put("bootstrap.servers", 
> "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
> props.put(ProducerConfig.ACKS_CONFIG, "all");
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put("transactional.id", "TID" + transactionId.incrementAndGet());
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
> Producer<String, String> producer =
> new KafkaProducer<>(props,
> new StringSerializer(),
> new StringSerializer());
> Logger.log(this, "Initializing transaction...");
> producer.initTransactions();
> Logger.log(this, "Initializing done.");
> try {
>   Logger.log(this, "Begin transaction...");
>   producer.beginTransaction();
>   Logger.log(this, "Begin transaction done.");
>   Logger.log(this, "Sending events...");
>   producer.send(new ProducerRecord<>(topic,
>  event.getKey().toString(),
>  event.getValue().toString()));
>   Logger.log(this, "Sending events done.");
>   Logger.log(this, "Committing...");
>   producer.commitTransaction();
>   Logger.log(this, "Committing done.");
> } catch (ProducerFencedException | OutOfOrderSequenceException
> | AuthorizationException e) {
>   producer.close();
>   e.printStackTrace();
> } catch (KafkaException e) {
>   producer.abortTransaction();
>   e.printStackTrace();
> }
> producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>   Properties props = new Properties();
>   props.setProperty("broker.id", "" + i);
>   props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>   props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + 
> i);
>   props.setProperty("num.partitions", "1");
>   props.setProperty("zookeeper.connect", "localhost:2181");
>   props.setProperty("zookeeper.connection.timeout.ms", "6000");
>   props.setProperty("min.insync.replicas", "2");
>   props.setProperty("offsets.topic.replication.factor", "2");
>   props.setProperty("offsets.topic.num.partitions", "1");
>   props.setProperty("transaction.state.log.num.partitions", "2");
>   props.setProperty("transaction.state.log.replication.factor", "2");
>   props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting committed. What could be the problem? Log attached.
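
The consumer configuration is not shown above; for reference, a sketch of the 
setting at issue (standard client configs, with assumed bootstrap/group values):

{code:java}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// read_committed returns only records from committed transactions;
// read_uncommitted also returns records from open or aborted ones
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
{code}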



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-04-09 Thread Ben (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430348#comment-16430348
 ] 

Ben commented on KAFKA-1194:


Note we also see this issue on Windows, but in slightly different 
circumstances. If we try to delete a topic then the broker crashes entirely 
with an error about not being able to delete files while they are in use. You 
then can't restart the broker until you delete all the log files manually first.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: Windows
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code; perhaps it can 
> be used to free the MappedByteBuffer.
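
A sketch of the close-before-rename idea (illustrative names; the actual 
log-segment code differs):

{code:java}
// Windows refuses to rename a file that still has an open handle or a
// live memory mapping, so release both before adding the .deleted suffix
indexChannel.close();                 // close the open FileChannel
// forceUnmap(indexMappedBuffer);     // hypothetical helper to release the mmap
Files.move(segmentFile.toPath(),
           Paths.get(segmentFile.getPath() + ".deleted"));
{code}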



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6765) Intermittent test failure in CustomQuotaCallbackTest

2018-04-09 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6765:
-

 Summary: Intermittent test failure in CustomQuotaCallbackTest
 Key: KAFKA-6765
 URL: https://issues.apache.org/jira/browse/KAFKA-6765
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.2.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 1.2.0


Exception stack trace:

{quote}

java.lang.NullPointerException
 at org.apache.kafka.common.metrics.stats.SampledStat.purgeObsoleteSamples(SampledStat.java:104)
 at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:74)
 at org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:68)
 at kafka.api.QuotaTestClients$.metricValue(BaseQuotaTest.scala:163)
 at kafka.api.QuotaTestClients.produceUntilThrottled(BaseQuotaTest.scala:193)
 at kafka.api.CustomQuotaCallbackTest$GroupedUser.produceConsume(CustomQuotaCallbackTest.scala:272)
 at kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:146)
{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4650) Improve test coverage org.apache.kafka.streams.kstream.internals

2018-04-09 Thread Jimin Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430327#comment-16430327
 ] 

Jimin Hsieh commented on KAFKA-4650:


Is it possible to split this issue into multiple smaller issues? It would be 
much easier for contributors to work on and committers to review. 

> Improve test coverage org.apache.kafka.streams.kstream.internals
> 
>
> Key: KAFKA-4650
> URL: https://issues.apache.org/jira/browse/KAFKA-4650
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Priority: Minor
>  Labels: newbie
>
> Lots of classes have little or no coverage at all, e.g., 
> {{KTableAggregate.KTableAggregateValueGetter}}
> {{KTableKTableRightJoin.KTableKTableRightJoinValueGetterSupplier}}
> {{KStreamAggregate.KStreamAggregateValueGetter}}
> {{KStreamReduce.KStreamReduceValueGetter}}
> {{KStreamWindowReduce.new KTableValueGetterSupplier}}
> {{KTableAggregate.new KTableValueGetterSupplier}}
> {{KTableRepartitionMap.new KTableValueGetterSupplier}}
> {{KTableKTableRightJoin.KTableKTableRightJoinValueGetter}}
> {{KTableKTableLeftJoinValueGetter}}
> {{KStreamWindowReduce.KStreamWindowReduceValueGetter}}
> {{TimeWindow}}
> {{ChangedSerializer}}
> {{UnlimitedWindow}}
> {{WindowedDeserializer}}
> {{KStreamSessionWindowAggregate.KTableSessionWindowValueGetter}}
> {{KTableRepartitionMap}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

2018-04-09 Thread Valentino Proietti (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430231#comment-16430231
 ] 

Valentino Proietti commented on KAFKA-6742:
---

Thank you [~mjsax] !

> TopologyTestDriver error when dealing with stores from GlobalKTable
> ---
>
> Key: KAFKA-6742
> URL: https://issues.apache.org/jira/browse/KAFKA-6742
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Valentino Proietti
>Assignee: Valentino Proietti
>Priority: Minor
>
> This JUnit test simply fails:
> @Test
> public void globalTable() {
>     StreamsBuilder builder = new StreamsBuilder();
>     @SuppressWarnings("unused")
>     final KTable<String, String> localTable = builder
>         .table("local",
>                Consumed.with(Serdes.String(), Serdes.String()),
>                Materialized.as("localStore"));
>     @SuppressWarnings("unused")
>     final GlobalKTable<String, String> globalTable = builder
>         .globalTable("global",
>                      Consumed.with(Serdes.String(), Serdes.String()),
>                      Materialized.as("globalStore"));
>     //
>     Properties props = new Properties();
>     props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>     props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
>     TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
>     //
>     final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore");
>     Assert.assertNotNull(localStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
>     //
>     final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
>     Assert.assertNotNull(globalStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
>     //
>     final ConsumerRecordFactory<String, String> crf =
>         new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
>     testDriver.pipeInput(crf.create("local", "one", "TheOne"));
>     testDriver.pipeInput(crf.create("global", "one", "TheOne"));
>     //
>     Assert.assertEquals("TheOne", localStore.get("one"));
>     Assert.assertEquals("TheOne", globalStore.get("one"));
>  
> To make it work, I had to modify the TopologyTestDriver class as follows:
> ...
>     public Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
> //            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
> //        }
> //        return allStores;
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         final Map<String, StateStore> allStores = new HashMap<>();
>         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>             StateStore res = psm.getStore(storeName);
>             if (res == null)
>                 res = psm.getGlobalStore(storeName);
>             allStores.put(storeName, res);
>         }
>         return allStores;
>     }
> ...
>     public StateStore getStateStore(final String name) {
> //        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         if (res == null)
>             res = psm.getGlobalStore(name);
>         return res;
>     }
>  
> Moreover, I think it would be very useful to make the internal MockProducer 
> public, for testing cases where a producer is used alongside the "normal" 
> stream processing, by adding the method:
>     /**
>      * @return records sent with this producer are automatically streamed 
> to the topology.
>      */
>     public final Producer<byte[], byte[]> getProducer() {
>         return producer;
>     }
>  
> Unfortunately, this introduces another problem that can be verified by adding 
> the following lines to the previous JUnit test:
> ...
> //
> ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); 
> // just to serialize keys and values
> testDriver.getProducer().send(new ProducerRecord<>("local", null, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(new ProducerRecord<>("global", null, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert.assertEquals("TheOne", localStore.get("one"));
>