[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524058#comment-16524058 ]

Jagadesh Adireddi commented on KAFKA-6964:
------------------------------------------

Got it, [~bbejeck]. Should we skip this feature for now and close the ticket?

> Add ability to print all internal topic names
> ---------------------------------------------
>
>                 Key: KAFKA-6964
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6964
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>              Labels: needs-kip
>
> For security access reasons, some Streams users need to build all internal
> topics before deploying their Streams application. While it's possible to
> get all internal topic names from the {{Topology#describe()}} method, it
> would be nice to have a separate method that prints out only the internal
> topic names, to ease the process.
> I think this change will require a KIP, so I've added the appropriate label.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523810#comment-16523810 ]

Jagadesh Adireddi edited comment on KAFKA-6964 at 6/26/18 2:38 PM:
-------------------------------------------------------------------

[~mjsax], after revisiting the code, I have two possibilities in mind. Can you please help me understand which one is valid?

1) As described in the ticket, we need to print only the internal topics. Does this mean we need to expose the `InternalTopologyBuilder#internalTopicNames` set, which is populated through `InternalTopologyBuilder#addInternalTopic`?

2) As mentioned in your comment, the repartition topics and changelog topics constitute the internal topics. Can we call `InternalTopologyBuilder#topicGroups`, read the `InternalTopicConfig#name` field from both `repartitionTopics` and `stateChangelogTopics`, and print them?
[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523810#comment-16523810 ]

Jagadesh Adireddi commented on KAFKA-6964:
------------------------------------------

[~mjsax], after revisiting the code, I have two possibilities in mind. Can you please help me understand which one is valid?

1) As described in the ticket, we need to print only the internal topics. Does this mean we need to expose the `InternalTopologyBuilder#internalTopicNames` set, which is populated through `InternalTopologyBuilder#addInternalTopic`?

2) As mentioned in your comment, the repartition topics and changelog topics constitute the internal topics. Can we call `InternalTopologyBuilder#topicGroups`, read the `InternalTopicConfig#name` field from both `repartitionTopics` and `stateChangelogTopics`, and print them?
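As background to option 2 above: Kafka Streams names its internal topics by prefixing the `application.id` and suffixing `-repartition` (repartition topics) or `-changelog` (state store changelogs). A minimal, standalone sketch of the kind of filtering such a helper could do is below; the class and method names are hypothetical illustrations, not the API proposed in the ticket, which would require a KIP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: filter a topic listing down to the internal topics of
// one Streams application, using the "-repartition"/"-changelog" naming
// convention. Not actual Kafka code.
public class InternalTopicFilter {

    public static List<String> internalTopics(final String applicationId,
                                              final List<String> allTopics) {
        final List<String> internal = new ArrayList<>();
        for (final String topic : allTopics) {
            // internal topics are prefixed with the application.id...
            final boolean ownedByApp = topic.startsWith(applicationId + "-");
            // ...and suffixed by the internal-topic type
            final boolean internalSuffix =
                topic.endsWith("-repartition") || topic.endsWith("-changelog");
            if (ownedByApp && internalSuffix) {
                internal.add(topic);
            }
        }
        return internal;
    }

    public static void main(final String[] args) {
        final List<String> topics = Arrays.asList(
            "orders",  // a user input topic, not internal
            "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog",
            "my-app-KSTREAM-KEY-SELECT-0000000001-repartition");
        System.out.println(internalTopics("my-app", topics));
    }
}
```

A method added behind `Topology` would instead collect these names from `InternalTopologyBuilder#topicGroups`, but the suffix convention above is what the resulting names look like.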
[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507936#comment-16507936 ]

Jagadesh Adireddi commented on KAFKA-6964:
------------------------------------------

Hi [~bbejeck], just trying to make myself clear: if we want to print the internal topic names, can we use `InternalTopologyBuilder#getSourceTopicNames()` to get all topic names?
[jira] [Assigned] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-6964:
----------------------------------------

    Assignee: Jagadesh Adireddi
[jira] [Assigned] (KAFKA-7015) Enhance RecordCollectorImpl exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-7015:
----------------------------------------

    Assignee: Jagadesh Adireddi

> Enhance RecordCollectorImpl exceptions with more context information
> --------------------------------------------------------------------
>
>                 Key: KAFKA-7015
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7015
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Jagadesh Adireddi
>            Priority: Minor
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and
> only have concrete key/value types on the outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} cannot provide useful error
> messages anymore if a put/get/delete operation fails, as it only handles plain
> bytes.
> In addition, the corresponding calls that record changelog records to record
> collectors will also send byte arrays only, and hence when an error
> happens, the record collector can only display the key but not the
> value, since it is all bytes:
> {code:java}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code}
> The store exceptions got fixed via KAFKA-6538.
> This Jira is to track the fix for RecordCollectorImpl.
[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494742#comment-16494742 ]

Jagadesh Adireddi commented on KAFKA-6906:
------------------------------------------

[~guozhang], thank you for the confirmation. Does the *consumedOffsets* map or the *stateMgr* map in `StreamTask` give the information about the last commit? Or am I looking in the wrong place?

> Kafka Streams does not commit transactions if data is produced via wall-clock
> punctuation
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6906
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6906
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>
> Committing in Kafka Streams happens at regular intervals. However, committing
> only happens if new input records got processed since the last commit (via
> setting the flag `commitOffsetNeeded` within `StreamTask#process()`).
> However, data could also be emitted via wall-clock based punctuation calls.
> Especially if EOS is enabled, this is an issue (maybe also for non-EOS),
> because the currently running transaction is not committed and thus might time
> out, leading to a fatal error.
[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494692#comment-16494692 ]

Jagadesh Adireddi commented on KAFKA-6906:
------------------------------------------

[~mjsax], should we keep using the `commitOffsetNeeded` flag and proceed to fix this bug, and push the point 3 and 4 code changes to another Jira?
[jira] [Comment Edited] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492827#comment-16492827 ]

Jagadesh Adireddi edited comment on KAFKA-6906 at 5/28/18 5:00 PM:
-------------------------------------------------------------------

Hi [~mjsax], sorry for the late response; I am on vacation. I want to re-iterate the solution proposed in the above comments:

1) When EOS is turned on, we need to call `StreamTask#commitOffsets` for both the `pipeInput` and `advanceWallClockTime` cases.
2) We need to delete the `commitOffsetNeeded` field in the `StreamTask` class.
3) For each topic partition in `StreamTask#consumedOffsets`, we need to filter out the topic partitions that have changes since the last commit, and commit the transaction.
4) The partitions with no changes since the last commit need to be sent to the `commitSync` method once KIP-211 is merged.

Can you please clarify whether points 3 & 4 are applicable in both cases, EOS turned on and off? And does the *consumedOffsets* map or the *stateMgr* map in `StreamTask` give the information about the last commit?
[jira] [Comment Edited] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492827#comment-16492827 ]

Jagadesh Adireddi edited comment on KAFKA-6906 at 5/28/18 5:06 PM:
-------------------------------------------------------------------

Hi [~mjsax], sorry for the late response; I am on vacation. I want to re-iterate the solution proposed in the above comments:

1) When EOS is turned on, we need to call `StreamTask#commitOffsets` for both the `pipeInput` and `advanceWallClockTime` cases.
2) We need to delete the `commitOffsetNeeded` field in the `StreamTask` class.
3) For each topic partition in `StreamTask#consumedOffsets`, we need to filter out the topic partitions that have changes since the last commit, and commit the transaction.
4) The partitions with no changes since the last commit need to be sent to the `commitSync` method once KIP-211 is merged.

Can you please clarify whether points 3 & 4 are applicable irrespective of whether EOS is turned on or off? And does the *consumedOffsets* map or the *stateMgr* map in `StreamTask` give the information about the last commit?
[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492827#comment-16492827 ]

Jagadesh Adireddi commented on KAFKA-6906:
------------------------------------------

Hi [~mjsax], sorry for the late response; I am on vacation. I want to re-iterate the solution proposed in the above comments:

1) When EOS is turned on, we need to call `StreamTask#commitOffsets` for both the `pipeInput` and `advanceWallClockTime` cases.
2) We need to delete the `commitOffsetNeeded` field in the `StreamTask` class.
3) For each topic partition in `StreamTask#consumedOffsets`, we need to filter out the topic partitions that have changes since the last commit, and commit the transaction.
4) The partitions with no changes since the last commit need to be sent to the `commitSync` method once KIP-211 is merged.

Do points 3 & 4 apply in both cases, EOS turned on and off? And does the *consumedOffsets* map or the *stateMgr* map in `StreamTask` give the information about the last commit?
[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477158#comment-16477158 ]

Jagadesh Adireddi commented on KAFKA-6906:
------------------------------------------

[~mjsax], identified a fix for this. Once we decide to go ahead with the ticket, I will send the PR.
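The failure mode described in this ticket is easy to model: the periodic commit is gated on a flag that only record processing sets, so output produced from a wall-clock punctuator never triggers a commit. Below is a minimal stdlib-Java model of that gating logic; all names besides `commitOffsetNeeded` are hypothetical and this is not the actual `StreamTask` code.

```java
// Simplified model of the bug: the commit is gated on a flag that is set
// only when an input record is processed, so punctuation-only output leaves
// the transaction open. Hypothetical names, not actual Kafka Streams code.
public class CommitGateModel {
    private boolean commitOffsetNeeded = false; // mirrors StreamTask's flag

    void process() {            // handling an input record
        commitOffsetNeeded = true; // flag is set ONLY here
    }

    void wallClockPunctuate() { // punctuator may forward/produce output...
        // ...but the flag is never set, so nothing marks the task dirty
    }

    /** Returns true if the periodic commit would actually run. */
    boolean maybeCommit() {
        if (!commitOffsetNeeded) {
            return false;       // punctuation-only output: no commit, EOS
                                // transaction can time out
        }
        commitOffsetNeeded = false;
        return true;
    }
}
```

Under EOS, the open transaction eventually times out and the application dies with a fatal error, which is why the fix discussed above removes or bypasses the flag for punctuation-produced data.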
[jira] [Assigned] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-6906:
----------------------------------------

    Assignee: Jagadesh Adireddi
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437360#comment-16437360 ]

Jagadesh Adireddi commented on KAFKA-6677:
------------------------------------------

Hi [~mjsax], fixed `StreamsConfig` to set the default to 5, to allow users to configure a smaller value if they wish, and to throw an exception if they configure a larger value. Can you please review and let me know if any changes are needed?

> Remove EOS producer config max.in.flight.request.per.connection=1
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6677
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6677
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>
> When EOS was introduced in 0.11, it was required to set the producer config
> max.in.flight.requests.per.connection=1 for the idempotent producer.
> This limitation was fixed in the 1.0 release via KAFKA-5494.
> Thus, we should remove this setting in Kafka Streams if EOS gets enabled.
[jira] [Assigned] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-6677:
----------------------------------------

    Assignee: Jagadesh Adireddi
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16433723#comment-16433723 ]

Jagadesh Adireddi commented on KAFKA-6677:
------------------------------------------

Hi [~mjsax], I am thinking of contributing to this issue. As per the ticket description, will removing
`tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);`
from the class `StreamsConfig` resolve the issue, or do I need to dig more to fix this? Kindly provide your inputs.
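The behavior agreed on later in this thread (default 5, smaller values allowed, larger values rejected) follows from the idempotent producer guaranteeing ordering only for up to 5 in-flight requests per connection. A standalone sketch of that validation rule, with a hypothetical helper name that is not the actual `StreamsConfig` code:

```java
// Hypothetical sketch of the EOS validation for
// max.in.flight.requests.per.connection: default to 5, accept user values
// up to 5, reject anything larger. Not the actual StreamsConfig code.
public class EosInFlightValidator {
    static final int MAX_ALLOWED = 5; // idempotence guarantees ordering only up to 5

    /** Returns the effective value, or throws if the user value is too large. */
    public static int effectiveMaxInFlight(final Integer userValue) {
        if (userValue == null) {
            return MAX_ALLOWED;       // EOS default when the user sets nothing
        }
        if (userValue > MAX_ALLOWED) {
            throw new IllegalArgumentException(
                "max.in.flight.requests.per.connection must be <= " + MAX_ALLOWED
                + " for exactly-once processing, got " + userValue);
        }
        return userValue;             // smaller values remain allowed
    }
}
```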
[jira] [Assigned] (KAFKA-6749) TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
[ https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-6749:
----------------------------------------

    Assignee: Jagadesh Adireddi

> TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6749
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6749
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Frederic Arno
>            Assignee: Jagadesh Adireddi
>            Priority: Minor
>              Labels: newbie
>
> Stream processing topologies which are configured to use the {{EXACTLY_ONCE}}
> processing guarantee cannot be tested with the {{TopologyTestDriver}}. Tests
> usually crash with {{java.lang.IllegalStateException: MockProducer hasn't
> been initialized for transactions}} within the second call to
> {{TopologyTestDriver.pipeInput()}}; the first call works fine.
> Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass.
> This is a problem because it is expected that proper processor topologies can
> be successfully tested using {{TopologyTestDriver}}, however
> {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during
> tests. To a developer, this usually means that there is something wrong with
> their processor topologies.
> Kafka developers can reproduce this by adding:
> {code:java}
> put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> {code}
> to line 88 of TopologyTestDriverTest:
> streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
> Originally [reported on the
> ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E].
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427284#comment-16427284 ]

Jagadesh Adireddi commented on KAFKA-5253:
------------------------------------------

Hi [~mjsax], made changes to the `TopologyTestDriver` class to support patterns. Kindly review and let me know if any changes are needed.

> TopologyTestDriver must handle streams created with patterns
> ------------------------------------------------------------
>
>                 Key: KAFKA-5253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Wim Van Leuven
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>              Labels: beginner, needs-kip, newbie
>
> *Context*
> -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
> The unit test of the topology fails because -KStreamTestDriver-
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
> Underneath is a unit test explaining what I understand should happen, but it is
> failing.
> *Note: the example below is outdated, as it used the old KStreamTestDriver.
> The overall test layout can be adopted for TopologyTestDriver though, thus
> we just leave it in the description.*
> Explicitly adding a source topic matching the topic pattern generates an
> exception, as the topology builder explicitly checks overlapping topic names
> and patterns, in any order of adding pattern and topic. So, it is intended
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
>     // -- setup stream pattern
>     final KStream<String, String> source =
>         builder.stream(Pattern.compile("topic-source-\\d"));
>     source.to("topic-sink");
>
>     // -- setup processor to capture results
>     final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
>     source.process(processorSupplier);
>
>     // -- add source to stream data from
>     //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
>
>     // -- build test driver
>     driver = new KStreamTestDriver(builder); // this should be TopologyTestDriver
>     driver.setTime(0L);
>
>     // -- test
>     driver.process("topic-source-3", "A", "aa");
>
>     // -- validate: no exception was thrown
>     assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
>
> *Solution*
> If anybody can help in defining the solution, I can create a pull request
> for this change.
[jira] [Assigned] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-6538:
----------------------------------------

    Assignee: Jagadesh Adireddi

> Enhance ByteStore exceptions with more context information
> ----------------------------------------------------------
>
>                 Key: KAFKA-6538
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6538
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Jagadesh Adireddi
>            Priority: Minor
>              Labels: newbie
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and
> only have concrete key/value types on the outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} cannot provide useful error
> messages anymore if a put/get/delete operation fails, as it only handles plain
> bytes.
> In addition, the corresponding calls that record changelog records to record
> collectors will also send byte arrays only, and hence when an error
> happens, the record collector can only display the key but not the
> value, since it is all bytes:
> {code}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code}
> Therefore, we should enhance the exceptions thrown from {{RocksDBStore}} with
> corresponding information about which key/value the operation failed on, in the
> wrapping stores (KeyValueStore, WindowedStore, and SessionStore).
> Cf. https://github.com/apache/kafka/pull/4518, which cleans up {{RocksDBStore}}
> exceptions.
[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420657#comment-16420657 ] Jagadesh Adireddi commented on KAFKA-6538: -- Hi [~mjsax], To work on this ticket, I would like to get clarification on few things. In `RocksDBStore` can we enhance exception by doing `Bytes.wrap(byte[])` on Key/Value and throw exception. And for Class RecordCollectorImpl, Error message stated in ticket as value [] timestamp YYY) to topic TTT .But i see *value* **type is already in *Bytes*. I guess it already displaying readable value and no changes required. Please do correct me, if i am missing something. > Enhance ByteStore exceptions with more context information > -- > > Key: KAFKA-6538 > URL: https://issues.apache.org/jira/browse/KAFKA-6538 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Priority: Minor > Labels: newbie > > In KIP-182 we refactored all stores to by plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the most inner {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails as it only handles plain > bytes. > In addition, the corresponding calls to record changelog records to record > collectors will also be sending byte arrays only, and hence when there is an > error happening, the record collector can only display the key but not the > value since it is all bytes: > {code} > [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - > task [2_2] Error sending record (key {"eventId":XXX,"version":123} > value [] timestamp YYY) to topic TTT > due to ... > {code} > Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with > corresponding information for which key/value the operation failed in the > wrapping stores (KeyValueStore, WindowedStored, and SessionStore). 
> Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} > exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
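The general technique discussed above, wrapping a low-level byte-store failure with a readable rendering of the offending key, can be sketched as below. This is a minimal illustration under stated assumptions, not the actual RocksDBStore code: the class and method names (`ByteStoreErrorSketch`, `describe`, `put`) are made up for this sketch, and the low-level failure is simulated.

```java
import java.nio.charset.StandardCharsets;

public class ByteStoreErrorSketch {

    // Hypothetical helper: render raw key bytes in a readable form for error messages.
    static String describe(byte[] bytes) {
        return bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8);
    }

    // Wrap a low-level failure so the rethrown exception names the offending key.
    static RuntimeException enriched(byte[] key, RuntimeException cause) {
        return new RuntimeException(
                "Error while executing put for key " + describe(key) + " in store", cause);
    }

    static void put(byte[] key, byte[] value) {
        try {
            // Stand-in for the real RocksDB write; simulated to fail here.
            throw new RuntimeException("simulated RocksDB failure");
        } catch (RuntimeException fatal) {
            throw enriched(key, fatal);
        }
    }
}
```

The point of the wrapper layer is that only it needs to know how to render the bytes; the innermost store keeps handling plain `byte[]`.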
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417896#comment-16417896 ] Jagadesh Adireddi commented on KAFKA-5253: -- Hi [~mjsax], I am new to contributing to Kafka. I made code changes for the above issue and it works as expected. I have a few questions before I submit a pull request. I made my changes inside the *private* method *KStreamTestDriver#sourceNodeByTopicName*. I haven't modified any method signatures; I just embedded the code below: {code:java} final Set<String> sourceTopics = topology.sourceTopics(); for (final String eachSourceTopic : sourceTopics) { if (Pattern.compile(eachSourceTopic).matcher(topicName).matches()) { return topology.source(eachSourceTopic); } } {code} Do I still need to submit a KIP for this change, since I am not touching any public methods? > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Assignee: Jagadesh Adireddi >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern generates an > exception, as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. 
> {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStream<String, String> source = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier<String, String> processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change. 
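The lookup embedded in the comment above can be illustrated standalone. This is a hedged sketch, not the driver's actual code: `sourceFor` is a hypothetical name, and `sourceTopics` stands in for the driver's registered sources, where each entry may be a literal topic name or a pattern string registered via `builder.stream(Pattern)`.

```java
import java.util.Set;
import java.util.regex.Pattern;

public class PatternSourceLookupSketch {

    // Return the registered source entry (a literal topic name or a pattern
    // string) that matches the given topic name, or null if none matches.
    static String sourceFor(Set<String> sourceTopics, String topicName) {
        for (final String sourceTopic : sourceTopics) {
            if (Pattern.compile(sourceTopic).matcher(topicName).matches()) {
                return sourceTopic;
            }
        }
        return null;
    }
}
```

Note that a literal topic name such as "topic-sink" compiles to a regex that matches itself, which is why a single regex-based loop also covers plain names (modulo names containing regex metacharacters).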
[jira] [Assigned] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jagadesh Adireddi reassigned KAFKA-5253: Assignee: Jagadesh Adireddi > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Assignee: Jagadesh Adireddi >Priority: Major > Labels: beginner, needs-kip, newbie
[jira] [Assigned] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jagadesh Adireddi reassigned KAFKA-6685: Assignee: Jagadesh Adireddi > Connect deserialization log message should distinguish key from value > - > > Key: KAFKA-6685 > URL: https://issues.apache.org/jira/browse/KAFKA-6685 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yeva Byzek >Assignee: Jagadesh Adireddi >Priority: Minor > Labels: newbie > > Connect was configured for Avro key and value but data had String key and > Avro value. The resulting error message was misleading because it didn't > distinguish key from value, and so I was chasing problems with the value > instead of the key. > tl;dr Connect should at least tell you whether the problem is with > deserializing the key or value of a record > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406081#comment-16406081 ] Jagadesh Adireddi edited comment on KAFKA-6685 at 3/22/18 12:28 PM: Hi [~rhauch], I would like to pick this Jira up and wanted some advice on the error messages. I am thinking that if the exception occurs during: a) keyAndSchema conversion, the error is logged as: Failed to convert Record Key to Kafka Connect format. b) valueAndSchema conversion, the error is logged as: Failed to convert Record Value to Kafka Connect format. And the main ConnectException is thrown as: Exiting WorkerSinkTask due to unconverted message exception. Any help on the hint message to fix the issue would be great. was (Author: adireddijagad...@gmail.com): Hi [~rhauch], I would like to pick this Jira up. Wanted some advice on the error message .I am thinking , if the exception occurred during: a) keyAndSchema conversion, then error msg logged as : Failed to convert message Key And Schema to Kafka Connect format. b) valueAndSchema conversion, then error msg logged as : Failed to convert message Value And Schema to Kafka Connect format. And Main ConnectException thrown as : Exiting WorkerSinkTask due to unconvertedmessage exception. Any help on the hint message to fix the issue would be great? > Connect deserialization log message should distinguish key from value > - > > Key: KAFKA-6685 > URL: https://issues.apache.org/jira/browse/KAFKA-6685 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yeva Byzek >Priority: Minor > Labels: newbie > > Connect was configured for Avro key and value but data had String key and > Avro value. The resulting error message was misleading because it didn't > distinguish key from value, and so I was chasing problems with the value > instead of the key. 
> tl;dr Connect should at least tell you whether the problem is with > deserializing the key or value of a record
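The key-versus-value distinction proposed in the comment above can be sketched as follows. This is an illustrative assumption, not the actual WorkerSinkTask code: the class, enum, and messages are made up for the sketch, and the conversion step is simulated.

```java
import java.nio.charset.StandardCharsets;

public class ConversionErrorSketch {

    enum Part { KEY, VALUE }

    // Hypothetical conversion step: tag the failure with whether the record's
    // key or its value was being converted, mirroring the a)/b) branches above.
    static String convert(Part part, byte[] data) {
        try {
            // Stand-in for the real converter call, which may throw.
            if (data == null) {
                throw new IllegalArgumentException("cannot deserialize null payload");
            }
            return new String(data, StandardCharsets.UTF_8);
        } catch (RuntimeException cause) {
            String which = (part == Part.KEY) ? "key" : "value";
            throw new RuntimeException(
                    "Failed to convert record " + which + " to Kafka Connect format", cause);
        }
    }
}
```

With the part named in the message, a user chasing the error described in the ticket would immediately know whether the key or the value converter was misconfigured.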
[jira] [Commented] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407436#comment-16407436 ] Jagadesh Adireddi commented on KAFKA-6685: -- Hi, I am unable to assign this issue to myself. I have made the code changes and am waiting for clarity on the error messages. Could you please assign this ticket to me? > Connect deserialization log message should distinguish key from value > - > > Key: KAFKA-6685 > URL: https://issues.apache.org/jira/browse/KAFKA-6685 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yeva Byzek >Priority: Minor > Labels: newbie > > Connect was configured for Avro key and value but data had String key and > Avro value. The resulting error message was misleading because it didn't > distinguish key from value, and so I was chasing problems with the value > instead of the key. > tl;dr Connect should at least tell you whether the problem is with > deserializing the key or value of a record
[jira] [Comment Edited] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406081#comment-16406081 ] Jagadesh Adireddi edited comment on KAFKA-6685 at 3/20/18 10:25 AM: Hi [~rhauch], I would like to pick this Jira up and wanted some advice on the error messages. I am thinking that if the exception occurs during: a) keyAndSchema conversion, the error is logged as: Failed to convert message Key And Schema to Kafka Connect format. b) valueAndSchema conversion, the error is logged as: Failed to convert message Value And Schema to Kafka Connect format. And the main ConnectException is thrown as: Exiting WorkerSinkTask due to unconverted message exception. Any help on the hint message to fix the issue would be great. was (Author: adireddijagad...@gmail.com): Hi [~rhauch], I would like to pick this Jira up. Wanted some advice on the error message . I am thinking , if the exception occurred during: a) keyAndSchema conversion, then error msg logged as : Failed to convert message Key And Schema to Kafka Connect format. b) valueAndSchema conversion, then error msg logged as : Failed to convert message Value And Schema to Kafka Connect format. And Main ConnectException thrown as : Exiting WorkerSinkTask due to unconvertedmessage exception. Any help on the hint message to fix the issue would be great? > Connect deserialization log message should distinguish key from value > - > > Key: KAFKA-6685 > URL: https://issues.apache.org/jira/browse/KAFKA-6685 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yeva Byzek >Priority: Minor > Labels: newbie > > Connect was configured for Avro key and value but data had String key and > Avro value. The resulting error message was misleading because it didn't > distinguish key from value, and so I was chasing problems with the value > instead of the key. 
> tl;dr Connect should at least tell you whether the problem is with > deserializing the key or value of a record
[jira] [Commented] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes
[ https://issues.apache.org/jira/browse/KAFKA-6629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405191#comment-16405191 ] Jagadesh Adireddi commented on KAFKA-6629: -- Hi Guozhang, thank you for the information; I have a clear picture now. > SegmentedCacheFunctionTest does not cover session window serdes > --- > > Key: KAFKA-6629 > URL: https://issues.apache.org/jira/browse/KAFKA-6629 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie, unit-test > > The SegmentedCacheFunctionTest.java only covers time window serdes, but not > session window serdes. We should fill in this coverage gap.
[jira] [Commented] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes
[ https://issues.apache.org/jira/browse/KAFKA-6629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404947#comment-16404947 ] Jagadesh Adireddi commented on KAFKA-6629: -- Hi [~guozhang], I am new to contributing to Kafka and am looking forward to contributing to this ticket. I don't see any code for time window serdes in the *SegmentedCacheFunctionTest.java* file. Could you please confirm? > SegmentedCacheFunctionTest does not cover session window serdes > --- > > Key: KAFKA-6629 > URL: https://issues.apache.org/jira/browse/KAFKA-6629 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie, unit-test > > The SegmentedCacheFunctionTest.java only covers time window serdes, but not > session window serdes. We should fill in this coverage gap.