[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2017-01-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839941#comment-15839941
 ] 

Ismael Juma commented on KAFKA-3429:


Should we move this to a future release version (or none) given that the code 
freeze for 0.10.2 is imminent?

> Remove Serdes needed for repartitioning in KTable stateful operations
> -
>
> Key: KAFKA-3429
> URL: https://issues.apache.org/jira/browse/KAFKA-3429
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Guozhang Wang
> Labels: api, newbie++
> Fix For: 0.10.2.0
>
>
> Currently, in KTable aggregate operations where a repartition may be needed 
> because the aggregation key may differ from the original primary key, we 
> require users to provide serdes (defaulting to the configured ones) for 
> reading from and writing to the internally created repartition topic. 
> However, these are not necessary, since every KTable instance is generated 
> either from a topic directly:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> In both cases serdes are already provided for materializing the data, and 
> hence the same serdes can be reused when the resulting KTable is involved in 
> later aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, 
> aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in 
> table1.aggregate since it could always reuse the "serde" from 
> stream.aggregateByKey, which is used to materialize the table1 object.
> To get rid of it, implementation-wise we need to carry the serde 
> information along with the KTableImpl instance so that it can be reused in a 
> later operation that requires repartitioning.
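
The idea of carrying serde information along with the table instance can be sketched in plain Java. This is only an illustration under stated assumptions: `Serde`, `TableHandle`, and `repartitionValueSerde` are hypothetical stand-ins invented for the sketch, not the Kafka Streams classes or the actual KTableImpl code, and the fallback is only valid when the types written to the repartition topic match the types the table was materialized with.

```java
import java.nio.charset.StandardCharsets;

public class SerdeCarryingSketch {

    /** Hypothetical stand-in for a serde; this is not the Kafka interface. */
    public interface Serde<T> {
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    /** A simple UTF-8 string serde used for the demonstration. */
    public static final Serde<String> STRING_SERDE = new Serde<String>() {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    /** Hypothetical table handle that remembers the serdes used to materialize it. */
    public static final class TableHandle<K, V> {
        private final Serde<K> keySerde;
        private final Serde<V> valueSerde;

        public TableHandle(Serde<K> keySerde, Serde<V> valueSerde) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        public Serde<K> keySerde() {
            return keySerde;
        }

        /** Returns the explicit serde if one is given, otherwise the carried one. */
        public Serde<V> repartitionValueSerde(Serde<V> explicit) {
            return explicit != null ? explicit : valueSerde;
        }
    }

    public static void main(String[] args) {
        // table1 was materialized with STRING_SERDE for both key and value.
        TableHandle<String, String> table1 = new TableHandle<>(STRING_SERDE, STRING_SERDE);

        // A later operation passes no serde, so the carried one is reused.
        Serde<String> serde = table1.repartitionValueSerde(null);
        System.out.println(serde.deserialize(serde.serialize("hello"))); // prints "hello"
    }
}
```

Keeping the serdes on the handle means the caller of a later operation only has to supply a serde when the data written to the repartition topic has a different type.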



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359231#comment-15359231
 ] 

Guozhang Wang commented on KAFKA-3429:
--

I think this ticket should be re-defined a bit for KTable.groupBy: 

Today, if the user does not provide a serde when calling this operator, the 
default serdes from the configs are used. But in the special case where the 
KTable was created directly from an aggregate operator, we can use the key 
serde specified in the previous aggregate operator instead of the default from 
the configs, and if there is no {{mapValues}} in between, we can use the value 
serde from the previous aggregate operator as well.

And it now seems to be quite a niche optimization.
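
The fallback order described in this comment could look roughly like the following sketch. All names here (`Upstream`, `resolveKeySerde`, `resolveValueSerde`) are hypothetical and exist only for illustration, and serdes are represented by plain labels rather than real Kafka objects. The sketch also assumes the grouping leaves the key and value types unchanged; if the types change, an upstream serde cannot be reused.

```java
public class GroupBySerdeResolution {

    /** Hypothetical record of what the upstream aggregate operator declared. */
    public static final class Upstream<S> {
        final S keySerde;
        final S valueSerde;
        final boolean mapValuesInBetween;

        public Upstream(S keySerde, S valueSerde, boolean mapValuesInBetween) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
            this.mapValuesInBetween = mapValuesInBetween;
        }
    }

    /** Explicit serde first, then the upstream aggregate's key serde, then the config default. */
    public static <S> S resolveKeySerde(S explicit, Upstream<S> upstream, S configDefault) {
        if (explicit != null) {
            return explicit;
        }
        if (upstream != null && upstream.keySerde != null) {
            return upstream.keySerde;
        }
        return configDefault;
    }

    /** Same order as for keys, but the upstream value serde is skipped after a mapValues. */
    public static <S> S resolveValueSerde(S explicit, Upstream<S> upstream, S configDefault) {
        if (explicit != null) {
            return explicit;
        }
        if (upstream != null && !upstream.mapValuesInBetween && upstream.valueSerde != null) {
            return upstream.valueSerde;
        }
        return configDefault;
    }

    public static void main(String[] args) {
        Upstream<String> direct = new Upstream<>("aggKeySerde", "aggValueSerde", false);
        Upstream<String> mapped = new Upstream<>("aggKeySerde", "aggValueSerde", true);

        System.out.println(resolveKeySerde(null, direct, "defaultSerde"));   // prints "aggKeySerde"
        System.out.println(resolveValueSerde(null, direct, "defaultSerde")); // prints "aggValueSerde"
        System.out.println(resolveValueSerde(null, mapped, "defaultSerde")); // prints "defaultSerde"
    }
}
```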



> Remove Serdes needed for repartitioning in KTable stateful operations
> -
>
> Key: KAFKA-3429
> URL: https://issues.apache.org/jira/browse/KAFKA-3429
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Matthias J. Sax
> Labels: api, newbie++
> Fix For: 0.10.1.0


[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2016-04-18 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245384#comment-15245384
 ] 

Matthias J. Sax commented on KAFKA-3429:


Ok. This does not contradict my thoughts. In general, we cannot reuse it, as 
the types might change. For special cases in which types are equal we can.
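
One way to express "reuse only when the types are equal" is to keep a type token next to the carried serde and check it before reuse. The sketch below is an assumption-laden illustration: `TypedSerde` and `reuseFor` are hypothetical names, the serde itself is reduced to a label, and comparing `Class` tokens cannot distinguish generic types such as lists with different element types.

```java
import java.util.Optional;

public class TypeGuardedReuse {

    /** Hypothetical pairing of a serde label with the type it handles. */
    public static final class TypedSerde<T> {
        public final Class<T> type;
        public final String name; // label standing in for a real serde object

        public TypedSerde(Class<T> type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    /** Returns the carried serde only if it handles exactly the required type. */
    @SuppressWarnings("unchecked")
    public static <T> Optional<TypedSerde<T>> reuseFor(TypedSerde<?> carried, Class<T> requiredType) {
        if (carried != null && carried.type.equals(requiredType)) {
            // Safe: the type token was just checked against requiredType.
            return Optional.of((TypedSerde<T>) carried);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        TypedSerde<String> carried = new TypedSerde<>(String.class, "stringSerde");

        System.out.println(reuseFor(carried, String.class).isPresent()); // prints "true"
        System.out.println(reuseFor(carried, Long.class).isPresent());   // prints "false"
    }
}
```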



[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2016-04-17 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245142#comment-15245142
 ] 

Guozhang Wang commented on KAFKA-3429:
--

The {{originalSerde}} is used to materialize the KTable that is going to be 
repartitioned on an internal topic. It can be saved in cases where this KTable 
is generated from a source topic or created from an aggregation, since a 
{{serde}} is already specified in that previous operation. To take a concrete 
example:

{{<K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
                                    Aggregator<K1, V1, T> adder,
                                    Aggregator<K1, V1, T> substractor,
                                    KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                    Serde<K1> keySerde,
                                    Serde<V1> valueSerde,
                                    Serde<T> aggValueSerde,
                                    String name);}}

Here the {{keySerde}} and {{valueSerde}} are used for materializing the 
selected {{<K1, V1>}} pairs, and {{keySerde}} and {{aggValueSerde}} are used 
for materializing the aggregated key-values {{<K1, T>}}. In this general case 
all three serdes are needed since none of them has been provided before. In 
some special cases, however, such as:

{{<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
                            Serde<K1> keySerde,
                            Serde<V> valueSerde,
                            String name);}}

We can save the {{valueSerde}} as it may be already provided.



[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2016-04-17 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15244771#comment-15244771
 ] 

Matthias J. Sax commented on KAFKA-3429:


[~guozhang] I am a little confused here: {{originalSerde}} is used to 
materialize {{table1}} after it got repartitioned using {{selector}}. Thus, key 
and value might have different types now, and it is not ensured that {{serde}} 
can be reused. Or am I missing something here? (One more question: is 
{{serde}} for the key or for the value, or does it refer to both?)
