[jira] [Resolved] (KAFKA-6577) Connect standalone SASL file source and sink test fails without explanation
[ https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6577. --- Resolution: Fixed Fix Version/s: 1.2.0 Issue resolved by pull request 4610 [https://github.com/apache/kafka/pull/4610] > Connect standalone SASL file source and sink test fails without explanation > --- > > Key: KAFKA-6577 > URL: https://issues.apache.org/jira/browse/KAFKA-6577 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, system tests >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Blocker > Fix For: 1.2.0, 1.1.0 > > > The > {{tests/kafkatest/tests/connect/connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink}} > test is failing with the SASL configuration without a sufficient > explanation. During the test, the Connect worker fails to start, but the > Connect log contains no useful information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
[ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6378. --- Resolution: Fixed Fix Version/s: 1.1.0 Issue resolved by pull request 4424 [https://github.com/apache/kafka/pull/4424] > NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper > returns null > -- > > Key: KAFKA-6378 > URL: https://issues.apache.org/jira/browse/KAFKA-6378 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Andy Bryant >Priority: Major > Fix For: 1.1.0 > > > On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the > stream fails with a NullPointerException (see stacktrace below). On Kafka > 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with > the table value set to null. > The use-case for this is joining a stream to a table containing reference > data where the stream foreign key may be null. There is no straight-forward > workaround in this case with Kafka 1.0.0 without having to resort to either > generating a key that will never match or branching the stream for records > that don't have the foreign key. > Exception in thread "workshop-simple-example-client-StreamThread-1" > java.lang.NullPointerException > at java.base/java.util.Objects.requireNonNull(Objects.java:221) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116) > at > org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49) > at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) > at > org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6412. --- Resolution: Fixed Fix Version/s: 1.1.0 Issue resolved by pull request 4372 [https://github.com/apache/kafka/pull/4372] > Improve synchronization in CachingKeyValueStore methods > --- > > Key: KAFKA-6412 > URL: https://issues.apache.org/jira/browse/KAFKA-6412 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu > Fix For: 1.1.0 > > Attachments: 6412-jmh.v1.txt, k-6412.v1.txt > > > Currently CachingKeyValueStore methods are synchronized at method level. > It seems we can use read lock for getter and write lock for put / delete > methods. > For getInternal(), if the underlying thread is streamThread, the > getInternal() may trigger eviction. This can be handled by obtaining write > lock at the beginning of the method for streamThread. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
Damian Guy created KAFKA-6360: - Summary: RocksDB segments not removed when store is closed causes re-initialization to fail Key: KAFKA-6360 URL: https://issues.apache.org/jira/browse/KAFKA-6360 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Damian Guy Assignee: Damian Guy Priority: Blocker Fix For: 1.1.0 When a store is re-initialized it is first closed, before it is opened again. When this happens the segments in the {{Segments}} class are closed, but they are not removed from the list of segments. So when the store is re-initialized the old closed segments are used. This results in: {code} [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task [1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-24: (org.apache.kafka.streams.processor.internals.ProcessorStateManager) org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) at org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6349) ConcurrentModificationException during streams state restoration
Damian Guy created KAFKA-6349: - Summary: ConcurrentModificationException during streams state restoration Key: KAFKA-6349 URL: https://issues.apache.org/jira/browse/KAFKA-6349 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Damian Guy Priority: Blocker Fix For: 1.1.0 Attachments: streams-error.log During application startup and state restoration a {{ConcurrentModificationException}} was thrown from {{AbstractStateManager}} {code} [2017-12-12 10:47:09,840] ERROR [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread) java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:752) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:750) at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:74) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:229) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6259) Make KafkaStreams.cleanup() clean global state directory
Damian Guy created KAFKA-6259: - Summary: Make KafkaStreams.cleanup() clean global state directory Key: KAFKA-6259 URL: https://issues.apache.org/jira/browse/KAFKA-6259 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0, 1.0.1, 0.11.0.2 Reporter: Damian Guy We have {{KafkaStreams#cleanUp}} so that developers can remove all local state during development, i.e., so they can start from a clean slate. However, this presently doesn't cleanup the global state directory -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-6186) RocksDB based WindowStore fail to create db file on Windows OS
[ https://issues.apache.org/jira/browse/KAFKA-6186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6186. --- Resolution: Duplicate > RocksDB based WindowStore fail to create db file on Windows OS > -- > > Key: KAFKA-6186 > URL: https://issues.apache.org/jira/browse/KAFKA-6186 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: Windows OS >Reporter: James Wu > > Code snippet just like below > ... > textLines.flatMapValues(value -> > Arrays.asList(pattern.split(value.toLowerCase(.groupBy((key, word) -> > word) > > .windowedBy(TimeWindows.of(1)).count(Materialized.as("Counts")); > ... > Run it on Windows, then the exception is throw as below > Caused by: org.rocksdb.RocksDBException: Failed to create dir: > F:\tmp\kafka-streams\wordcount-lambda-example\1_0\Counts\Counts:151009920: > Invalid argument > at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.7.3.jar:na] > at org.rocksdb.RocksDB.open(RocksDB.java:231) ~[rocksdbjni-5.7.3.jar:na] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:197) > ~[kafka-streams-1.0.0.jar:na] > ... 29 common frames omitted > Checked the code, I found the issue is caused by line 72 in > org.apache.kafka.streams.state.internals.Segments > String segmentName(final long segmentId) { > // previous format used - as a separator so if this changes in the > future > // then we should use something different. > return name + ":" + segmentId * segmentInterval; > } > "segmentName" is passed to RocksDB, RockDB will use it as file name to create > the DB file, as we known, the ":" cannot be part of file name in Windows OS. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-6069) Streams metrics tagged incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-6069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6069. --- Resolution: Fixed Fix Version/s: 1.0.1 1.1.0 Issue resolved by pull request 4081 [https://github.com/apache/kafka/pull/4081] > Streams metrics tagged incorrectly > -- > > Key: KAFKA-6069 > URL: https://issues.apache.org/jira/browse/KAFKA-6069 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Tommy Becker >Assignee: Tommy Becker >Priority: Minor > Fix For: 1.1.0, 1.0.1 > > > KafkaStreams attempts to tag many (all?) of it's metrics with the client id. > But instead of retrieving the value from the config, it tags them with the > literal "client.id", as can be seen on > org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java:114 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-6023) ThreadCache#sizeBytes() should check overflow
[ https://issues.apache.org/jira/browse/KAFKA-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6023. --- Resolution: Fixed Fix Version/s: 1.1.0 Issue resolved by pull request 4041 [https://github.com/apache/kafka/pull/4041] > ThreadCache#sizeBytes() should check overflow > - > > Key: KAFKA-6023 > URL: https://issues.apache.org/jira/browse/KAFKA-6023 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ted Yu >Assignee: siva santhalingam >Priority: Minor > Fix For: 1.1.0 > > > {code} > long sizeBytes() { > long sizeInBytes = 0; > for (final NamedCache namedCache : caches.values()) { > sizeInBytes += namedCache.sizeInBytes(); > } > return sizeInBytes; > } > {code} > The summation w.r.t. sizeInBytes may overflow. > Check similar to what is done in size() should be performed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-4890) State directory being deleted when another thread holds the lock
[ https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-4890. --- Resolution: Duplicate > State directory being deleted when another thread holds the lock > > > Key: KAFKA-4890 > URL: https://issues.apache.org/jira/browse/KAFKA-4890 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Damian Guy > Attachments: logs2.tar.gz, logs3.tar.gz, logs.tar.gz > > > Looks like a state directory is being cleaned up when another thread already > has the lock: > {code} > 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager > - task [0_6] Registering state store perGameScoreStore to its state manager > 2017-03-12 20:40:21 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - > Deleting obsolete state directory 0_6 for task 0_6 > 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - > User provided listener > org.apache.kafka.streams.processor.internals.StreamThread$1 for group > fireflyProd failed on partition assignment > org.apache.kafka.streams.errors.ProcessorStateException: Error while > executing put key > \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value > \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore > at > org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248) > at > org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65) > at > org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > Caused by: org.rocksdb.RocksDBException: ` > at org.rocksdb.RocksDB.put(Native Method) > at org.rocksdb.RocksDB.put(RocksDB.java:488) > at >
[jira] [Resolved] (KAFKA-5985) Mention the need to close store iterators
[ https://issues.apache.org/jira/browse/KAFKA-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5985. --- Resolution: Fixed Issue resolved by pull request 3994 [https://github.com/apache/kafka/pull/3994] > Mention the need to close store iterators > - > > Key: KAFKA-5985 > URL: https://issues.apache.org/jira/browse/KAFKA-5985 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 0.11.0.0 >Reporter: Stanislav Chizhov >Assignee: Bill Bejeck > Fix For: 1.0.0 > > > Store iterators should be closed in all/most of the cases, but currently it > is not consistently reflected in the documentation and javadocs. For instance > > https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_developer-guide_interactive-queries_custom-stores > does not mention the need to close an iterator and provide an example that > does not do that. > Some of the fetch methods do mention the need to close an iterator returned > (e.g. > https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.html#range(K,%20K)), > but others do not: > https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/state/ReadOnlyWindowStore.html#fetch(K,%20long,%20long) > It makes sense to: > - update javadoc for all store methods that do return iterators to reflect > that the iterator returned needs to be closed > - mention it in the documentation and to update related examples. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6001) Remove <Bytes, byte[]> from usages of Materialized in Streams
Damian Guy created KAFKA-6001: - Summary: Removefrom usages of Materialized in Streams Key: KAFKA-6001 URL: https://issues.apache.org/jira/browse/KAFKA-6001 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 We can remove ` ` from usages of `Materialized` in the DSL. This will make the api a little nicer to work with. ` ` is already enforced. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5986) Streams State Restoration never completes when logging is disabled
Damian Guy created KAFKA-5986: - Summary: Streams State Restoration never completes when logging is disabled Key: KAFKA-5986 URL: https://issues.apache.org/jira/browse/KAFKA-5986 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.1 Reporter: Damian Guy Assignee: Damian Guy Priority: Critical Fix For: 1.0.0, 0.11.0.2 When logging is disabled on a state store, the store restoration never completes. This is likely because there are no changelogs, but more investigation is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5765) Move merge() from StreamsBuilder to KStream
[ https://issues.apache.org/jira/browse/KAFKA-5765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5765. --- Resolution: Fixed Fix Version/s: (was: 1.1.0) 1.0.0 Issue resolved by pull request 3916 [https://github.com/apache/kafka/pull/3916] > Move merge() from StreamsBuilder to KStream > --- > > Key: KAFKA-5765 > URL: https://issues.apache.org/jira/browse/KAFKA-5765 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Matthias J. Sax >Assignee: Richard Yu > Labels: needs-kip, newbie > Fix For: 1.0.0 > > Attachments: 5765.v1.patch > > > Merging multiple {{KStream}} is done via {{StreamsBuilder#merge()}} (formally > {{KStreamBuilder#merge()}}). This is quite unnatural and should be done via > {{KStream#merge()}}. > We need a KIP as we add a new method to a public {{KStreams}} API and > deprecate the old {{merge()}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5489) Failing test: InternalTopicIntegrationTest.shouldCompactTopicsForStateChangelogs
[ https://issues.apache.org/jira/browse/KAFKA-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5489. --- Resolution: Cannot Reproduce Closing this as i haven't seen it fail in a while and i'm unable to reproduce it. We can re-open if it occurs again > Failing test: > InternalTopicIntegrationTest.shouldCompactTopicsForStateChangelogs > > > Key: KAFKA-5489 > URL: https://issues.apache.org/jira/browse/KAFKA-5489 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Damian Guy > Labels: test > > Test failed with > {noformat} > java.lang.AssertionError: expected: but was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.kafka.streams.integration.InternalTopicIntegrationTest.shouldCompactTopicsForStateChangelogs(InternalTopicIntegrationTest.java:173) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5956) StreamBuilder#table and StreamsBuilder#globalTable should use serdes from Materialized
[ https://issues.apache.org/jira/browse/KAFKA-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5956. --- Resolution: Fixed Issue resolved by pull request 3936 [https://github.com/apache/kafka/pull/3936] > StreamBuilder#table and StreamsBuilder#globalTable should use serdes from > Materialized > -- > > Key: KAFKA-5956 > URL: https://issues.apache.org/jira/browse/KAFKA-5956 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > The new overloads {{StreamBuilder.table(String, Materialized)}} and > {{StreamsBuilder.globalTable(String, Materialized)}} need to set the serdes > from {{Materialized}} on the internal {{Consumed}} instance that is created, > otherwise the defaults will be used and may result in serialization errors -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5956) StreamBuilder#table and StreamsBuilder#globalTable should use serdes from Materialized
Damian Guy created KAFKA-5956: - Summary: StreamBuilder#table and StreamsBuilder#globalTable should use serdes from Materialized Key: KAFKA-5956 URL: https://issues.apache.org/jira/browse/KAFKA-5956 Project: Kafka Issue Type: Bug Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 The new overloads {{StreamBuilder.table(String, Materialized)}} and {{StreamsBuilder.globalTable(String, Materialized)}} need to set the serdes from {{Materialized}} on the internal {{Consumed}} instance that is created, otherwise the defaults will be used and may result in serialization errors -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5931) Deprecate KTable#to and KTable#through
[ https://issues.apache.org/jira/browse/KAFKA-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5931. --- Resolution: Fixed Issue resolved by pull request 3903 [https://github.com/apache/kafka/pull/3903] > Deprecate KTable#to and KTable#through > -- > > Key: KAFKA-5931 > URL: https://issues.apache.org/jira/browse/KAFKA-5931 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > {{KTable#to}} and {{KTable#through}} should be deprecated in favour of using > {{KTable#toStream}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5939) Add a dryrun option to release.py
Damian Guy created KAFKA-5939: - Summary: Add a dryrun option to release.py Key: KAFKA-5939 URL: https://issues.apache.org/jira/browse/KAFKA-5939 Project: Kafka Issue Type: Bug Components: tools Reporter: Damian Guy It would be great to add a `dryrun` feature to `release.py` so that it can be used to test changes to the scripts etc. At the moment you need to make sure all JIRAs are closed for the release, have no uncommited changes etc, which is a bit of a hassle when you just want to test a change you've made to the script. There may be other things that need to be skipped, too -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5931) Deprecate KTable#to and KTable#through
Damian Guy created KAFKA-5931: - Summary: Deprecate KTable#to and KTable#through Key: KAFKA-5931 URL: https://issues.apache.org/jira/browse/KAFKA-5931 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 {{KTable#to}} and {{KTable#through}} should be deprecated in favour of using {{KTable#toStream}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5873) Add Materialized overloads to StreamBuilder
[ https://issues.apache.org/jira/browse/KAFKA-5873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5873. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3837 [https://github.com/apache/kafka/pull/3837] > Add Materialized overloads to StreamBuilder > --- > > Key: KAFKA-5873 > URL: https://issues.apache.org/jira/browse/KAFKA-5873 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Add the overloads from KIP-182 that use {{Materialized}} to {{StreamsBuilder}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5922) Add SessionWindowedKStream
Damian Guy created KAFKA-5922: - Summary: Add SessionWindowedKStream Key: KAFKA-5922 URL: https://issues.apache.org/jira/browse/KAFKA-5922 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 Add SessionWindowedKStream interface and implementation -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5921) Add Materialized overloads to WindowedKStream
Damian Guy created KAFKA-5921: - Summary: Add Materialized overloads to WindowedKStream Key: KAFKA-5921 URL: https://issues.apache.org/jira/browse/KAFKA-5921 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 Add the {{Materialized}} overloads to {{WindowedKStream} - KIP-182 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5754) Refactor Streams to use LogContext
[ https://issues.apache.org/jira/browse/KAFKA-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5754. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3727 [https://github.com/apache/kafka/pull/3727] > Refactor Streams to use LogContext > -- > > Key: KAFKA-5754 > URL: https://issues.apache.org/jira/browse/KAFKA-5754 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jason Gustafson >Assignee: Umesh Chaudhary > Labels: newbie > Fix For: 1.0.0 > > > We added a {{LogContext}} object which automatically adds a log prefix to > every message written by loggers constructed from it (much like the Logging > mixin available in the server code). We use this in the consumer to ensure > that messages always contain the consumer group and client ids, which is very > helpful when multiple consumers are run on the same instance. Kafka Streams > requires similar contextual logging by including the prefix manually in each > log message. It would be better to take advantage of the new {{LogContext}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5908) CompositeReadOnlyWindowStore range fetch doesn't return all values when fetching with different start and end times
[ https://issues.apache.org/jira/browse/KAFKA-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5908. --- Resolution: Fixed Issue resolved by pull request 3868 [https://github.com/apache/kafka/pull/3868] > CompositeReadOnlyWindowStore range fetch doesn't return all values when > fetching with different start and end times > --- > > Key: KAFKA-5908 > URL: https://issues.apache.org/jira/browse/KAFKA-5908 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > The {{NextIteratorFunction}} in {{CompositeReadOnlyWindowStore}} is > incorrectly using the {{timeFrom}} as the {{timeTo}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5908) CompositeReadOnlyWindowStore range fetch doesn't return all values when fetching with different start and end times
Damian Guy created KAFKA-5908: - Summary: CompositeReadOnlyWindowStore range fetch doesn't return all values when fetching with different start and end times Key: KAFKA-5908 URL: https://issues.apache.org/jira/browse/KAFKA-5908 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 The {{NextIteratorFunction}} in {{CompositeReadOnlyWindowStore}} is incorrectly using the {{timeFrom}} as the {{timeTo}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5655) Add new API methods to KGroupedTable
[ https://issues.apache.org/jira/browse/KAFKA-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5655. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3829 [https://github.com/apache/kafka/pull/3829] > Add new API methods to KGroupedTable > > > Key: KAFKA-5655 > URL: https://issues.apache.org/jira/browse/KAFKA-5655 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5873) Add Materialized overloads to StreamBuilder
Damian Guy created KAFKA-5873: - Summary: Add Materialized overloads to StreamBuilder Key: KAFKA-5873 URL: https://issues.apache.org/jira/browse/KAFKA-5873 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Add the overloads from KIP-182 that use {{Materialized}} to {{StreamsBuilder}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5853) Add WindowedKStream interface and implemenation
[ https://issues.apache.org/jira/browse/KAFKA-5853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5853. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3809 [https://github.com/apache/kafka/pull/3809] > Add WindowedKStream interface and implemenation > --- > > Key: KAFKA-5853 > URL: https://issues.apache.org/jira/browse/KAFKA-5853 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Add the {{WindowedKStream}} interface and implementation of methods that > don't require {{Materialized}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5832) Add Consumed class and overloads to StreamBuilder
[ https://issues.apache.org/jira/browse/KAFKA-5832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5832. --- Resolution: Fixed Issue resolved by pull request 3784 [https://github.com/apache/kafka/pull/3784] > Add Consumed class and overloads to StreamBuilder > - > > Key: KAFKA-5832 > URL: https://issues.apache.org/jira/browse/KAFKA-5832 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Add the {{Consumed}} class and the relevant overloads from {{StreamBuilder}} > (KIP-182) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5853) Add WindowedKStream interface and implemenation
Damian Guy created KAFKA-5853: - Summary: Add WindowedKStream interface and implemenation Key: KAFKA-5853 URL: https://issues.apache.org/jira/browse/KAFKA-5853 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Add the {{WindowedKStream}} interface and implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5852) Add filter, filterNot, mapValues and Materialized to KTable
Damian Guy created KAFKA-5852: - Summary: Add filter, filterNot, mapValues and Materialized to KTable Key: KAFKA-5852 URL: https://issues.apache.org/jira/browse/KAFKA-5852 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 Add overloads of {{filter}}, {{filterNot}}, {{mapValues}} that take {{Materialized}} as a param to KTable. Deprecate overloads using {{storeName}} and {{storeSupplier}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5844) Add groupBy(KeyValueMapper, Serialized) to KTable
[ https://issues.apache.org/jira/browse/KAFKA-5844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5844. --- Resolution: Fixed Issue resolved by pull request 3802 [https://github.com/apache/kafka/pull/3802] > Add groupBy(KeyValueMapper, Serialized) to KTable > - > > Key: KAFKA-5844 > URL: https://issues.apache.org/jira/browse/KAFKA-5844 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > part of KIP-182 > add {{KTable#groupBy(KeyValueMapper, Serialized)}} and deprecate the overload > with {{Serde}} params -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5844) Add groupBy(KeyValueMapper, Serialized) to KTable
Damian Guy created KAFKA-5844: - Summary: Add groupBy(KeyValueMapper, Serialized) to KTable Key: KAFKA-5844 URL: https://issues.apache.org/jira/browse/KAFKA-5844 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 part of KIP-182 add {{KTable#groupBy(KeyValueMapper, Serialized)}} and deprecate the overload with {{Serde}} params -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5832) Add Consumed class and overloads to StreamBuilder
Damian Guy created KAFKA-5832: - Summary: Add Consumed class and overloads to StreamBuilder Key: KAFKA-5832 URL: https://issues.apache.org/jira/browse/KAFKA-5832 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 Add the {{Consumed}} class and the relevant overloads from {{StreamBuilder}} (KIP-182) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5819) Add Joined class and relevant KStream join overloads
Damian Guy created KAFKA-5819: - Summary: Add Joined class and relevant KStream join overloads Key: KAFKA-5819 URL: https://issues.apache.org/jira/browse/KAFKA-5819 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 Add the {{Joined}} class as defined in KIP-182 and the following overloads to {{KStream}} {code}KStream join(final KStream other, final ValueJoiner joiner, final JoinWindows windows, final Joined options); KStream join(final KTable other, final ValueJoiner joiner, final Joined options); KStream leftJoin(final KStream other, final ValueJoiner joiner, final JoinWindows windows, final Joined options); KStream leftJoin(final KTable other, final ValueJoiner joiner, final Joined options); KStream outerJoin(final KStream other, final ValueJoiner joiner, final JoinWindows windows, final Joined options); {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5817) Add Serialized class and KStream groupBy and groupByKey overloads
Damian Guy created KAFKA-5817: - Summary: Add Serialized class and KStream groupBy and groupByKey overloads Key: KAFKA-5817 URL: https://issues.apache.org/jira/browse/KAFKA-5817 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Add the following classes and methods to {{KStream}} {{KGroupedStreamgroupByKey(final Serialized serialized)}} {{ KGroupedStream groupBy(final KeyValueMapper selector, Serialized serialized)}} {code} public class Serialized { public static Serialized with(final Serde keySerde, final Serde valueSerde) public Serialized withKeySerde(final Serde keySerde) public Serialized withValueSerde(final Serde valueSerde) } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5816) Add Produced class and new to and through overloads to KStream
Damian Guy created KAFKA-5816: - Summary: Add Produced class and new to and through overloads to KStream Key: KAFKA-5816 URL: https://issues.apache.org/jira/browse/KAFKA-5816 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Add the {{Produced}} and {{KStream}} overloads that use it: {{KStream#to(String, Produced)}} {{KStream#through(String, Produced)}} Deprecate all other {{to}} and {{through}} methods accept the single param methods that take a {{topic}} param -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5815) Add Printed class and KStream#print(Printed)
Damian Guy created KAFKA-5815: - Summary: Add Printed class and KStream#print(Printed) Key: KAFKA-5815 URL: https://issues.apache.org/jira/browse/KAFKA-5815 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Add Printed class and KStream#print(Printed) deprecate all other print and writeAsText methods -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5769) Transient test failure org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache
[ https://issues.apache.org/jira/browse/KAFKA-5769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5769. --- Resolution: Duplicate > Transient test failure > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache > --- > > Key: KAFKA-5769 > URL: https://issues.apache.org/jira/browse/KAFKA-5769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Damian Guy >Assignee: Damian Guy > > This has been failing in a few builds: > {code} > java.lang.AssertionError: Condition not met within timeout 6. Expecting 5 > records from topic map-one-join-output-1 while only received 0: [] > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:275) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:201) > at > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.receiveMessages(KStreamRepartitionJoinTest.java:375) > at > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyCorrectOutput(KStreamRepartitionJoinTest.java:296) > at > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyRepartitionOnJoinOperations(KStreamRepartitionJoinTest.java:141) > at > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache(KStreamRepartitionJoinTest.java:119) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-4835) Allow users control over repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-4835. --- Resolution: Duplicate > Allow users control over repartitioning > --- > > Key: KAFKA-4835 > URL: https://issues.apache.org/jira/browse/KAFKA-4835 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki > Labels: needs-kip > > From > https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 > ...it would be good to provide users more control over the repartitioning. > My use case is as follows (unrelated bits omitted for brevity): > {code} > KTableloggedInCustomers = builder > .stream("customerLogins") > .groupBy((key, activity) -> > activity.getCustomerRef()) > .reduce((first,second) -> second, loginStore()); > > builder > .stream("balanceUpdates") > .map((key, activity) -> new KeyValue<>( > activity.getCustomerRef(), > activity)) > .join(loggedInCustomers, (activity, session) -> ... > .to("sessions"); > {code} > Both "groupBy" and "map" in the underlying implementation set the > repartitionRequired flag (since the key changes), and the aggregation/join > that follows will create the repartitioned topic. > However, in our case I know that both input streams are already partitioned > by the customerRef value, which I'm mapping into the key (because it's > required by the join operation). > So there are 2 unnecessary intermediate topics created with their associated > overhead, while the ultimate goal is simply to do a join on a value that we > already use to partition the original streams anyway. > (Note, we don't have the option to re-implement the original input streams to > make customerRef the message key.) > I think it would be better to allow the user to decide (from their knowledge > of the incoming streams) whether a repartition is mandatory on aggregation > and join operations (overloaded version of the methods with the > repartitionRequired flag exposed maybe?) > An alternative would be to allow users to perform a join on a value other > than the key (a keyValueMapper parameter to join, like the one used for joins > with global tables), but I expect that to be more involved and error-prone to > use for people who don't understand the partitioning requirements well > (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (KAFKA-4835) Allow users control over repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy reopened KAFKA-4835: --- > Allow users control over repartitioning > --- > > Key: KAFKA-4835 > URL: https://issues.apache.org/jira/browse/KAFKA-4835 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Michal Borowiecki > Labels: needs-kip > > From > https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030 > ...it would be good to provide users more control over the repartitioning. > My use case is as follows (unrelated bits omitted for brevity): > {code} > KTableloggedInCustomers = builder > .stream("customerLogins") > .groupBy((key, activity) -> > activity.getCustomerRef()) > .reduce((first,second) -> second, loginStore()); > > builder > .stream("balanceUpdates") > .map((key, activity) -> new KeyValue<>( > activity.getCustomerRef(), > activity)) > .join(loggedInCustomers, (activity, session) -> ... > .to("sessions"); > {code} > Both "groupBy" and "map" in the underlying implementation set the > repartitionRequired flag (since the key changes), and the aggregation/join > that follows will create the repartitioned topic. > However, in our case I know that both input streams are already partitioned > by the customerRef value, which I'm mapping into the key (because it's > required by the join operation). > So there are 2 unnecessary intermediate topics created with their associated > overhead, while the ultimate goal is simply to do a join on a value that we > already use to partition the original streams anyway. > (Note, we don't have the option to re-implement the original input streams to > make customerRef the message key.) > I think it would be better to allow the user to decide (from their knowledge > of the incoming streams) whether a repartition is mandatory on aggregation > and join operations (overloaded version of the methods with the > repartitionRequired flag exposed maybe?) > An alternative would be to allow users to perform a join on a value other > than the key (a keyValueMapper parameter to join, like the one used for joins > with global tables), but I expect that to be more involved and error-prone to > use for people who don't understand the partitioning requirements well > (whereas it's safe for global tables). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5804) ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the log
[ https://issues.apache.org/jira/browse/KAFKA-5804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5804. --- Resolution: Fixed Issue resolved by pull request 3754 [https://github.com/apache/kafka/pull/3754] > ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the > log > > > Key: KAFKA-5804 > URL: https://issues.apache.org/jira/browse/KAFKA-5804 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > The {{ChangeLoggingWindowBytesStore}} needs to have the same duplicate > retaining logic as {{RocksDBWindowStore}} otherwise data loss may occur when > performing windowed joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5804) ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the log
Damian Guy created KAFKA-5804: - Summary: ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the log Key: KAFKA-5804 URL: https://issues.apache.org/jira/browse/KAFKA-5804 Project: Kafka Issue Type: Bug Reporter: Damian Guy Assignee: Damian Guy The {{ChangeLoggingWindowBytesStore}} needs to have the same duplicate retaining logic as {{RocksDBWindowStore}} otherwise data loss may occur when performing windowed joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5771) org.apache.kafka.streams.state.internals.Segments#segments method returns incorrect results when segments were added out of order
[ https://issues.apache.org/jira/browse/KAFKA-5771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5771. --- Resolution: Fixed Fix Version/s: 0.11.0.1 1.0.0 Issue resolved by pull request 3737 [https://github.com/apache/kafka/pull/3737] > org.apache.kafka.streams.state.internals.Segments#segments method returns > incorrect results when segments were added out of order > - > > Key: KAFKA-5771 > URL: https://issues.apache.org/jira/browse/KAFKA-5771 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Alexander Radzishevsky > Fix For: 1.0.0, 0.11.0.1 > > > following unit test in org.apache.kafka.streams.state.internals.SegmentsTest > will fail > {code:title=org.apache.kafka.streams.state.internals.SegmentsTest.java|borderStyle=solid} > @Test > public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception > { > segments.getOrCreateSegment(4, context); > segments.getOrCreateSegment(2, context); > segments.getOrCreateSegment(0, context); > segments.getOrCreateSegment(1, context); > segments.getOrCreateSegment(3, context); > final List segments = this.segments.segments(0, 2 * 60 * > 1000); > assertEquals(3, segments.size()); > assertEquals(0, segments.get(0).id); > assertEquals(1, segments.get(1).id); > assertEquals(2, segments.get(2).id); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5787) StoreChangeLogReader needs to restore partitions that were added post initialization
Damian Guy created KAFKA-5787: - Summary: StoreChangeLogReader needs to restore partitions that were added post initialization Key: KAFKA-5787 URL: https://issues.apache.org/jira/browse/KAFKA-5787 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.11.0.1, 1.0.0 Reporter: Damian Guy Assignee: Damian Guy Priority: Blocker Investigation of {{KStreamRepartitionJoinTest}} failures uncovered this bug. If a task fails during initialization due to a {{LockException}}, its changelog partitions are not immediately added to the {{StoreChangelogReader}} as the thread doesn't hold the lock. However {{StoreChangelogReader#restore}} will be called and it sets the initialized flag. On a subsequent successfull call to initialize the new tasks the partitions are added to the {{StoreChangelogReader}}, however as it is already initialized these new partitions will never be restored. So the task will remain in a non-running state forever -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5769) Transient test failure org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache
Damian Guy created KAFKA-5769: - Summary: Transient test failure org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache Key: KAFKA-5769 URL: https://issues.apache.org/jira/browse/KAFKA-5769 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: Damian Guy Assignee: Damian Guy This has been failing in a few builds: {code} java.lang.AssertionError: Condition not met within timeout 6. Expecting 5 records from topic map-one-join-output-1 while only received 0: [] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:275) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:201) at org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.receiveMessages(KStreamRepartitionJoinTest.java:375) at org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyCorrectOutput(KStreamRepartitionJoinTest.java:296) at org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyRepartitionOnJoinOperations(KStreamRepartitionJoinTest.java:141) at org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache(KStreamRepartitionJoinTest.java:119) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5689) Refactor WindowStore hierarchy so that Metered Store is the outermost store
[ https://issues.apache.org/jira/browse/KAFKA-5689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5689. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3692 [https://github.com/apache/kafka/pull/3692] > Refactor WindowStore hierarchy so that Metered Store is the outermost store > > > Key: KAFKA-5689 > URL: https://issues.apache.org/jira/browse/KAFKA-5689 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > MeteredWinowStore is currently not the outermost store. Further it needs to > have the inner store asto allow easy plugability of custom > storage engines. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5725) Additional failure testing for streams with bouncing brokers
[ https://issues.apache.org/jira/browse/KAFKA-5725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5725. --- Resolution: Fixed > Additional failure testing for streams with bouncing brokers > > > Key: KAFKA-5725 > URL: https://issues.apache.org/jira/browse/KAFKA-5725 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Eno Thereska >Assignee: Eno Thereska > Fix For: 0.11.0.2 > > > We have tests/streams/streams_broker_bounce_test.py that tests streams' > robustness when some brokers quit or are terminated. We do not have coverage > for transient failures, such as when brokers come back. Add such tests. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5749) Refactor SessionStore hierarchy
Damian Guy created KAFKA-5749: - Summary: Refactor SessionStore hierarchy Key: KAFKA-5749 URL: https://issues.apache.org/jira/browse/KAFKA-5749 Project: Kafka Issue Type: Sub-task Components: streams Affects Versions: 1.0.0 Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 In order to support bytes store we need to create a MeteredSessionStore and ChangeloggingSessionStore. We then need to refactor the current SessionStore implementations to use this. All inner stores should by of type-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5242) add max_number _of_retries to exponential backoff strategy
[ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5242. --- Resolution: Won't Do This has been fixed by KAFKA-5152 which has already been merged to 0.11.0.1 already > add max_number _of_retries to exponential backoff strategy > -- > > Key: KAFKA-5242 > URL: https://issues.apache.org/jira/browse/KAFKA-5242 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Lukas Gemela >Assignee: Matthias J. Sax >Priority: Minor > Fix For: 0.10.2.2, 0.11.0.1 > > Attachments: clio_170511.log > > > From time to time, during relabance we are getting a lot of exceptions saying > {code} > org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the > state directory: /app/db/clio/0_0 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > [kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > [kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > [kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > [kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > [kafka-clients-0.10.2.0.jar!/:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > [kafka-clients-0.10.2.0.jar!/:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [kafka-clients-0.10.2.0.jar!/:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > [kafka-clients-0.10.2.0.jar!/:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > [kafka-clients-0.10.2.0.jar!/:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > [kafka-clients-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > [kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > [kafka-streams-0.10.2.0.jar!/:?] > {code} > (see attached logfile) > It was actually problem on our side - we ran startStreams() twice and > therefore we had two threads touching the same folder structure. > But what I've noticed, the backoff strategy in > StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly - after > 20 iterations it takes 6hours until the next attempt to start a task. > I've noticed latest code contains check for rebalanceTimeoutMs, but that > still does not solve the problem especially in case > MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_INT. at this stage kafka > streams just hangs up indefinitely. > I would personally make that backoffstrategy a bit more configurable with a > number of retries that if it exceed a configured value it propagates the > exception as any other exception to custom client exception handler. > (I can provide a patch) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5440) Kafka Streams report state RUNNING even if all threads are dead
[ https://issues.apache.org/jira/browse/KAFKA-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5440. --- Resolution: Duplicate > Kafka Streams report state RUNNING even if all threads are dead > --- > > Key: KAFKA-5440 > URL: https://issues.apache.org/jira/browse/KAFKA-5440 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Matthias J. Sax > Fix For: 0.11.0.1, 1.0.0 > > > From the mailing list: > {quote} > Hi All, > We recently implemented a health check for a Kafka Streams based application. > The health check is simply checking the state of Kafka Streams by calling > KafkaStreams.state(). It reports healthy if it’s not in PENDING_SHUTDOWN or > NOT_RUNNING states. > We truly appreciate having the possibility to easily check the state of Kafka > Streams but to our surprise we noticed that KafkaStreams.state() returns > RUNNING even though all StreamThreads has crashed and reached NOT_RUNNING > state. Is this intended behaviour or is it a bug? Semantically it seems weird > to me that KafkaStreams would say it’s RUNNING when it is in fact not > consuming anything since all underlying working threads has crashed. > If this is intended behaviour I would appreciate an explanation of why that > is the case. Also in that case, how could I determine if the consumption from > Kafka hasn’t crashed? > If this is not intended behaviour, how fast could I expect it to be fixed? I > wouldn’t mind fixing it myself but I’m not sure if this is considered trivial > or big enough to require a JIRA. Also, if I would implement a fix I’d like > your input on what would be a reasonable solution. By just inspecting to code > I have an idea but I’m not sure I understand all the implication so I’d be > happy to hear your thoughts first. > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-4643) Improve test coverage of StreamsKafkaClient
[ https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-4643. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3663 [https://github.com/apache/kafka/pull/3663] > Improve test coverage of StreamsKafkaClient > --- > > Key: KAFKA-4643 > URL: https://issues.apache.org/jira/browse/KAFKA-4643 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy > Fix For: 1.0.0 > > > Exception paths not tested. > {{getTopicMetadata}} not tested -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5717) [streams] 'null' values in state stores
[ https://issues.apache.org/jira/browse/KAFKA-5717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5717. --- Resolution: Fixed Fix Version/s: 0.11.0.1 1.0.0 Issue resolved by pull request 3650 [https://github.com/apache/kafka/pull/3650] > [streams] 'null' values in state stores > --- > > Key: KAFKA-5717 > URL: https://issues.apache.org/jira/browse/KAFKA-5717 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Bart Vercammen >Assignee: Damian Guy > Fix For: 1.0.0, 0.11.0.1 > > > When restoring the state on an in-memory KeyValue store (at startup of the > Kafka Streams application), the _deleted_ values are put in the store as > _key_ with _value_ {{null}} instead of being removed from the store. > (this happens when the underlying kafka topic segment did not get compacted > yet) > After some digging I came across this in {{InMemoryKeyValueStore}}: > {code} > public synchronized void put(K key, V value) { > this.map.put(key, value); > } > {code} > I would assume this implementation misses the check on {{value}} being > {{null}} to *delete* the entry instead of just storing it. > In the RocksDB implementation it is done correctly: > {code} > if (rawValue == null) { > try { > db.delete(wOptions, rawKey); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5702) Refactor StreamThread to separate concerns and enable better testability
Damian Guy created KAFKA-5702: - Summary: Refactor StreamThread to separate concerns and enable better testability Key: KAFKA-5702 URL: https://issues.apache.org/jira/browse/KAFKA-5702 Project: Kafka Issue Type: Improvement Components: streams Reporter: Damian Guy Assignee: Damian Guy {{StreamThread}} does a lot of stuff, i.e., managing and creating tasks, getting data from consumers, updating standby tasks, punctuating, rebalancing etc. With the current design it is extremely hard to reason about and is quite tightly coupled. We need to start to tease out some of the separate concerns from StreamThread, ie, TaskManager, RebalanceListener etc. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5689) Refactor WindowStore hierarchy so that Metered Store is the outermost store
Damian Guy created KAFKA-5689: - Summary: Refactor WindowStore hierarchy so that Metered Store is the outermost store Key: KAFKA-5689 URL: https://issues.apache.org/jira/browse/KAFKA-5689 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy MeteredWinowStore is currently not the outermost store. Further it needs to have the inner store asto allow easy plugability of custom storage engines. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5154) Kafka Streams throws NPE during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5154. --- Resolution: Fixed > Kafka Streams throws NPE during rebalance > - > > Key: KAFKA-5154 > URL: https://issues.apache.org/jira/browse/KAFKA-5154 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Lukas Gemela >Assignee: Damian Guy > Attachments: 5154_problem.log, clio_afa596e9b809.gz, clio_reduced.gz, > clio.txt.gz > > > please see attached log, Kafka streams throws NullPointerException during > rebalance, which is caught by our custom exception handler > {noformat} > 2017-04-30T17:44:17,675 INFO kafka-coordinator-heartbeat-thread | hades > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-04-30T17:44:27,395 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-04-30T17:44:27,941 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() > @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, > poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] > for group hades > 2017-04-30T17:44:27,947 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:48,468 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:53,628 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:09,587 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:11,961 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @375 - Successfully joined group hades with generation 99 > 2017-04-30T17:45:13,126 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete() > @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, > poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, > poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades > 2017-04-30T17:46:37,254 INFO kafka-coordinator-heartbeat-thread | hades > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-04-30T18:04:25,993 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-04-30T18:04:29,401 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() > @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, > poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, > poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades > 2017-04-30T18:05:10,877 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-05-01T00:01:55,707 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-05-01T00:01:59,027 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-05-01T00:01:59,031 ERROR StreamThread-1 > org.apache.kafka.streams.processor.internals.StreamThread.run() @376 - > stream-thread [StreamThread-1] Streams application error during processing: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > [kafka-streams-0.10.2.0.jar!/:?] > 2017-05-01T00:02:00,038 INFO StreamThread-1 > org.apache.kafka.clients.producer.KafkaProducer.close() @689 - Closing the > Kafka
[jira] [Created] (KAFKA-5680) Don't materliaze physical state stores in KTable filter/map etc operations
Damian Guy created KAFKA-5680: - Summary: Don't materliaze physical state stores in KTable filter/map etc operations Key: KAFKA-5680 URL: https://issues.apache.org/jira/browse/KAFKA-5680 Project: Kafka Issue Type: Bug Reporter: Damian Guy Presently, for IQ, we will materialize physical state stores for {{KTable#filter}} {{KTable#mapValues}} etc operations if the user provides a {{queryableStoreName}}. This results in changelog topics, memory, disk space that we can avoid by providing a view on the original state store. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5673) Refactor KeyValueStore hierarchy so that MeteredKeyValueStore is the outermost store
Damian Guy created KAFKA-5673: - Summary: Refactor KeyValueStore hierarchy so that MeteredKeyValueStore is the outermost store Key: KAFKA-5673 URL: https://issues.apache.org/jira/browse/KAFKA-5673 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy MeteredKeyValueStore is currently not the outermost store. Further it needs to have the inner store as {{}} to allow easy plugability of custom storage engines. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5672) Move measureLatencyNs from StreamsMetricsImpl to StreamsMetrics
Damian Guy created KAFKA-5672: - Summary: Move measureLatencyNs from StreamsMetricsImpl to StreamsMetrics Key: KAFKA-5672 URL: https://issues.apache.org/jira/browse/KAFKA-5672 Project: Kafka Issue Type: Bug Reporter: Damian Guy StreamsMetricsImpl currently has the method {{measureLatencyNs}} but it is not on {{StreamsMetrics} - this should be moved to the interface so we can stop depending on the impl. Further, the {{Runnable}} argument passed to {{measureLatencyNs}} should be changed to some functional interface that can also return a value. As this is a public API change it will require a KIP -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5549) Explain that `client.id` is just used as a prefix within Streams
[ https://issues.apache.org/jira/browse/KAFKA-5549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5549. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3544 [https://github.com/apache/kafka/pull/3544] > Explain that `client.id` is just used as a prefix within Streams > > > Key: KAFKA-5549 > URL: https://issues.apache.org/jira/browse/KAFKA-5549 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Trivial > Labels: beginner, newbie > Fix For: 1.0.0 > > > We should explain, that {{client.id}} is used as a prefix for internal > consumer, producer, and restore-consumer and not reuse > {{CommonClientConfigs.CLIENT_ID_DOC}} within {{StreamsConfig}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5655) Add new API methods to KGroupedTable
Damian Guy created KAFKA-5655: - Summary: Add new API methods to KGroupedTable Key: KAFKA-5655 URL: https://issues.apache.org/jira/browse/KAFKA-5655 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5653) Add new API methods to KTable
Damian Guy created KAFKA-5653: - Summary: Add new API methods to KTable Key: KAFKA-5653 URL: https://issues.apache.org/jira/browse/KAFKA-5653 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5654) Add new API methods to KGroupedStream
Damian Guy created KAFKA-5654: - Summary: Add new API methods to KGroupedStream Key: KAFKA-5654 URL: https://issues.apache.org/jira/browse/KAFKA-5654 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Assignee: Damian Guy Placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5652) Add new api methods to KStream
Damian Guy created KAFKA-5652: - Summary: Add new api methods to KStream Key: KAFKA-5652 URL: https://issues.apache.org/jira/browse/KAFKA-5652 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Assignee: Damian Guy Add new methods from KIP-182 to {{KStream}} until finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5651) KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines
Damian Guy created KAFKA-5651: - Summary: KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines Key: KAFKA-5651 URL: https://issues.apache.org/jira/browse/KAFKA-5651 Project: Kafka Issue Type: New Feature Reporter: Damian Guy Assignee: Damian Guy -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5650) Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)
Damian Guy created KAFKA-5650: - Summary: Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182) Key: KAFKA-5650 URL: https://issues.apache.org/jira/browse/KAFKA-5650 Project: Kafka Issue Type: Bug Reporter: Damian Guy Assignee: Damian Guy As per KIP-182: A new interface will be added: {code} /** * Implementations of this will provide the ability to wrap a given StateStore * with or without caching/loggging etc. */ public interface StateStoreBuilder { StateStoreBuilder withCachingEnabled(); StateStoreBuilder withCachingDisabled(); StateStoreBuilder withLoggingEnabled(Mapconfig); StateStoreBuilder withLoggingDisabled(); T build(); } {code} This interface will be used to wrap stores with caching, logging etc. Additionally some convenience methods on the {{Stores}} class: {code} public static StateStoreSupplier > persistentKeyValueStore(final String name, final Serde keySerde, final Serde valueSerde) public static StateStoreSupplier > inMemoryKeyValueStore(final String name, final Serde keySerde, final Serde valueSerde) public static StateStoreSupplier > lruMap(final String name, final int capacity, final Serde keySerde, final Serde valueSerde) public static StateStoreSupplier > persistentWindowStore(final String name, final Windows windows, final Serde keySerde, final Serde valueSerde) public static StateStoreSupplier > persistentSessionStore(final String name, final SessionWindows windows, final Serde keySerde, final Serde valueSerde) /** * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with * caching, logging, and any other convenient wrappers provided by the KafkaStreams library */ public StateStoreBuilder > windowStoreBuilder(final StateStoreSupplier > supplier) public StateStoreBuilder > keyValueStoreBuilder(final StateStoreSupplier > supplier) public StateStoreBuilder > sessionStoreBuilder(final StateStoreSupplier > supplier) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-3741) Allow setting of default topic configs via StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-3741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-3741. --- Resolution: Fixed Fix Version/s: 0.11.1.0 Issue resolved by pull request 3459 [https://github.com/apache/kafka/pull/3459] > Allow setting of default topic configs via StreamsConfig > > > Key: KAFKA-3741 > URL: https://issues.apache.org/jira/browse/KAFKA-3741 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Roger Hoover >Assignee: Damian Guy > Labels: api > Fix For: 0.11.1.0 > > > Kafka Streams currently allows you to specify a replication factor for > changelog and repartition topics that it creates. It should also allow you > to specify any other TopicConfig. These should be used as defaults when > creating Internal topics. The defaults should be overridden by any configs > provided by the StateStoreSuppliers etc. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5157) Options for handling corrupt data during deserialization
[ https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5157. --- Resolution: Fixed Issue resolved by pull request 3423 [https://github.com/apache/kafka/pull/3423] > Options for handling corrupt data during deserialization > > > Key: KAFKA-5157 > URL: https://issues.apache.org/jira/browse/KAFKA-5157 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Eno Thereska >Assignee: Eno Thereska > Labels: user-experience > Fix For: 0.11.1.0 > > > When there is a bad formatted data in the source topics, deserialization will > throw a runtime exception all the way to the users. And since deserialization > happens before it was ever processed at the beginning of the topology, today > there is no ways to handle such errors on the user-app level. > We should consider allowing users to handle such "poison pills" in a > customizable way. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5578) Streams Task Assignor should consider the staleness of state directories when allocating tasks
Damian Guy created KAFKA-5578: - Summary: Streams Task Assignor should consider the staleness of state directories when allocating tasks Key: KAFKA-5578 URL: https://issues.apache.org/jira/browse/KAFKA-5578 Project: Kafka Issue Type: Bug Reporter: Damian Guy During task assignment we use the presence of a state directory to assign precedence to which instances should be assigned the task. We first chose previous active tasks, but then fall back to the existence of a state dir. Unfortunately we don't take into account the recency of the data from the available state dirs. So in the case where a task has run on many instances, it may be that we chose an instance that has relatively old data. When doing task assignment we should take into consideration the age of the data in the state dirs. We could use the data from the checkpoint files to determine which instance is most up-to-date and attempt to assign accordingly (obviously making sure that tasks are still balanced across available instances) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5566) Instable test QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied
[ https://issues.apache.org/jira/browse/KAFKA-5566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5566. --- Resolution: Fixed Fix Version/s: 0.11.0.1 0.11.1.0 Issue resolved by pull request 3500 [https://github.com/apache/kafka/pull/3500] > Instable test QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied > - > > Key: KAFKA-5566 > URL: https://issues.apache.org/jira/browse/KAFKA-5566 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Eno Thereska > Fix For: 0.11.1.0, 0.11.0.1 > > > This test failed about 4 times in the last 24h. Always the same stack trace > so far: > {noformat} > java.lang.AssertionError: Condition not met within timeout 3. wait for > agg to be '123' > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:274) > at > org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowToQueryAfterThreadDied(QueryableStateIntegrationTest.java:793) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5562) Do streams state directory cleanup on a single thread
Damian Guy created KAFKA-5562: - Summary: Do streams state directory cleanup on a single thread Key: KAFKA-5562 URL: https://issues.apache.org/jira/browse/KAFKA-5562 Project: Kafka Issue Type: Bug Reporter: Damian Guy Assignee: Damian Guy Currently in streams we clean up old state directories every so often (as defined by {{state.cleanup.delay.ms}}). However, every StreamThread runs the cleanup, which is both unnecessary and can potentially lead to race conditions. It would be better to perform the state cleanup on a single thread and only when the {{KafkaStreams}} instance is in a running state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5508) Documentation for altering topics
[ https://issues.apache.org/jira/browse/KAFKA-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5508. --- Resolution: Fixed Fix Version/s: 0.11.0.1 0.11.1.0 Issue resolved by pull request 3429 [https://github.com/apache/kafka/pull/3429] > Documentation for altering topics > - > > Key: KAFKA-5508 > URL: https://issues.apache.org/jira/browse/KAFKA-5508 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Tom Bentley >Assignee: huxihx >Priority: Minor > Fix For: 0.11.1.0, 0.11.0.1 > > > According to the upgrade documentation: > bq. Altering topic configuration from the kafka-topics.sh script > (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the > kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. > But the Operations documentation still tells people to use kafka-topics.sh to > alter their topic configurations. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed
Damian Guy created KAFKA-5556: - Summary: KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed Key: KAFKA-5556 URL: https://issues.apache.org/jira/browse/KAFKA-5556 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.11.0.0, 0.10.2.1 Reporter: Damian Guy >From the user list: I have been running a streaming application on some data set. Things usually run ok. Today I was trying to run the same application on Kafka (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After running for quite some time, I got the following exception .. {code} Exception in thread "StreamThread-1" java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed > at > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > at > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > at > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) {code} Looks like we should check if the future is done, i.e., check the return value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5487) Rolling upgrade test for streams
[ https://issues.apache.org/jira/browse/KAFKA-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5487. --- Resolution: Fixed Fix Version/s: (was: 0.11.0.1) 0.11.1.0 Issue resolved by pull request 3411 [https://github.com/apache/kafka/pull/3411] > Rolling upgrade test for streams > > > Key: KAFKA-5487 > URL: https://issues.apache.org/jira/browse/KAFKA-5487 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Eno Thereska >Assignee: Eno Thereska > Fix For: 0.11.1.0 > > > We need to do a basic rolling upgrade test for streams, similar to the > tests/kafkatest/tests/core/upgrade_test.py test for Kafka core. Basically we > need to test the ability of a streams app to use a different JAR from a > different version. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-4913) creating a window store with one segment throws division by zero error
[ https://issues.apache.org/jira/browse/KAFKA-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-4913. --- Resolution: Fixed Fix Version/s: (was: 0.11.0.1) 0.11.1.0 Issue resolved by pull request 3410 [https://github.com/apache/kafka/pull/3410] > creating a window store with one segment throws division by zero error > -- > > Key: KAFKA-4913 > URL: https://issues.apache.org/jira/browse/KAFKA-4913 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Xavier Léauté >Assignee: Damian Guy > Fix For: 0.11.1.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-3741) KStream config for changelog min.in.sync.replicas
[ https://issues.apache.org/jira/browse/KAFKA-3741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3741: -- Status: Patch Available (was: Open) > KStream config for changelog min.in.sync.replicas > - > > Key: KAFKA-3741 > URL: https://issues.apache.org/jira/browse/KAFKA-3741 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Roger Hoover >Assignee: Damian Guy > Labels: api > > Kafka Streams currently allows you to specify a replication factor for > changelog and repartition topics that it creates. It should also allow you > to specify min.in.sync.replicas. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-3741) KStream config for changelog min.in.sync.replicas
[ https://issues.apache.org/jira/browse/KAFKA-3741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy reassigned KAFKA-3741: - Assignee: Damian Guy > KStream config for changelog min.in.sync.replicas > - > > Key: KAFKA-3741 > URL: https://issues.apache.org/jira/browse/KAFKA-3741 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Roger Hoover >Assignee: Damian Guy > Labels: api > > Kafka Streams currently allows you to specify a replication factor for > changelog and repartition topics that it creates. It should also allow you > to specify min.in.sync.replicas. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-4913) creating a window store with one segment throws division by zero error
[ https://issues.apache.org/jira/browse/KAFKA-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4913: -- Status: In Progress (was: Patch Available) > creating a window store with one segment throws division by zero error > -- > > Key: KAFKA-4913 > URL: https://issues.apache.org/jira/browse/KAFKA-4913 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Xavier Léauté >Assignee: Damian Guy > Fix For: 0.11.0.1 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work stopped] (KAFKA-4913) creating a window store with one segment throws division by zero error
[ https://issues.apache.org/jira/browse/KAFKA-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4913 stopped by Damian Guy. - > creating a window store with one segment throws division by zero error > -- > > Key: KAFKA-4913 > URL: https://issues.apache.org/jira/browse/KAFKA-4913 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Xavier Léauté >Assignee: Damian Guy > Fix For: 0.11.0.1 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5357) StackOverFlow error in transaction coordinator
[ https://issues.apache.org/jira/browse/KAFKA-5357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5357 started by Damian Guy. - > StackOverFlow error in transaction coordinator > -- > > Key: KAFKA-5357 > URL: https://issues.apache.org/jira/browse/KAFKA-5357 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Damian Guy >Priority: Blocker > Labels: exactly-once > Fix For: 0.11.0.0 > > Attachments: KAFKA-5357.tar.gz > > > I observed the following in the broker logs: > {noformat} > [2017-06-01 04:10:36,664] ERROR [Replica Manager on Broker 1]: Error > processing append operation on partition __transaction_state-37 > (kafka.server.ReplicaManager) > [2017-06-01 04:10:36,667] ERROR [TxnMarkerSenderThread-1]: Error due to > (kafka.common.InterBrokerSendThread) > java.lang.StackOverflowError > at java.security.AccessController.doPrivileged(Native Method) > at java.io.PrintWriter.(PrintWriter.java:116) > at java.io.PrintWriter.(PrintWriter.java:100) > at > org.apache.log4j.DefaultThrowableRenderer.render(DefaultThrowableRenderer.java:58) > at > org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87) > at > org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413) > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313) > at > org.apache.log4j.DailyRollingFileAppender.subAppend(DailyRollingFileAppender.java:369) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.error(Category.java:322) > at kafka.utils.Logging$class.error(Logging.scala:105) > at kafka.server.ReplicaManager.error(ReplicaManager.scala:122) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:557) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:505) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:505) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:346) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply$mcV$sp(TransactionStateManager.scala:589) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219) > at > kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:564) > at > kafka.coordinator.transaction.TransactionMarkerChannelManager.kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1(TransactionMarkerChannelManager.scala:225) > at > kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225) > at > kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225) > at > kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:561) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$4.apply(TransactionStateManager.scala:595) > at >
[jira] [Assigned] (KAFKA-5357) StackOverFlow error in transaction coordinator
[ https://issues.apache.org/jira/browse/KAFKA-5357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy reassigned KAFKA-5357: - Assignee: Damian Guy > StackOverFlow error in transaction coordinator > -- > > Key: KAFKA-5357 > URL: https://issues.apache.org/jira/browse/KAFKA-5357 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Damian Guy >Priority: Blocker > Labels: exactly-once > Fix For: 0.11.0.0 > > Attachments: KAFKA-5357.tar.gz > > > I observed the following in the broker logs: > {noformat} > [2017-06-01 04:10:36,664] ERROR [Replica Manager on Broker 1]: Error > processing append operation on partition __transaction_state-37 > (kafka.server.ReplicaManager) > [2017-06-01 04:10:36,667] ERROR [TxnMarkerSenderThread-1]: Error due to > (kafka.common.InterBrokerSendThread) > java.lang.StackOverflowError > at java.security.AccessController.doPrivileged(Native Method) > at java.io.PrintWriter.(PrintWriter.java:116) > at java.io.PrintWriter.(PrintWriter.java:100) > at > org.apache.log4j.DefaultThrowableRenderer.render(DefaultThrowableRenderer.java:58) > at > org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87) > at > org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413) > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313) > at > org.apache.log4j.DailyRollingFileAppender.subAppend(DailyRollingFileAppender.java:369) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.error(Category.java:322) > at kafka.utils.Logging$class.error(Logging.scala:105) > at kafka.server.ReplicaManager.error(ReplicaManager.scala:122) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:557) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:505) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:505) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:346) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply$mcV$sp(TransactionStateManager.scala:589) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:570) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:219) > at > kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:564) > at > kafka.coordinator.transaction.TransactionMarkerChannelManager.kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1(TransactionMarkerChannelManager.scala:225) > at > kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225) > at > kafka.coordinator.transaction.TransactionMarkerChannelManager$$anonfun$kafka$coordinator$transaction$TransactionMarkerChannelManager$$retryAppendCallback$1$4.apply(TransactionMarkerChannelManager.scala:225) > at > kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:561) > at > kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$4.apply(TransactionStateManager.scala:595) > at >
[jira] [Resolved] (KAFKA-4953) Global Store: cast exception when initialising with in-memory logged state store
[ https://issues.apache.org/jira/browse/KAFKA-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-4953. --- Resolution: Fixed fixed by KAFKA-5045 > Global Store: cast exception when initialising with in-memory logged state > store > > > Key: KAFKA-4953 > URL: https://issues.apache.org/jira/browse/KAFKA-4953 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Yennick Trevels > Labels: user-experience > > Currently it is not possible to initialise a global store with an in-memory > *logged* store via the TopologyBuilder. This results in the following > exception: > {code} > java.lang.ClassCastException: > org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl > cannot be cast to > org.apache.kafka.streams.processor.internals.RecordCollector$Supplier > at > org.apache.kafka.streams.state.internals.StoreChangeLogger.(StoreChangeLogger.java:52) > at > org.apache.kafka.streams.state.internals.StoreChangeLogger.(StoreChangeLogger.java:44) > at > org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130) > at > org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97) > at > org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61) > at > org.apache.kafka.test.ProcessorTopologyTestDriver.(ProcessorTopologyTestDriver.java:215) > at > org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235) > ... > {code} > I've created a PR which includes a unit this to verify this behavior. > If the below PR gets merge, the fixing PR should leverage the provided test > {{ProcessorTopologyTest#shouldDriveInMemoryLoggedGlobalStore}} by removing > the {{@ignore}} annotation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (KAFKA-4270) ClassCast for Agregation
[ https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-4270. --- Resolution: Cannot Reproduce Closing this as haven't been able to reproduce and no further information provided > ClassCast for Agregation > > > Key: KAFKA-4270 > URL: https://issues.apache.org/jira/browse/KAFKA-4270 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Mykola Polonskyi >Assignee: Damian Guy >Priority: Critical > Labels: architecture > > With defined serdes for intermediate topic in aggregation catch the > ClassCastException: from custom class to the ByteArray. > In debug I saw that defined serde isn't used for creation sinkNode (incide > `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`) > Instead defined serde inside aggregation call is used default Impl with empty > plugs instead of implementations > {code:koltin} > userTable.join( > skicardsTable.groupBy { key, value -> > KeyValue(value.skicardInfo.ownerId, value.skicardInfo) } > .aggregate( > { mutableSetOf() }, > { ownerId, skicardInfo, accumulator -> > accumulator.put(skicardInfo) }, > { ownerId, skicardInfo, accumulator -> > accumulator }, > skicardByOwnerIdSerde, > skicardByOwnerIdTopicName > ), > { userCreatedOrUpdated, skicardInfoSet -> > UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) } > ).to( > userWithSkicardsTable > ) > {code} > I think current behavior of `doAggregate` with serdes and/or stores setting > up should be changed because that is incorrect in release 0.10.0.1-cp1 to. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5308) TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response
[ https://issues.apache.org/jira/browse/KAFKA-5308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5308: -- Status: Patch Available (was: Open) > TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response > -- > > Key: KAFKA-5308 > URL: https://issues.apache.org/jira/browse/KAFKA-5308 > Project: Kafka > Issue Type: Sub-task > Components: clients, core, producer >Reporter: Jason Gustafson >Assignee: Damian Guy >Priority: Blocker > Labels: exactly-once > Fix For: 0.11.0.0 > > > It could be the case that one of the topics added to a transaction is on a > lower message format version. Because of > https://github.com/apache/kafka/pull/3118, the producer won't be able to send > any data to that topic, but the TC will nevertheless try to write the > commit/abort marker to the log. Like the Produce request, the WriteTxnMarker > request should return the UNSUPPORTED_FOR_MESSSAGE_FORMAT error. Instead of > retrying, we should log a warning and remove the partition from the set of > partitions awaiting marker completion. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5308) TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response
[ https://issues.apache.org/jira/browse/KAFKA-5308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16021295#comment-16021295 ] Damian Guy commented on KAFKA-5308: --- [~hachikuji] i think this may not be possible once https://github.com/apache/kafka/pull/3103/files is merged? i.e., in {{handleAddPartitionToTxnRequest}} we will check if any of the topics are on unsupported message format and respond with an {{Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT}} > TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response > -- > > Key: KAFKA-5308 > URL: https://issues.apache.org/jira/browse/KAFKA-5308 > Project: Kafka > Issue Type: Sub-task > Components: clients, core, producer >Reporter: Jason Gustafson >Assignee: Damian Guy >Priority: Blocker > Labels: exactly-once > Fix For: 0.11.0.0 > > > It could be the case that one of the topics added to a transaction is on a > lower message format version. Because of > https://github.com/apache/kafka/pull/3118, the producer won't be able to send > any data to that topic, but the TC will nevertheless try to write the > commit/abort marker to the log. Like the Produce request, the WriteTxnMarker > request should return the UNSUPPORTED_FOR_MESSSAGE_FORMAT error. Instead of > retrying, we should log a warning and remove the partition from the set of > partitions awaiting marker completion. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-5308) TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response
[ https://issues.apache.org/jira/browse/KAFKA-5308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy reassigned KAFKA-5308: - Assignee: Damian Guy > TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker response > -- > > Key: KAFKA-5308 > URL: https://issues.apache.org/jira/browse/KAFKA-5308 > Project: Kafka > Issue Type: Sub-task > Components: clients, core, producer >Reporter: Jason Gustafson >Assignee: Damian Guy >Priority: Blocker > Labels: exactly-once > Fix For: 0.11.0.0 > > > It could be the case that one of the topics added to a transaction is on a > lower message format version. Because of > https://github.com/apache/kafka/pull/3118, the producer won't be able to send > any data to that topic, but the TC will nevertheless try to write the > commit/abort marker to the log. Like the Produce request, the WriteTxnMarker > request should return the UNSUPPORTED_FOR_MESSSAGE_FORMAT error. Instead of > retrying, we should log a warning and remove the partition from the set of > partitions awaiting marker completion. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5260) Producer should not send AbortTxn unless transaction has actually begun
[ https://issues.apache.org/jira/browse/KAFKA-5260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5260: -- Status: Patch Available (was: Open) > Producer should not send AbortTxn unless transaction has actually begun > --- > > Key: KAFKA-5260 > URL: https://issues.apache.org/jira/browse/KAFKA-5260 > Project: Kafka > Issue Type: Sub-task > Components: clients, core, producer >Reporter: Jason Gustafson >Assignee: Damian Guy >Priority: Blocker > Labels: exactly-once > Fix For: 0.11.0.0 > > > When there is an authorization error in AddOffsets or AddPartitions, the > producer will raise an authorization exception. When that happens, the user > should abort the transaction. The problem is that in an authorization error, > the coordinator will not have transitioned to a new state, so if it suddenly > receives an AbortTxnRequest, that request will fail with an InvalidTxnState, > which will be propagated to the error. The suggested solution is to keep > track locally when we are certain that no transaction has been officially > begun and to skip sending the AbortTxnRequest in that case. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5279) TransactionCoordinator must expire transactionalIds
[ https://issues.apache.org/jira/browse/KAFKA-5279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5279: -- Status: Patch Available (was: Open) > TransactionCoordinator must expire transactionalIds > --- > > Key: KAFKA-5279 > URL: https://issues.apache.org/jira/browse/KAFKA-5279 > Project: Kafka > Issue Type: Sub-task >Reporter: Apurva Mehta >Assignee: Damian Guy >Priority: Blocker > Labels: exactly-once > > Currently transactionalIds are not expired anywhere, so we accumulate forever. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-5154) Kafka Streams throws NPE during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy reassigned KAFKA-5154: - Assignee: Damian Guy (was: Matthias J. Sax) > Kafka Streams throws NPE during rebalance > - > > Key: KAFKA-5154 > URL: https://issues.apache.org/jira/browse/KAFKA-5154 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Lukas Gemela >Assignee: Damian Guy > Attachments: 5154_problem.log, clio_afa596e9b809.gz, clio_reduced.gz, > clio.txt.gz > > > please see attached log, Kafka streams throws NullPointerException during > rebalance, which is caught by our custom exception handler > {noformat} > 2017-04-30T17:44:17,675 INFO kafka-coordinator-heartbeat-thread | hades > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-04-30T17:44:27,395 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-04-30T17:44:27,941 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() > @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, > poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] > for group hades > 2017-04-30T17:44:27,947 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:48,468 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:53,628 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:09,587 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:11,961 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @375 - Successfully joined group hades with generation 99 > 2017-04-30T17:45:13,126 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete() > @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, > poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, > poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades > 2017-04-30T17:46:37,254 INFO kafka-coordinator-heartbeat-thread | hades > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-04-30T18:04:25,993 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-04-30T18:04:29,401 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() > @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, > poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, > poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades > 2017-04-30T18:05:10,877 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-05-01T00:01:55,707 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-05-01T00:01:59,027 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-05-01T00:01:59,031 ERROR StreamThread-1 > org.apache.kafka.streams.processor.internals.StreamThread.run() @376 - > stream-thread [StreamThread-1] Streams application error during processing: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > [kafka-streams-0.10.2.0.jar!/:?] > 2017-05-01T00:02:00,038 INFO StreamThread-1 > org.apache.kafka.clients.producer.KafkaProducer.close()
[jira] [Created] (KAFKA-5288) Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic
Damian Guy created KAFKA-5288: - Summary: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic Key: KAFKA-5288 URL: https://issues.apache.org/jira/browse/KAFKA-5288 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.11.0.0 Reporter: Damian Guy {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:406) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(ResetIntegrationTest.java:242) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109) at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:147) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:129) at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404) at
[jira] [Reopened] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
[ https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy reopened KAFKA-5063: --- > Flaky > ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic > - > > Key: KAFKA-5063 > URL: https://issues.apache.org/jira/browse/KAFKA-5063 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > Fix For: 0.11.0.0 > > > {noformat} > org.apache.kafka.streams.integration.ResetIntegrationTest > > testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED > java.lang.AssertionError: > Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), > KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), > KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), > KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), > KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), > KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), > KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), > KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), > KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), > KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), > KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), > KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), > KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), > KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), > KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), > KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), > KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), > KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), > KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), > KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]> > but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), > KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), > KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), > KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), > KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), > KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), > KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), > KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), > KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), > KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), > KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), > KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), > KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), > KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), > KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), > KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), > KeyValue(2983939126875, 4)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
[ https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017058#comment-16017058 ] Damian Guy commented on KAFKA-5063: --- Another instance of this failure: {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:406) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:171) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109) at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:147) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:129) at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63) at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:46) at
[jira] [Commented] (KAFKA-5256) Non-checkpointed state stores should be deleted before restore
[ https://issues.apache.org/jira/browse/KAFKA-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015805#comment-16015805 ] Damian Guy commented on KAFKA-5256: --- >From 0.11.0 onwards we will always have a checkpoint unless exactly once is >turned on. See: https://issues.apache.org/jira/browse/KAFKA-4317 If exactly once is on then we will have to delete the db and restore from scratch. I assume this is being done as part of that work? [~mjsax] [~guozhang] > Non-checkpointed state stores should be deleted before restore > -- > > Key: KAFKA-5256 > URL: https://issues.apache.org/jira/browse/KAFKA-5256 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Tommy Becker > > Currently, Kafka Streams will re-use an existing state store even if there is > no checkpoint for it. This seems both inefficient (because duplicate inserts > can be made on restore) and incorrect (records which have been deleted from > the backing topic may still exist in the store). Since the contents of a > store with no checkpoint are unknown, the best way to proceed would be to > delete the store and recreate before restoring. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5154) Kafka Streams throws NPE during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015615#comment-16015615 ] Damian Guy commented on KAFKA-5154: --- [~Lukas Gemela] Thanks for attaching the full logs. [~mjsax] [~guozhang] i've been through the logs and extracted the relevant section. It is attached as 5154_error.log for reference. What it looks like is happening is the heartbeat fails as the group is rebalancing. {{StreamThread.onPartitionsRevoked}} is called and the tasks are cleared. The thread is trying to rejoin the group, but it looks like it is the only member. There are some issues communicating with the cluster. This member becomes the Leader for the group and starts doing partition assignment. Eventually we can see that {{StreamPartitionAssignor}} assigns all partitions to the single client: {noformat} Assigned tasks to clients as {8d510649-ddf3-41f1-86c9-c5b3c2c7a1b5=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5, 0_6, 0_7, 0_8, 0_9, 0_10, 0_11, 0_12, 0_13, 0_14, 0_15, 0_16, 0_17, 0_18, 0_19, 0_20, 0_21, 0_22, 0_23, 0_24, 0_25, 0_26, 0_27, 0_28, 0_29, 0_30, 0_31, 0_32, 0_33, 0_34, 0_35, 0_36, 0_37, 0_38, 0_39]) assignedTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5, 0_6, 0_7, 0_8, 0_9, 0_10, 0_11, 0_12, 0_13, 0_14, 0_15, 0_16, 0_17, 0_18, 0_19, 0_20, 0_21, 0_22, 0_23, 0_24, 0_25, 0_26, 0_27, 0_28, 0_29, 0_30, 0_31, 0_32, 0_33, 0_34, 0_35, 0_36, 0_37, 0_38, 0_39]) {noformat} The SYNC_GROUP request gets cancelled and the coordinator marked as dead, but.. we are still sending fetch requests for the partitions that were previously revoked. {noformat} [[36m2017-05-07T00:02:03,402 DEBUG StreamThread-1 org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches() @180 - Sending fetch for partitions [poseidonIncidentFeed-12, poseidonIncidentFeed-21, poseidonIncidentFeed-6] to broker 10.210.200.171:9092 (id: 1 rack: null) [[36m2017-05-07T00:02:03,402 DEBUG StreamThread-1 org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches() @180 - Sending fetch for partitions [poseidonIncidentFeed-38] to broker 10.210.200.144:9092 (id: 3 rack: null) {noformat} So the {{StreamThread}} is still running with partitions that have been revoked, hence the {{activeTasksByPartition}} is empty causing the NPE. Now to try and figure out how this can happen. > Kafka Streams throws NPE during rebalance > - > > Key: KAFKA-5154 > URL: https://issues.apache.org/jira/browse/KAFKA-5154 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Lukas Gemela >Assignee: Matthias J. Sax > Attachments: 5154_problem.log, clio_afa596e9b809.gz, clio_reduced.gz, > clio.txt.gz > > > please see attached log, Kafka streams throws NullPointerException during > rebalance, which is caught by our custom exception handler > {noformat} > 2017-04-30T17:44:17,675 INFO kafka-coordinator-heartbeat-thread | hades > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-04-30T17:44:27,395 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-04-30T17:44:27,941 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() > @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, > poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] > for group hades > 2017-04-30T17:44:27,947 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:48,468 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:53,628 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:09,587 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:11,961 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @375 - Successfully joined group hades with generation 99 > 2017-04-30T17:45:13,126 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete() > @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, > poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, >
[jira] [Updated] (KAFKA-5154) Kafka Streams throws NPE during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5154: -- Attachment: 5154_problem.log > Kafka Streams throws NPE during rebalance > - > > Key: KAFKA-5154 > URL: https://issues.apache.org/jira/browse/KAFKA-5154 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Lukas Gemela >Assignee: Matthias J. Sax > Attachments: 5154_problem.log, clio_afa596e9b809.gz, clio_reduced.gz, > clio.txt.gz > > > please see attached log, Kafka streams throws NullPointerException during > rebalance, which is caught by our custom exception handler > {noformat} > 2017-04-30T17:44:17,675 INFO kafka-coordinator-heartbeat-thread | hades > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-04-30T17:44:27,395 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-04-30T17:44:27,941 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() > @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, > poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] > for group hades > 2017-04-30T17:44:27,947 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:48,468 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:44:53,628 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:09,587 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-04-30T17:45:11,961 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @375 - Successfully joined group hades with generation 99 > 2017-04-30T17:45:13,126 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete() > @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, > poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, > poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades > 2017-04-30T17:46:37,254 INFO kafka-coordinator-heartbeat-thread | hades > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-04-30T18:04:25,993 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-04-30T18:04:29,401 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() > @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, > poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, > poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades > 2017-04-30T18:05:10,877 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() > @407 - (Re-)joining group hades > 2017-05-01T00:01:55,707 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() > @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: > null) dead for group hades > 2017-05-01T00:01:59,027 INFO StreamThread-1 > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() > @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) > for group hades. > 2017-05-01T00:01:59,031 ERROR StreamThread-1 > org.apache.kafka.streams.processor.internals.StreamThread.run() @376 - > stream-thread [StreamThread-1] Streams application error during processing: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619) > ~[kafka-streams-0.10.2.0.jar!/:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > [kafka-streams-0.10.2.0.jar!/:?] > 2017-05-01T00:02:00,038 INFO StreamThread-1 > org.apache.kafka.clients.producer.KafkaProducer.close() @689 - Closing
[jira] [Commented] (KAFKA-5241) GlobalKTable does not checkpoint offsets after restoring state
[ https://issues.apache.org/jira/browse/KAFKA-5241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16012270#comment-16012270 ] Damian Guy commented on KAFKA-5241: --- [~twbecker] Probably worth filing another JIRA for this as it is not just global stores that have this issue > GlobalKTable does not checkpoint offsets after restoring state > -- > > Key: KAFKA-5241 > URL: https://issues.apache.org/jira/browse/KAFKA-5241 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Tommy Becker >Priority: Minor > Fix For: 0.11.0.0 > > > I'm experimenting with an application that uses a relatively large > GlobalKTable, and noticed that streams was not checkpointing its offsets on > close(). This is because although > {{org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState}} > updates the checkpoint map, the actual checkpointing itself is guarded by a > check that the offsets passed from the {{GloablStateUpdateTask}} are not > empty. This is frustrating because if the topic backing the global table is > both large (therefore taking a long time to restore) and infrequently > written, then streams rebuilds the table from scratch every time the > application is started. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5219) Move transaction expiration logic and scheduling to the Transaction Manager
Damian Guy created KAFKA-5219: - Summary: Move transaction expiration logic and scheduling to the Transaction Manager Key: KAFKA-5219 URL: https://issues.apache.org/jira/browse/KAFKA-5219 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Presently the transaction expiration logic is spread between the {{TransactionStateManager}} and the {{TransactionCoordinator}}. It would be best if it was all in the {{TransactionStateManager}}. This requires moving the bulk of the commit/abort logic, too. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-5129) TransactionCoordinator - Add ACL check for each request
[ https://issues.apache.org/jira/browse/KAFKA-5129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy reassigned KAFKA-5129: - Assignee: Damian Guy > TransactionCoordinator - Add ACL check for each request > --- > > Key: KAFKA-5129 > URL: https://issues.apache.org/jira/browse/KAFKA-5129 > Project: Kafka > Issue Type: Sub-task >Reporter: Damian Guy >Assignee: Damian Guy > > We need to add the ACL check for each of the new requests in TC -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5168) Cleanup delayed produce purgatory during partition emmigration
[ https://issues.apache.org/jira/browse/KAFKA-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5168: -- Resolution: Not A Problem Status: Resolved (was: Patch Available) This is already handled > Cleanup delayed produce purgatory during partition emmigration > --- > > Key: KAFKA-5168 > URL: https://issues.apache.org/jira/browse/KAFKA-5168 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Damian Guy >Assignee: Damian Guy > > When partitions are emmigrated we need to forceComplete any partition in the > replica managers delayed produce purgatory so that they can error out. This > needs to be done after the partition has been removed from the > ownedPartitions map so that they can error out with NOT_COORDINATOR -- This message was sent by Atlassian JIRA (v6.3.15#6346)