[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations
    [ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839941#comment-15839941 ]

Ismael Juma commented on KAFKA-3429:
------------------------------------

Should we move this to a future release version (or none) given that the code freeze for 0.10.2 is imminent?

> Remove Serdes needed for repartitioning in KTable stateful operations
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3429
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3429
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: api, newbie++
>             Fix For: 0.10.2.0
>
>
> Currently, in KTable aggregate operations where a repartition may be
> needed because the aggregation key may differ from the original primary
> key, we require users to provide serdes (defaulting to the configured ones)
> for reading from and writing to the internally created repartition topic.
> However, these are not necessary, since every KTable instance is generated
> either directly from a topic:
> {code}table = builder.table(...){code}
> or from an aggregation operation:
> {code}table = stream.aggregate(...){code}
> In both cases serdes have already been provided for materializing the data,
> and hence the same serdes can be reused when the resulting KTable is
> involved in future aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in
> table1.aggregate, since it could always reuse the "serde" from
> stream.aggregateByKey, which is used to materialize the table1 object.
> In order to get rid of it, implementation-wise we need to carry the serde
> information along with the KTableImpl instance so that it can be reused in
> a future operation that requires repartitioning.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
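The last paragraph of the issue description (carrying the serde information along with the table instance so a later operation can reuse it) could be sketched roughly as follows. This is a minimal, self-contained illustration, not the Kafka Streams implementation: `SerdeCarryingSketch`, its nested `Serde` and `Table` types, and the `aggregate` and `roundTripValue` methods are hypothetical stand-ins, and the sketch assumes the key type does not change between the two operations.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical model for illustration; none of these types are Kafka classes.
final class SerdeCarryingSketch {

    /** Minimal stand-in for a serializer/deserializer pair. */
    interface Serde<T> {
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    /** A UTF-8 string serde used only for the demo. */
    static final Serde<String> STRING_SERDE = new Serde<String>() {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    /** A table handle that remembers the serdes it was materialized with. */
    static final class Table<K, V> {
        final Serde<K> keySerde;
        final Serde<V> valueSerde;

        Table(Serde<K> keySerde, Serde<V> valueSerde) {
            this.keySerde = Objects.requireNonNull(keySerde, "keySerde");
            this.valueSerde = Objects.requireNonNull(valueSerde, "valueSerde");
        }

        /**
         * Downstream aggregation that asks only for the serde of the new
         * aggregate type. Assumption: the key type is unchanged, so the
         * carried key serde still applies.
         */
        <T> Table<K, T> aggregate(Serde<T> aggregateSerde) {
            // The carried keySerde and valueSerde would serve the repartition
            // topic here; no "originalSerde" argument is requested.
            return new Table<>(keySerde, aggregateSerde);
        }

        /** Round-trips a value through the carried value serde. */
        V roundTripValue(V value) {
            return valueSerde.deserialize(valueSerde.serialize(value));
        }
    }

    public static void main(String[] args) {
        Table<String, String> table1 = new Table<>(STRING_SERDE, STRING_SERDE);
        Table<String, String> table2 = table1.aggregate(STRING_SERDE);
        System.out.println(table2.keySerde == table1.keySerde);
        System.out.println(table1.roundTripValue("hello"));
    }
}
```

The design choice here is that the serdes travel with the table handle, so the caller of `aggregate` supplies only what the upstream operation could not have known: the serde for the new aggregate type.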
[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations
    [ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359231#comment-15359231 ]

Guozhang Wang commented on KAFKA-3429:
--------------------------------------

I think this ticket should be redefined a bit for KTable.groupBy. Today, if the user does not provide a serde when calling this operator, the default serdes from the configs are used. But in the special case where the KTable was created directly from an aggregate operator, we can use the key serde specified in the previous aggregate operator instead of the default from the configs, and if there is no {{mapValues}} in between, we can use the value serde from the previous aggregate operator as well.

This now seems to be quite a niche optimization.

> Remove Serdes needed for repartitioning in KTable stateful operations
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3429
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3429
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>              Labels: api, newbie++
>             Fix For: 0.10.1.0
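The fallback order in Guozhang's comment on KTable.groupBy can be written down as a small decision function. The sketch below is only an illustration of that rule under stated assumptions: `GroupBySerdeResolution`, the `Source` enum, and the boolean parameters are hypothetical names, and treating a user-provided serde as taking precedence is implied by the comment rather than stated outright.

```java
// Hypothetical illustration of the serde fallback order; not Kafka code.
final class GroupBySerdeResolution {

    /** Where the serde used for the repartition topic comes from. */
    enum Source { USER_PROVIDED, PREVIOUS_AGGREGATE, DEFAULT_CONFIG }

    /** Key serde: user-provided first, then the previous aggregate, then the config default. */
    static Source resolveKeySerde(boolean userProvided, boolean parentIsAggregate) {
        if (userProvided) {
            return Source.USER_PROVIDED;
        }
        if (parentIsAggregate) {
            return Source.PREVIOUS_AGGREGATE;
        }
        return Source.DEFAULT_CONFIG;
    }

    /**
     * Value serde: same order, except that a mapValues between the aggregate
     * and the groupBy may have changed the value type, so the previous
     * aggregate's value serde is no longer trusted.
     */
    static Source resolveValueSerde(boolean userProvided,
                                    boolean parentIsAggregate,
                                    boolean mapValuesInBetween) {
        if (userProvided) {
            return Source.USER_PROVIDED;
        }
        if (parentIsAggregate && !mapValuesInBetween) {
            return Source.PREVIOUS_AGGREGATE;
        }
        return Source.DEFAULT_CONFIG;
    }

    public static void main(String[] args) {
        System.out.println(resolveKeySerde(false, true));
        System.out.println(resolveValueSerde(false, true, true));
    }
}
```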
[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations
    [ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245384#comment-15245384 ]

Matthias J. Sax commented on KAFKA-3429:
----------------------------------------

Ok. This does not contradict my thoughts. In general, we cannot reuse it, as the types might change. For special cases in which types are equal we can.
[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations
    [ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245142#comment-15245142 ]

Guozhang Wang commented on KAFKA-3429:
--------------------------------------

The {{originalSerde}} is used to materialize the KTable that is going to be repartitioned on an internal topic. The reuse applies in cases where this KTable is generated from a source topic or is created from an aggregation, where a {{serde}} is specified in that previous operation.

To take a concrete example:

{{KTable aggregate(Initializer initializer, Aggregator adder, Aggregator substractor, KeyValueMapper selector, Serde keySerde, Serde valueSerde, Serde aggValueSerde, String name);}}

Here the {{keySerde}} and {{valueSerde}} are used for materializing the selected key-value pairs, and {{keySerde}} and {{aggValueSerde}} are used for materializing the aggregated key-values. In this general case all three serdes are needed, since they have not been provided before.

In some special cases, however, such as:

{{KTable count(KeyValueMapper selector, Serde keySerde, Serde valueSerde, String name);}}

we can save the {{valueSerde}}, as it may already have been provided.
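Guozhang's breakdown of which serde serves which internal materialization can be made concrete with a small sketch. Everything below is hypothetical and independent of Kafka: the tag-only `Serde` interface, the `serde` factory, and the `aggregatePlan` and `countPlan` methods exist only for this illustration, and storing count results as `Long` is an assumption of the sketch.

```java
// Hypothetical illustration; the Serde here is a name tag, not a Kafka serde.
final class RepartitionSerdeSketch {

    /** Tag-only stand-in that just names the type it would (de)serialize. */
    interface Serde<T> {
        String name();
    }

    static <T> Serde<T> serde(String name) {
        return () -> name;
    }

    /**
     * General aggregate: the selector may change both key and value types,
     * so all three serdes have to come from the caller.
     */
    static <K1, V1, T> String aggregatePlan(Serde<K1> keySerde,
                                            Serde<V1> valueSerde,
                                            Serde<T> aggValueSerde) {
        return "repartition=" + keySerde.name() + "/" + valueSerde.name()
                + ", store=" + keySerde.name() + "/" + aggValueSerde.name();
    }

    /**
     * Count: the selector only picks a new key, so the value serde carried by
     * the upstream table can be reused. Assumption: counts are stored as Long.
     */
    static <K1, V> String countPlan(Serde<K1> keySerde, Serde<V> carriedValueSerde) {
        return "repartition=" + keySerde.name() + "/" + carriedValueSerde.name()
                + ", store=" + keySerde.name() + "/Long";
    }

    public static void main(String[] args) {
        System.out.println(aggregatePlan(serde("String"), serde("Integer"), serde("Long")));
        System.out.println(countPlan(serde("String"), serde("Integer")));
    }
}
```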
[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations
    [ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15244771#comment-15244771 ]

Matthias J. Sax commented on KAFKA-3429:
----------------------------------------

[~guozhang] I am a little confused here: {{originalSerde}} is used to materialize {{table1}} after it has been repartitioned using {{selector}}. Thus, the key and value might have different types now, and it is not guaranteed that {{serde}} can be reused. Or am I missing something here?

(One more question: is {{serde}} for the key, for the value, or does it refer to both?)