[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2017-01-31 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15847802#comment-15847802
 ] 

Matthias J. Sax commented on KAFKA-3543:


[~gfodor] I close this as duplicate. Nevertheless one question: I don't 
understand your comment in the JIRA description about "just calling forward() 
myself on the context and actually emitting dummy values which are filtered out 
downstream" ? Actually, using {{context.forward()}} is absolutely fine and 
efficient.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>  Labels: api
> Fix For: 0.10.3.0
>
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.
> -
> It is worth considering adding a new flatTransofrm function as 
> {code}
>  KStream transform(TransformerSupplier Iterable>> transformerSupplier, String... stateStoreNames)
> {code}
> which is essentially the same as
> {code} transform().flatMap() {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236434#comment-15236434
 ] 

Guozhang Wang commented on KAFKA-3543:
--

Yeah, we should definitely improve our documentation regarding serdes. In 
general the documentation website is not very well formatted: it is just a 
single VERY big page, and hence very hard for people to search and browse. We 
are thinking about reformatting the Apache doc website and add more contents 
about different modules like Connect and Streams.

I think it is actually doable to add a "flatTransform" function along with 
"transform" in KStreams. I will update this ticket to reflect this feature 
request.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236427#comment-15236427
 ] 

Greg Fodor commented on KAFKA-3543:
---

Just something akin to a flatTransform where the output of the transformer is 
assumed to have a value of some kind of enumerable. Probably would be pretty 
convoluted since it would impact a lot of method signatures.

I guess at a higher level, I wonder if there's any way to make your points on 
stateless vs non-stateless operators requiring Serdes more intuitive. It might 
back out to just docs.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236394#comment-15236394
 ] 

Guozhang Wang commented on KAFKA-3543:
--

What kind of helper methods do you have in mind? Interested to know.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236375#comment-15236375
 ] 

Greg Fodor commented on KAFKA-3543:
---

That makes sense, thanks! It may be useful to provide a helper method here of 
some kind, but the idiom above seems reasonable. Happy to consider this issue 
closed either way.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236355#comment-15236355
 ] 

Guozhang Wang commented on KAFKA-3543:
--

In Kafka Streams we do not materialize in each operator, but only necessary in 
those stateful operators (like joins and aggregates); for stateless operators 
like map / filter / flatMap you do not need to provide the serdes. More 
concretely, let's say you have the following topology:

{code}
stream1.map(...) // no serde needed
 .map(...) // no serde needed
 .join(stream2)  // now you need to provide the serde for data 
types returned from the second map
{code}

You only need to provide the serde in the join operator, and the returned value 
from the first map will be passed directly to the second map, and hence not 
need to materialized anywhere.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236339#comment-15236339
 ] 

Greg Fodor commented on KAFKA-3543:
---

Ah interesting, my assumption was that the items emitted from the transform 
need to be serializable. Is the pattern you describe above basically avoiding 
this because the flatMap unrolls the Iterables into serializable values?

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236313#comment-15236313
 ] 

Guozhang Wang commented on KAFKA-3543:
--

[~gfodor] We have seen such use cases in other places, and there is a common 
pattern of using:

{code}stream.transform(...).flatMap((k, v) -> v){code} where transform does the 
stateful computation and returns a >. Would that work 
for you?

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)