[jira] [Commented] (KAFKA-8730) Add API to delete consumer offsets (KIP-496)
[ https://issues.apache.org/jira/browse/KAFKA-8730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929615#comment-16929615 ] ASF GitHub Bot commented on KAFKA-8730: --- hachikuji commented on pull request #7276: KAFKA-8730: Add API to delete consumer offsets (KIP-496) URL: https://github.com/apache/kafka/pull/7276 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add API to delete consumer offsets (KIP-496) > > > Key: KAFKA-8730 > URL: https://issues.apache.org/jira/browse/KAFKA-8730 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > Implements KIP-496 as documented here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets]. -- This message was sent by Atlassian Jira (v8.3.2#803003)
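For context, KIP-496 adds an offset-deletion call to the Admin client. The following is a minimal sketch of how the API is expected to be used; the method and result-class names (`deleteConsumerGroupOffsets`, `DeleteConsumerGroupOffsetsResult`) follow the KIP and may differ in the merged PR, and the group, topic, and broker names are made up for illustration:

```java
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;

public class DeleteOffsetsSketch {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Delete the committed offset for partition 0 of "my-topic"
            // for the (inactive) consumer group "my-group".
            final Set<TopicPartition> partitions =
                Collections.singleton(new TopicPartition("my-topic", 0));
            final DeleteConsumerGroupOffsetsResult result =
                admin.deleteConsumerGroupOffsets("my-group", partitions);
            // Completes exceptionally if the group still actively
            // subscribes to the topic.
            result.all().get();
        }
    }
}
```

Running this requires a reachable broker, so treat it as an API-shape sketch rather than a standalone program.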
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929591#comment-16929591 ] Matthias J. Sax commented on KAFKA-8897: As we would skip some RocksDB releases, I am wondering when `CompactionOptionsFIFO` was deprecated – it seems not to be deprecated in the currently used version 5.18.3 – hence, we cannot claim that users rely on deprecated APIs... Would like to hear what [~guozhang] or [~ijuma] think about it? It's valid code and I agree with [~ableegoldman] that we should not break it... But we can also not deprecate it on our side. > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8474) Improve configuration layout on website
[ https://issues.apache.org/jira/browse/KAFKA-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929578#comment-16929578 ] ASF GitHub Bot commented on KAFKA-8474: --- hachikuji commented on pull request #6870: KAFKA-8474: Use HTML lists for config layout URL: https://github.com/apache/kafka/pull/6870 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve configuration layout on website > --- > > Key: KAFKA-8474 > URL: https://issues.apache.org/jira/browse/KAFKA-8474 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > The description of configurations on the website are really hard to read due > to the narrow columns. > Let's get rid of the tables so we can use the full page width. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Resolved] (KAFKA-8474) Improve configuration layout on website
[ https://issues.apache.org/jira/browse/KAFKA-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8474. Resolution: Fixed > Improve configuration layout on website > --- > > Key: KAFKA-8474 > URL: https://issues.apache.org/jira/browse/KAFKA-8474 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > The description of configurations on the website are really hard to read due > to the narrow columns. > Let's get rid of the tables so we can use the full page width. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Resolved] (KAFKA-8755) Stand-by Task of an Optimized Source Table Does Not Write Anything to its State Store
[ https://issues.apache.org/jira/browse/KAFKA-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-8755. -- Resolution: Fixed > Stand-by Task of an Optimized Source Table Does Not Write Anything to its > State Store > - > > Key: KAFKA-8755 > URL: https://issues.apache.org/jira/browse/KAFKA-8755 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Bruno Cadonna >Assignee: Chris Pettitt >Priority: Major > Labels: newbie > Fix For: 2.4.0 > > Attachments: StandbyTaskTest.java > > > With the following topology: > {code:java} > builder.table( > INPUT_TOPIC, > Consumed.with(Serdes.Integer(), Serdes.Integer()), > Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(stateName) > ) > {code} > and with topology optimization turned on, Kafka Streams uses the input topic > {{INPUT_TOPIC}} as the change log topic for state store {{stateName}}. A > stand-by task for such a topology should read from {{INPUT_TOPIC}} and should > write the records to its state store so that the streams client that runs the > stand-by task can take over the execution of the topology in case of a > failure with an up-to-date replica of the state. > Currently, the stand-by task described above reads from the input topic but > does not write the records to its state store. Thus, after a failure the > stand-by task cannot provide any up-to-date state store and the streams > client needs to construct the state from scratch before it can take over the > execution. > The described behaviour can be reproduced with the attached test. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8755) Stand-by Task of an Optimized Source Table Does Not Write Anything to its State Store
[ https://issues.apache.org/jira/browse/KAFKA-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929576#comment-16929576 ] ASF GitHub Bot commented on KAFKA-8755: --- guozhangwang commented on pull request #7238: KAFKA-8755: Fix state restore for standby tasks with optimized topology URL: https://github.com/apache/kafka/pull/7238 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Stand-by Task of an Optimized Source Table Does Not Write Anything to its > State Store > - > > Key: KAFKA-8755 > URL: https://issues.apache.org/jira/browse/KAFKA-8755 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Bruno Cadonna >Assignee: Chris Pettitt >Priority: Major > Labels: newbie > Attachments: StandbyTaskTest.java > > > With the following topology: > {code:java} > builder.table( > INPUT_TOPIC, > Consumed.with(Serdes.Integer(), Serdes.Integer()), > Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(stateName) > ) > {code} > and with topology optimization turned on, Kafka Streams uses the input topic > {{INPUT_TOPIC}} as the change log topic for state store {{stateName}}. A > stand-by task for such a topology should read from {{INPUT_TOPIC}} and should > write the records to its state store so that the streams client that runs the > stand-by task can take over the execution of the topology in case of a > failure with an up-to-date replica of the state. > Currently, the stand-by task described above reads from the input topic but > does not write the records to its state store. Thus, after a failure the > stand-by task cannot provide any up-to-date state store and the streams > client needs to construct the state from scratch before it can take over the > execution. > The described behaviour can be reproduced with the attached test. 
[jira] [Updated] (KAFKA-8755) Stand-by Task of an Optimized Source Table Does Not Write Anything to its State Store
[ https://issues.apache.org/jira/browse/KAFKA-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-8755: - Fix Version/s: 2.4.0 > Stand-by Task of an Optimized Source Table Does Not Write Anything to its > State Store > - > > Key: KAFKA-8755 > URL: https://issues.apache.org/jira/browse/KAFKA-8755 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Bruno Cadonna >Assignee: Chris Pettitt >Priority: Major > Labels: newbie > Fix For: 2.4.0 > > Attachments: StandbyTaskTest.java > > > With the following topology: > {code:java} > builder.table( > INPUT_TOPIC, > Consumed.with(Serdes.Integer(), Serdes.Integer()), > Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(stateName) > ) > {code} > and with topology optimization turned on, Kafka Streams uses the input topic > {{INPUT_TOPIC}} as the change log topic for state store {{stateName}}. A > stand-by task for such a topology should read from {{INPUT_TOPIC}} and should > write the records to its state store so that the streams client that runs the > stand-by task can take over the execution of the topology in case of a > failure with an up-to-date replica of the state. > Currently, the stand-by task described above reads from the input topic but > does not write the records to its state store. Thus, after a failure the > stand-by task cannot provide any up-to-date state store and the streams > client needs to construct the state from scratch before it can take over the > execution. > The described behaviour can be reproduced with the attached test. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929550#comment-16929550 ] Sophie Blee-Goldman edited comment on KAFKA-8897 at 9/13/19 9:39 PM: - Well, the following user code would break and just fail to compile: {code:java} public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { CompactionOptionsFIFO fifo = new CompactionOptionsFIFO(); fifo.setTtl(1000L); options.setCompactionOptionsFIFO(fifo); }{code} was (Author: ableegoldman): Well, the following user code would break: {code:java} public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { CompactionOptionsFIFO fifo = new CompactionOptionsFIFO(); fifo.setTtl(1000L); options.setCompactionOptionsFIFO(fifo); }{code} > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929550#comment-16929550 ] Sophie Blee-Goldman commented on KAFKA-8897: Well, the following user code would break: {code:java} public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { CompactionOptionsFIFO fifo = new CompactionOptionsFIFO(); fifo.setTtl(1000L); options.setCompactionOptionsFIFO(fifo); }{code} > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929534#comment-16929534 ] Matthias J. Sax commented on KAFKA-8897: Interesting point. We only expose `Options` in `RocksDBConfigSetter` – is `Options` affected by this change? For plain KV-stores, there might be an impact, but maybe we could even work around it? For timestamped stores, we actually translate `Options` to `ColumnFamilyOptions` via the adapter already – hence, we might also be able to hide it from the user? > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8907) Return topic configs in CreateTopics response
Rajini Sivaram created KAFKA-8907: - Summary: Return topic configs in CreateTopics response Key: KAFKA-8907 URL: https://issues.apache.org/jira/browse/KAFKA-8907 Project: Kafka Issue Type: New Feature Components: clients Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.4.0 See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response] for details -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8894) Flaky org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic
[ https://issues.apache.org/jira/browse/KAFKA-8894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929485#comment-16929485 ] ASF GitHub Bot commented on KAFKA-8894: --- stanislavkozlovski commented on pull request #7330: KAFKA-8894: Bump streams test topic deletion assertion timeout from 30s to 60s URL: https://github.com/apache/kafka/pull/7330 We have seen rare flakiness in this assertion - all of streams' internal topics would not get deleted within the 30 second window. Increasing to 60 seconds should reduce the occurrence. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic > -- > > Key: KAFKA-8894 > URL: https://issues.apache.org/jira/browse/KAFKA-8894 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Priority: Minor > > {code:java} > java.lang.AssertionError: Condition not met within timeout 30000. Topics are > not expected after 30000 milli seconds. 
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.waitForRemainingTopics(EmbeddedKafkaCluster.java:298) > at > org.apache.kafka.streams.integration.AbstractResetIntegrationTest.assertInternalTopicsGotDeleted(AbstractResetIntegrationTest.java:589) > at > org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(AbstractResetIntegrationTest.java:399) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(ResetIntegrationTest.java:82) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code} > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24733/testReport/junit/org.apache.kafka.streams.integration/ResetIntegrationTest/testReprocessingFromFileAfterResetWithoutIntermediateUserTopic/] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929430#comment-16929430 ] Sophie Blee-Goldman commented on KAFKA-8897: [~mjsax] [~cadonna] Can we do a major version bump of a dependency in a minor release? Looking at the release notes, it seems they made (only) one breaking change in the Java API that is relevant to users (ie RocksDBConfigSetter): * Remove ttl option from {{CompactionOptionsFIFO}}. The option has been deprecated and ttl in {{ColumnFamilyOptions}} is used instead. I can't say whether this would actually affect any users in practice, but I do know some have been playing around with FIFO compaction as a way to prevent unbounded growth of kv stores. Given the frequent asks for a ttl KeyValueStore it definitely seems possible someone may be using this. > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
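To make the compatibility concern above concrete: per the RocksDB release notes quoted in the comment, the FIFO ttl moves onto the (column family) options directly in 6.x. A hedged sketch of what migrated user code might look like, assuming RocksDB's 6.x Java API exposes `setTtl` on the options object (the store name and ttl value are illustrative):

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

public class FifoTtlConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        options.setCompactionStyle(CompactionStyle.FIFO);
        // In RocksDB 6.x the ttl is set on the options object itself,
        // replacing the removed CompactionOptionsFIFO#setTtl():
        options.setTtl(1000L);
    }
}
```

This is a sketch of the 6.x-style replacement, not a statement that existing 5.18.3 code keeps compiling.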
[jira] [Assigned] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aishwarya Pradeep Kumar reassigned KAFKA-8377: -- Assignee: Aishwarya Pradeep Kumar > KTable#transformValue might lead to incorrect result in joins > - > > Key: KAFKA-8377 > URL: https://issues.apache.org/jira/browse/KAFKA-8377 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Assignee: Aishwarya Pradeep Kumar >Priority: Major > Labels: newbie++ > > Kafka Streams uses an optimization to not materialize every result KTable. If > a non-materialized KTable is input to a join, the lookup into the table > results in a lookup of the parent table plus a call to the operator. For > example, > {code:java} > KTable nonMaterialized = materializedTable.filter(...); > KTable table2 = ... > table2.join(nonMaterialized,...){code} > If there is a table2 input record, the lookup to the other side is performed > as a lookup into materializedTable plus applying the filter(). > For stateless operation like filter, this is safe. However, > #transformValues() might have an attached state store. Hence, when an input > record r is processed by #transformValues() with current state S, it might > produce an output record r' (that is not materialized). When the join later > does a lookup to get r from the parent table, there is no guarantee that > #transformValues() again produces r' because its state might not be the same > any longer. > Hence, it seems to be required, to always materialize the result of a > KTable#transformValues() operation if there is state. Note, that if there > would be a consecutive filter() after transformValues(), it would also be ok to > materialize the filter() result. Furthermore, if there is no downstream > join(), materialization is also not required. > Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful > `#transformValues()` operator. 
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929382#comment-16929382 ] satyanarayan komandur commented on KAFKA-3705: -- How is this change different from a ValueJoiner on KStreams? > Support non-key joining in KTable > - > > Key: KAFKA-3705 > URL: https://issues.apache.org/jira/browse/KAFKA-3705 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Guozhang Wang >Assignee: Adam Bellemare >Priority: Major > Labels: api, kip > > KIP-213: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] > Today in Kafka Streams DSL, KTable joins are only based on keys. If users > want to join a KTable A by key {{a}} with another KTable B by key {{b}} but > with a "foreign key" {{a}}, and assuming they are read from two topics which > are partitioned on {{a}} and {{b}} respectively, they need to do the > following pattern: > {code:java} > tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' > is partitioned on "a" > tableA.join(tableB', joiner); > {code} > Even if these two tables are read from two topics which are already > partitioned on {{a}}, users still need to do the pre-aggregation in order to > make the two joining streams to be on the same key. This is a drawback from > programmability and we should fix it. -- This message was sent by Atlassian Jira (v8.3.2#803003)
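Regarding the question above: a `ValueJoiner` only merges two values that already share the same key, whereas KIP-213 lets a table join against another table via a field of its value (a foreign key). A rough sketch of the proposed API shape, with the `Order`/`Customer` types and topic names invented for illustration (the overload signature follows the KIP and may differ in the final implementation):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FkJoinSketch {
    static class Order { String customerId; }
    static class Customer { String name; }
    static class EnrichedOrder {
        EnrichedOrder(Order o, Customer c) { /* merge fields of both */ }
    }

    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // orders keyed by orderId, customers keyed by customerId
        final KTable<String, Order> orders = builder.table("orders");
        final KTable<String, Customer> customers = builder.table("customers");

        // Proposed KIP-213 overload: extract the foreign key from the order
        // value and join against the customers table directly, without the
        // manual groupBy()/agg() re-keying pattern shown in the ticket.
        final KTable<String, EnrichedOrder> enriched = orders.join(
            customers,
            order -> order.customerId,                      // foreign-key extractor
            (order, customer) -> new EnrichedOrder(order, customer));
    }
}
```

The result stays keyed by the left table's key (orderId here), which is what distinguishes it from re-keying tableB first.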
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929359#comment-16929359 ] Ryanne Dolan commented on KAFKA-7500: - [~chridtian.hagel] thanks for giving it a spin. The "failed to flush" errors are probably due to WorkerSourceTask being unable to send the 5942 messages within the default flush timeout, which I believe is 5 seconds. There are various reasons this might be the case: - tasks.max could be 1 (the default), which means a single Producer is sending records across the entire Herder. Try increasing this considerably. This can be as high as the total number of partitions being replicated, at the cost of more overhead per partition, obviously. If you configure this too high, MM2 just uses one task per partition. - The producer lag may be high, which is detrimental to throughput. Make sure the MM2 driver is running close to the target cluster to minimize this latency. If you are replicating between multiple DCs, consider running a few MM2 nodes in each DC, with `--clusters` argument to hint which clusters are nearby. That way, drivers will consume from other DCs but only produce locally. - You may need to use more MM2 nodes. - You may need to increase the 5 second flush timeout. Re: duplicated messages, you are correct that MM2 will send dupes if containers are bounced like that. Generally, this is okay -- occasional dupes are a fact of life in most Kafka pipelines. That said, I am working on a PoC and KIP for exactly-once replication with MM2, which will eliminate these dupes. 
> MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
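The tuning advice in the comment above can be summarized as a sketch of an MM2 configuration. The property names follow KIP-382 and the Connect worker configs, but the cluster aliases, hosts, and concrete values are placeholders to adapt:

```properties
# Two clusters with aliases "primary" and "backup" (placeholders)
clusters = primary, backup
primary.bootstrap.servers = primary-kafka:9092
backup.bootstrap.servers = backup-kafka:9092

# Replicate everything from primary to backup
primary->backup.enabled = true
primary->backup.topics = .*

# Parallelism: MM2 caps this at one task per replicated partition,
# so the default of 1 is usually far too low
tasks.max = 16

# Give the worker more than the default 5s to flush outstanding records
offset.flush.timeout.ms = 60000
```

Per the `--clusters` hint, a driver running in the backup DC could then be started as `connect-mirror-maker.sh mm2.properties --clusters backup`, so it consumes remotely but produces only to the nearby cluster.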
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929355#comment-16929355 ] Matthias J. Sax commented on KAFKA-8897: Added [~rachana_prajapati] to the list of contributors so she can self-assign. Btw: we should also run the system tests on the PR. > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929350#comment-16929350 ] Matthias J. Sax commented on KAFKA-8377: [~ash26389] – the ticket is not assigned, hence, I think nobody is working on it – if [~Yohan123] does not reply feel free to pick it up. > KTable#transformValue might lead to incorrect result in joins > - > > Key: KAFKA-8377 > URL: https://issues.apache.org/jira/browse/KAFKA-8377 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: newbie++ > > Kafka Streams uses an optimization to not materialize every result KTable. If > a non-materialized KTable is input to a join, the lookup into the table > results in a lookup of the parent table plus a call to the operator. For > example, > {code:java} > KTable nonMaterialized = materializedTable.filter(...); > KTable table2 = ... > table2.join(nonMaterialized,...){code} > If there is a table2 input record, the lookup to the other side is performed > as a lookup into materializedTable plus applying the filter(). > For stateless operation like filter, this is safe. However, > #transformValues() might have an attached state store. Hence, when an input > record r is processed by #transformValues() with current state S, it might > produce an output record r' (that is not materialized). When the join later > does a lookup to get r from the parent table, there is no guarantee that > #transformValues() again produces r' because its state might not be the same > any longer. > Hence, it seems to be required, to always materialize the result of a > KTable#transformValues() operation if there is state. Note, that if there > would be a consecutive filter() after transformValues(), it would also be ok to > materialize the filter() result. Furthermore, if there is no downstream > join(), materialization is also not required. 
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful > `#transformValues()` operator. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8906) Metrics for Key-Value, Window, and Session Stores are Inconsistent
[ https://issues.apache.org/jira/browse/KAFKA-8906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929341#comment-16929341 ] Matthias J. Sax commented on KAFKA-8906: Seems to be related to https://issues.apache.org/jira/browse/KAFKA-8088 – compare the corresponding KIP discussion thread. > Metrics for Key-Value, Window, and Session Stores are Inconsistent > -- > > Key: KAFKA-8906 > URL: https://issues.apache.org/jira/browse/KAFKA-8906 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.2.0, 2.3.0 >Reporter: Bruno Cadonna >Priority: Major > > Key-value stores record read operations in metrics prefixed by {{get-}}, > whereas window and session stores record read operations in metrics prefixed > by {{fetch-}}. The documentation does not mention {{fetch}} at all. > We should either adapt the documentation or change the metrics prefixed by > {{fetch-}} to be prefixed with {{get-}}. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8905) Stream DSL: tasks should take serdes from upstream tasks
[ https://issues.apache.org/jira/browse/KAFKA-8905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929336#comment-16929336 ] Matthias J. Sax commented on KAFKA-8905: To add to what [~cadonna] said – the same argument holds for `groupBy()` that modifies the key (and potentially the key type). In fact, since the 2.1 release Kafka Streams does pass key and value serdes downstream if possible (cf https://issues.apache.org/jira/browse/KAFKA-7456) – however, each time an operator changes either the key or value, it's not safe to pass the corresponding serde further downstream and it gets dropped, falling back to the default serdes from the configuration. To be fair, there is one more open ticket about further improvements: https://issues.apache.org/jira/browse/KAFKA-7200 Anyway, I think this ticket should be closed as "not a problem". Do you agree [~mewmesiti]? > Stream DSL: tasks should take serdes from upstream tasks > > > Key: KAFKA-8905 > URL: https://issues.apache.org/jira/browse/KAFKA-8905 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Eduard Wirch >Priority: Major > Labels: usability > > {code:java} > final Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); > final StreamsBuilder builder = new StreamsBuilder(); > final KStream<String, String> source = builder.stream( > "streams-plaintext-input", > Consumed.with(Serdes.String(), Serdes.String()) > ); > final KTable<String, Long> counts = source > .flatMapValues(value -> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) > .groupBy( > (key, value) -> value > ) > .count(); > // need to override value serde to Long type > counts.toStream().to("streams-wordcount-output", > Produced.with(Serdes.String(), Serdes.Long())); > final KafkaStreams streams = new KafkaStreams(builder.build(), props);{code} > Original code taken from code sample > [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java] > I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and > {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear. This > application will fail: > {code:java} > Caused by: java.lang.ClassCastException: java.lang.String incompatible with > [B at > org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) > at > org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102) > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) > {code} > Adjusting this part of the code: > {code:java} > .groupBy( > (key, value) -> value, > Grouped.with(Serdes.String(), Serdes.String()) > ) {code} > Will make the application run properly. > This explicit serde specification is unnecessary, since the serdes are > already known from the upstream source task. Relying on default serdes works in > this simple example, but fails for more complex scenarios. > Please make the DSL more usable by taking the serde configuration from > upstream tasks. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-4972) Kafka 0.10.0 Found a corrupted index file during Kafka broker startup
[ https://issues.apache.org/jira/browse/KAFKA-4972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929298#comment-16929298 ] Sri Vishnu commented on KAFKA-4972: --- Hi all, we had a similar issue when we were restarting our brokers. Turns out, for us, it was an issue with the {{systemd}} configuration. We have 350 GB of data on each broker with 150 topics, and shutting down the Kafka server needs about 8 minutes. However, {{systemd}} was configured to wait only 90 seconds for the server to shut down, after which it force-kills the server. When the server is restarted, it ends up with corrupted index files because it didn't shut down properly. The fix was to set the {{TimeoutStopSec=600}} config in the systemd configuration. We summarised the issue and the fix in a blog post: [https://blog.experteer.engineering/kafka-corrupted-index-file-warnings-after-broker-restart.html] Hopefully, it is helpful for some of you. > Kafka 0.10.0 Found a corrupted index file during Kafka broker startup > -- > > Key: KAFKA-4972 > URL: https://issues.apache.org/jira/browse/KAFKA-4972 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.0.0 > Environment: JDK: HotSpot x64 1.7.0_80 > Tag: 0.10.0 >Reporter: fangjinuo >Priority: Critical > Labels: reliability > Attachments: Snap3.png > > > -deleted text-After force shutdown all kafka brokers one by one, restart them > one by one, but a broker startup failure. > The following WARN leval log was found in the log file: > found a corrupted index file, .index, delete it ... > you can view details by following attachment. 
> ~I looked at some code in the core module and found out: > the non-thread-safe method LogSegment.append(offset, messages) has two callers: > 1) Log.append(messages) // here a synchronized > lock is held > 2) LogCleaner.cleanInto(topicAndPartition, source, dest, map, retainDeletes, > messageFormatVersion) // here it is not > So I guess this may be the reason for the repeated offsets in the 0xx.log file > (the log segment's .log) ~ > Although this is just my inference, I hope that this problem can be > repaired quickly -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8905) Stream DSL: tasks should take serdes from upstream tasks
[ https://issues.apache.org/jira/browse/KAFKA-8905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929264#comment-16929264 ] Bruno Cadonna commented on KAFKA-8905: -- As far as I can see, we cannot use the upstream serdes in {{groupBy()}}, because {{flatMapValues()}} could have mapped the value to anything. For example, with the following code the value would be an {{Integer}} after the {{flatMapValues()}}. {code:java} final KTable<Integer, Long> counts = source .flatMapValues( value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")) .stream().map(word -> new Integer(word.length())).collect(Collectors.toList())) .groupBy((key, value) -> value) .count(); {code} > Stream DSL: tasks should take serdes from upstream tasks > > > Key: KAFKA-8905 > URL: https://issues.apache.org/jira/browse/KAFKA-8905 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Eduard Wirch >Priority: Major > Labels: usability > > {code:java} > final Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); > final StreamsBuilder builder = new StreamsBuilder(); > final KStream<String, String> source = builder.stream( > "streams-plaintext-input", > Consumed.with(Serdes.String(), Serdes.String()) > ); > final KTable<String, Long> counts = source > .flatMapValues(value -> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) > .groupBy( > (key, value) -> value > ) > .count(); > // need to override value serde to Long type > counts.toStream().to("streams-wordcount-output", > Produced.with(Serdes.String(), Serdes.Long())); > final KafkaStreams streams = new KafkaStreams(builder.build(), props);{code} > Original code taken from code sample > 
[https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java] > I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and > {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear. This > application will fail: > {code:java} > Caused by: java.lang.ClassCastException: java.lang.String incompatible with > [B at > org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) > at > org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102) > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) > {code} > Adjusting this part of the code: > {code:java} > .groupBy( > (key, value) -> value, > Grouped.with(Serdes.String(), Serdes.String()) > ) {code} > will make the application run properly. > This explicit serde specification is unnecessary, since the serdes are > already known from the upstream source task. Relying on default serdes works in > this simple example, but fails for more complex scenarios. > Please make the DSL more usable by taking the serde configuration from > upstream tasks. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins
[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929245#comment-16929245 ] Aishwarya Pradeep Kumar commented on KAFKA-8377: [~Yohan123] - let me know if you are working on this ticket; if not, I would like to pick it up. > KTable#transformValue might lead to incorrect result in joins > - > > Key: KAFKA-8377 > URL: https://issues.apache.org/jira/browse/KAFKA-8377 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: newbie++ > > Kafka Streams uses an optimization to not materialize every result KTable. If > a non-materialized KTable is input to a join, the lookup into the table > results in a lookup of the parent table plus a call to the operator. For > example, > {code:java} > KTable nonMaterialized = materializedTable.filter(...); > KTable table2 = ... > table2.join(nonMaterialized,...){code} > If there is a table2 input record, the lookup to the other side is performed > as a lookup into materializedTable plus applying the filter(). > For stateless operations like filter, this is safe. However, > #transformValues() might have an attached state store. Hence, when an input > record r is processed by #transformValues() with current state S, it might > produce an output record r' (that is not materialized). When the join later > does a lookup to get r from the parent table, there is no guarantee that > #transformValues() again produces r' because its state might not be the same > any longer. > Hence, it seems to be required to always materialize the result of a > KTable#transformValues() operation if there is state. Note that if there > were a consecutive filter() after transformValues(), it would also be ok to > materialize the filter() result. Furthermore, if there is no downstream > join(), materialization is also not required. 
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful > `#transformValues()` operator. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929140#comment-16929140 ] Bruno Cadonna commented on KAFKA-8897: -- Yes please, go ahead. What you basically need to do is: 1. increase the RocksDB version to 6.2.2, 2. build the project and run all unit and integration tests, 3. if there are failures, try to get rid of them. If you are blocked, do not hesitate to contact the mailing list or comment on this ticket. If you cannot assign this ticket to yourself, ask on the mailing list for permissions. You need to include your Jira ID in the e-mail. > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
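The steps above can be sketched as shell commands. The file path ({{gradle/dependencies.gradle}}) and the property name ({{rocksDB}}) are assumptions about the Kafka build layout, not taken from the ticket; the version bump itself is demonstrated on a sample file:

```shell
# Hedged sketch: the dependency-pin location (gradle/dependencies.gradle)
# and property name (rocksDB) are assumptions about the build layout.
# Step 1, simulated here on a sample file:
printf 'rocksDB: "5.18.3"\n' > /tmp/dependencies.gradle.sample
sed -i 's/5\.18\.3/6.2.2/' /tmp/dependencies.gradle.sample
cat /tmp/dependencies.gradle.sample   # -> rocksDB: "6.2.2"
# Steps 2-3, in a real checkout: rebuild against the new RocksDB API and
# run the test suites, resolving any compilation or test failures:
#   ./gradlew clean build -x test
#   ./gradlew test integrationTest
```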
[jira] [Assigned] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-8897: Assignee: Bruno Cadonna > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Assigned] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-8897: Assignee: (was: Bruno Cadonna) > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8800) Flaky Test SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929078#comment-16929078 ] Bruno Cadonna commented on KAFKA-8800: -- [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/1617/] > Flaky Test > SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > -- > > Key: KAFKA-8800 > URL: https://issues.apache.org/jira/browse/KAFKA-8800 > Project: Kafka > Issue Type: Bug > Components: core, security, unit tests >Affects Versions: 2.4.0 >Reporter: Matthias J. Sax >Assignee: Anastasia Vela >Priority: Critical > Labels: flaky-test > Fix For: 2.4.0 > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6956/testReport/junit/kafka.api/SaslScramSslEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/] > {quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records > before timeout instead of the expected 1 records at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) at > org.scalatest.Assertions.fail$(Assertions.scala:1087) at > org.scalatest.Assertions$.fail(Assertions.scala:1389) at > kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at > kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781) at > kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1312) at > kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1320) at > kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522) > at > kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361){quote} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8800) Flaky Test SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929077#comment-16929077 ] Bruno Cadonna commented on KAFKA-8800: -- [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/7561/] > Flaky Test > SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > -- > > Key: KAFKA-8800 > URL: https://issues.apache.org/jira/browse/KAFKA-8800 > Project: Kafka > Issue Type: Bug > Components: core, security, unit tests >Affects Versions: 2.4.0 >Reporter: Matthias J. Sax >Assignee: Anastasia Vela >Priority: Critical > Labels: flaky-test > Fix For: 2.4.0 > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6956/testReport/junit/kafka.api/SaslScramSslEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/] > {quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records > before timeout instead of the expected 1 records at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) at > org.scalatest.Assertions.fail$(Assertions.scala:1087) at > org.scalatest.Assertions$.fail(Assertions.scala:1389) at > kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at > kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781) at > kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1312) at > kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1320) at > kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522) > at > kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361){quote} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Assigned] (KAFKA-8835) Update documentation for URP changes in KIP-352
[ https://issues.apache.org/jira/browse/KAFKA-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass reassigned KAFKA-8835: -- Assignee: Viktor Somogyi-Vass > Update documentation for URP changes in KIP-352 > --- > > Key: KAFKA-8835 > URL: https://issues.apache.org/jira/browse/KAFKA-8835 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi-Vass >Priority: Major > > This Jira covers any doc changes needed for the changes to URP semantics in > KIP-352: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment]. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8862) Misleading exception message for non-existant partition
[ https://issues.apache.org/jira/browse/KAFKA-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929073#comment-16929073 ] Tom Bentley commented on KAFKA-8862: [~hachikuji] any chance you could review this? Thanks. > Misleading exception message for non-existant partition > --- > > Key: KAFKA-8862 > URL: https://issues.apache.org/jira/browse/KAFKA-8862 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.3.0 >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Major > Labels: patch-available > > https://issues.apache.org/jira/browse/KAFKA-6833 changed the logic of the > {{KafkaProducer.waitOnMetadata}} so that if a partition did not exist it > would wait for it to exist. > It means that if called with an incorrect partition the method will > eventually throw a {{TimeoutException}}, which covers both topic and > partition non-existence cases. > However, the exception message was not changed for the case where > {{metadata.awaitUpdate(version, remainingWaitMs)}} throws a > {{TimeoutException}}. > This results in a confusing exception message. For example, if a producer > tries to send to a non-existent partition of an existing topic, the message is > "Topic %s not present in metadata after %d ms.", whereas a timeout via the other > code path comes with the message > "Partition %d of topic %s with partition count %d is not present in metadata > after %d ms." -- This message was sent by Atlassian Jira (v8.3.2#803003)
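The distinction between the two messages can be sketched in plain Java. The helper below is purely illustrative and does not reflect the actual {{KafkaProducer.waitOnMetadata}} internals; the class and method names are hypothetical:

```java
// Hypothetical helper -- not the actual KafkaProducer internals.
// It only illustrates picking the more specific message when the
// requested partition and the known partition count are available.
public class MetadataTimeoutMessage {
    static String message(String topic, Integer partition,
                          Integer partitionCount, long waitMs) {
        if (partition != null && partitionCount != null) {
            // The caller asked for a concrete partition: say so explicitly.
            return String.format(
                "Partition %d of topic %s with partition count %d "
                + "is not present in metadata after %d ms.",
                partition, topic, partitionCount, waitMs);
        }
        // Otherwise fall back to the topic-level message.
        return String.format(
            "Topic %s not present in metadata after %d ms.", topic, waitMs);
    }

    public static void main(String[] args) {
        System.out.println(message("orders", 7, 3, 60000L));
        System.out.println(message("orders", null, null, 60000L));
    }
}
```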
[jira] [Created] (KAFKA-8906) Metrics for Key-Value, Window, and Session Stores are Inconsistent
Bruno Cadonna created KAFKA-8906: Summary: Metrics for Key-Value, Window, and Session Stores are Inconsistent Key: KAFKA-8906 URL: https://issues.apache.org/jira/browse/KAFKA-8906 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.3.0, 2.2.0 Reporter: Bruno Cadonna Key-value stores record read operations in metrics prefixed by {{get-}}, whereas window and session stores record read operations in metrics prefixed by {{fetch-}}. The documentation does not mention {{fetch}} at all. We should either adapt the documentation or change the metrics prefixed by {{fetch-}} to be prefixed with {{get-}}. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929057#comment-16929057 ] Christian Hagel edited comment on KAFKA-7500 at 9/13/19 9:09 AM: - Dear [~ryannedolan] , thanks also from my side for your efforts pushing MM2. I was trying out MM2 in order to mirror one cluster completely to a downstream one in a distributed connector setup. When running MM2, I saw at the beginning lots of {code:java} WorkerSourceTask{id=mirror-maker-49} Failed to flush, timed out while waiting for producer to flush outstanding 5942 messages WorkerSourceTask{id=mirror-maker-49} Failed to commit offsets{code} error messages. I would guess they are happening when MM2 replicates faster than it can write to the connect.offsets.topic. This topic is configured with 25 partitions, which I thought should provide enough throughput to handle the load. In order to test the robustness of MM2, we also replaced the containers several times during the replication and observed a surprising amount of duplicated messages, which I guess happen because of the lagging offset commit. Any ideas how the setup can be improved so it becomes more robust? was (Author: chridtian.hagel): Dear [~ryannedolan] , thanks also from my side for your efforts pushing MM2. I was trying out MM2 in order to mirror one cluster completely to a downstream one in a distributed connector setup. When running the MM2, I saw at the beginning lot's of ```WorkerSourceTask\{id=mirror-maker-49} Failed to flush, timed out while waiting for producer to flush outstanding 5942 messages``` & ```WorkerSourceTask\{id=mirror-maker-49} Failed to commit offsets``` Error messages. I would guess they are happening when the MM2 replicates faster, than it can write on the connect.offsets.topic. This topic is configured with 25 partitions, which I thought should provide enough throughput to handle the load. 
In order to test the robustness of the MM2, we also replaced the containers several times during the replication and observed a surprising amount of duplicated messages, which I guess happen because of the lagging offset commit. Any ideas, how the setup can be improved so it becomes more robust. > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929057#comment-16929057 ] Christian Hagel commented on KAFKA-7500: Dear [~ryannedolan] , thanks also from my side for your efforts pushing MM2. I was trying out MM2 in order to mirror one cluster completely to a downstream one in a distributed connector setup. When running MM2, I saw at the beginning lots of ```WorkerSourceTask\{id=mirror-maker-49} Failed to flush, timed out while waiting for producer to flush outstanding 5942 messages``` & ```WorkerSourceTask\{id=mirror-maker-49} Failed to commit offsets``` error messages. I would guess they are happening when MM2 replicates faster than it can write to the connect.offsets.topic. This topic is configured with 25 partitions, which I thought should provide enough throughput to handle the load. In order to test the robustness of MM2, we also replaced the containers several times during the replication and observed a surprising amount of duplicated messages, which I guess happen because of the lagging offset commit. Any ideas how the setup can be improved so it becomes more robust? > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Manikumar >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.2#803003)
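One knob commonly tuned for the "Failed to flush, timed out" symptom described above is the Connect worker's offset-flush configuration. The snippet below is a hedged sketch of a worker properties file; the chosen values are illustrative, not recommendations from this thread:

```properties
# Hedged sketch of Connect worker settings that are commonly tuned for
# "Failed to flush, timed out" symptoms; the values below are illustrative.
# Allow more time for outstanding records to flush before an offset commit
# attempt is abandoned (default is 5000 ms).
offset.flush.timeout.ms=60000
# Commit source offsets more often, so fewer records are outstanding per
# flush (default is 60000 ms).
offset.flush.interval.ms=10000
```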
[jira] [Created] (KAFKA-8905) Stream DSL: tasks should take serdes from upstream tasks
Eduard Wirch created KAFKA-8905: --- Summary: Stream DSL: tasks should take serdes from upstream tasks Key: KAFKA-8905 URL: https://issues.apache.org/jira/browse/KAFKA-8905 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.3.0 Reporter: Eduard Wirch {code:java} final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> source = builder.stream( "streams-plaintext-input", Consumed.with(Serdes.String(), Serdes.String()) ); final KTable<String, Long> counts = source .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) .groupBy( (key, value) -> value ) .count(); // need to override value serde to Long type counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final KafkaStreams streams = new KafkaStreams(builder.build(), props);{code} Original code taken from code sample [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java] I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear. 
This application will fail: {code:java} Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) {code} Adjusting this part of the code: {code:java} .groupBy( (key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()) ) {code} will make the application run properly. This explicit serde specification is unnecessary, since the serdes are already known from the upstream source task. Relying on default serdes works in this simple example, but fails for more complex scenarios. Please make the DSL more usable by taking the serde configuration from upstream tasks. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8897) Increase Version of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929025#comment-16929025 ] Rachana Prajapati commented on KAFKA-8897: -- [~cadonna] I am new to this repo and want to start contributing. I see a similar kind of change here: [https://github.com/apache/kafka/pull/6743]. Can I go ahead with making similar changes for the RocksDB upgrade and create a PR? Thanks. > Increase Version of RocksDB > --- > > Key: KAFKA-8897 > URL: https://issues.apache.org/jira/browse/KAFKA-8897 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > A higher version (6+) of RocksDB is needed for some metrics specified in > KIP-471. -- This message was sent by Atlassian Jira (v8.3.2#803003)