[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-06-13 Thread Bobby Calderwood (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048626#comment-16048626
 ] 

Bobby Calderwood commented on KAFKA-3455:
-

Hi Michal Borowiecki,

Sorry for the late reply.  I am trying to implement both {{Transformer}} and 
{{Processor}} on the same class.  For some interesting use-cases in a Clojure 
compatibility library for Kafka Streams that I'm writing, I'd like to hook a 
single piece of logic into both the high-level (via {{Transformer}}) and 
low-level (via {{Processor}}) APIs.  However, when implementing both 
interfaces, I encounter the following error due to differing signatures of the 
respective {{punctuate(long)}} methods:

{code:none}
/TransducerProcessor.java
Error:Error:line (44)java: method punctuate(long) is already defined in 
class kafka_streams_clojure.TransducerProcessor
Error:Error:line (10)java: kafka_streams_clojure.TransducerProcessor is not 
abstract and does not override abstract method punctuate(long) in 
org.apache.kafka.streams.kstream.Transformer
Error:Error:line (40)java: punctuate(long) in 
kafka_streams_clojure.TransducerProcessor cannot implement punctuate(long) in 
org.apache.kafka.streams.kstream.Transformer
  return type void is not compatible with java.lang.Object
{code}

I believe you're right about AutoCloseable, it's been a while since I've been 
in Java-land :-)

Yes, I believe that once the incompatible punctuate methods are deprecated and 
then removed, my issue would be resolved.

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-05-15 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010530#comment-16010530
 ] 

Michal Borowiecki commented on KAFKA-3455:
--

Hi [~bobbycalderwood],

Can you please describe your use-case, where it would be useful to re-use 
Processor/Transformer implementations?

As to Transformer.punctuate return value having to be null, the javadoc was in 
error but has been fixed on trunk (to be released).

Changing the method signature of Transformer.punctuate would be a 
backward-incompatible change, however, 
[KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics]
 will deprecated both methods in favour of a functional interface passed to 
ProcessorContext.schedule(), so it's a small step in the direction you're 
suggesting.

I think AutoCloseable is a false friend in this case. The intention behind 
AutoCloseable is for objects created in a try-with-resources statement to be 
closed when execution exists that statement. However, the Processor is being 
created when you are *defining* the topology and must not be closed from that 
same block of code, since it's used as long as the topology is actually 
*running*, which is happening in different threads.

As to init() and close() I think it would make sense to have them pulled out, 
however, again due to backwards-compatibility it's not as simple as it sounds.
Fortunately, once Java 7 compatibility is dropped, it will be possible to 
change their definition to a default method with an empty body. I think that 
would be backwards-compatible. That would leave only one abstract method for 
Processor and Transformer, process() and transform(), respectively. Since these 
are actually *different* from each other, I'd say that then there'd be no 
repetition.

Would that help your use-cases?

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-05-13 Thread Bobby Calderwood (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009389#comment-16009389
 ] 

Bobby Calderwood commented on KAFKA-3455:
-

The current Interface definitions for `Processor` and `Transformer` make it a 
bit difficult to re-use one for the other.  Specifically, the `void init()` and 
`void close()` method signatures are identical, but the `punctuate(long 
timestamp)` signature differs in a bizarre way: it has a return type `R` the 
same as `R Transformer.transform(K key, V value)`, but the docs specify that 
`null` must always be returned.

Wouldn't it make sense to DRY these up a bit by either a) changing the method 
signature of `R Transformer.punctuate(long timestamp)` to match that of 
`Processor` (i.e. with a `void` return type), and/or b) creating another 
interface encapsulating the lifecycle stuff (`init()`, `close()` [or just use 
Java's AutoCloseable], and `punctuate(long timestamp)`) and make `Processor` 
and `Transformer` single-method interfaces?  They could either inherit from the 
common lifecycle-ish interface, or else compose together with it in 
implementing classes.

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-04-05 Thread Nikki Thean (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15956941#comment-15956941
 ] 

Nikki Thean commented on KAFKA-3455:


Correct.

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-04-04 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955739#comment-15955739
 ] 

Matthias J. Sax commented on KAFKA-3455:


[~nthean] "I have done exactly this" -> you mean you did use 
{{stream.transform(...).to(...)}} ?

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-04-04 Thread Nikki Thean (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955651#comment-15955651
 ] 

Nikki Thean commented on KAFKA-3455:


If it helps, as a user, I have done exactly this and found it pretty intuitive. 
My app was built last December.

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.11.0.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2017-01-31 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15847718#comment-15847718
 ] 

Matthias J. Sax commented on KAFKA-3455:


Just discovering this. I would close as "not a problem". \cc [~jonathan.bender] 
[~guozhang]

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.10.3.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3455) Connect custom processors with the streams DSL

2016-10-25 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15606491#comment-15606491
 ] 

Guozhang Wang commented on KAFKA-3455:
--

[~jonathan.bender] I'm looking at this JIRA again, and I'm wondering if it can 
actually be done via the {{transform}} function. As you mentioned in the email 
thread:

{code}
I was trying to build a topology using the DSL, add a custom processor and then 
add a sink for it. However, it seems like there's no great way to do
this using the APIs, since the processor's internal "name" is not exposed in 
the KStream interface, and the .process method doesn't return the latest 
KStream.
{code}

We could potentially do {{stream.transform(...).to(...)}}.

> Connect custom processors with the streams DSL
> --
>
> Key: KAFKA-3455
> URL: https://issues.apache.org/jira/browse/KAFKA-3455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Jonathan Bender
>  Labels: user-experience
> Fix For: 0.10.2.0
>
>
> From the kafka users email thread, we discussed the idea of connecting custom 
> processors with topologies defined from the Streams DSL (and being able to 
> sink data from the processor).  Possibly this could involve exposing the 
> underlying processor's name in the streams DSL so it can be connected with 
> the standard processor API.
> {quote}
> Thanks for the feedback. This is definitely something we wanted to support
> in the Streams DSL.
> One tricky thing, though, is that some operations do not translate to a
> single processor, but a sub-graph of processors (think of a stream-stream
> join, which is translated to actually 5 processors for windowing / state
> queries / merging, each with a different internal name). So how to define
> the API to return the processor name needs some more thinking.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)