[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2023-05-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725357#comment-17725357
 ] 

Matthias J. Sax commented on KAFKA-7497:


This seems to be fixed; cf. https://issues.apache.org/jira/browse/KAFKA-14209

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2022-06-09 Thread Rohit Verma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552538#comment-17552538
 ] 

Rohit Verma commented on KAFKA-7497:


In fact, our use case also involves multiple event types in the same topic: 
https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/
As suggested in that example, if I push Customer and Order events to the same 
topic/partition, I want to do a self-join to get customerOrders too.



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2021-08-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396307#comment-17396307
 ] 

Matthias J. Sax commented on KAFKA-7497:


Given my earlier comment:
{quote}Also, and this seems to be the most severe issue, each record would join 
with itself, which is actually not desired...
{quote}
I think this is actually not correct. At least if we consider self-joins in 
standard SQL, a record would join with itself. We should follow the same 
semantics, and thus it is possible (even if not efficient) to do a self-join 
with Kafka Streams today.



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2019-01-15 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743468#comment-16743468
 ] 

Guozhang Wang commented on KAFKA-7497:
--

From the expressiveness of the operators, I think there are still cases of 
stream self-join that cannot be captured with stream aggregations, since the 
window is really "sliding" (but if we add sliding-window aggregations, they 
may be equivalent to the semantics of a stream self-join).

From the API point of view, I think allowing stream self-join, even assuming 
its use cases can be captured with sliding-window aggregations, still provides 
programmability benefits. But the underlying implementation should differ from 
any of our current internal implementations. I think we can still have an 
umbrella KIP that includes the following:

1. Add sliding-window-based aggregations.
2. Allow windowed stream self-join; when one is detected, convert it to a 
sliding-window-based aggregation behind the scenes for an efficient 
implementation.



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2019-01-15 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743329#comment-16743329
 ] 

John Roesler commented on KAFKA-7497:
-

Ah. I was looking at the javadoc on `KStream#join`. My bad.



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2019-01-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743249#comment-16743249
 ] 

Matthias J. Sax commented on KAFKA-7497:


Your understanding seems correct. Also note the JavaDocs of `JoinWindows`:
{quote}{{* In SQL-style you would express this join as}}
{{* }}
{{* SELECT * FROM stream1, stream2}}
{{* WHERE}}
{{*   stream1.key = stream2.key}}
{{*   AND}}
{{*   stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after}}
{quote}
I agree, it's not necessarily a public API change. However, it might still be 
a major change that we might want to back up with a KIP. Not sure. In the end, 
it's an optimization to avoid two state stores, because one state store should 
be sufficient to compute the self-join.
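
As a rough illustration of the SQL-style predicate quoted from the 
`JoinWindows` JavaDocs, the window condition can be written as a plain Java 
check. This is only a sketch: the class and method names are hypothetical, 
timestamps are assumed to be event-time milliseconds, and nothing here is 
taken from the Kafka Streams implementation.

```java
// Sketch only: mirrors the quoted SQL-style predicate, not Kafka Streams internals.
// JoinWindowSketch and withinWindow are hypothetical names.
final class JoinWindowSketch {

    // True when ts1 - before <= ts2 <= ts1 + after (both bounds inclusive).
    static boolean withinWindow(long ts1, long ts2, long before, long after) {
        return ts1 - before <= ts2 && ts2 <= ts1 + after;
    }

    public static void main(String[] args) {
        // ts2 sits exactly on the lower bound ts1 - before.
        System.out.println(withinWindow(1_000L, 900L, 100L, 50L));
        // ts2 is one millisecond past the upper bound ts1 + after.
        System.out.println(withinWindow(1_000L, 1_051L, 100L, 50L));
    }
}
```

In a self-join both timestamps come from the same stream, so the same check 
applies unchanged, and a record always satisfies it against itself.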



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2019-01-15 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743225#comment-16743225
 ] 

John Roesler commented on KAFKA-7497:
-

Thanks [~mjsax],

I see that the "key" field in Kafka can be set to anything. My question was 
about the semantics of a stream-stream join. I've read our javadoc, and all it 
says is that it does an "inner equi-join" restricted by the time window. I 
guess this means that, given two streams `U` and `V`, it produces at least one 
result pair `(ui, vj)` for each pair in the cartesian product of the streams 
such that `ui.key == vj.key` and `abs(ui.time - vj.time) <= window_size`. 
Under this definition, if we happen to set `V := U`, then the operation is 
still well defined.

It sounds like this is the precise ask, since at the moment, choosing `V := U` 
throws a runtime error, even though it's not semantically prohibited.

It does seem like part of the scope of work should be to implement it 
efficiently, that is, to detect that both streams are actually the same at 
topology-build-time and ensure that we only need one join window store.
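
To make the "one join window store" idea concrete, here is a minimal 
in-memory sketch of a windowed self-join that keeps a single buffer. It is 
not how Kafka Streams implements joins: `Rec`, `selfJoin`, and the string 
pair format are hypothetical, the buffer is never expired, and self-pairs 
are emitted on the assumption that SQL-style self-join semantics apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: models a windowed self-join with ONE buffer instead of two stores.
// All names here are hypothetical; this is not Kafka Streams API.
final class SingleBufferSelfJoin {

    static final class Rec {
        final String key;
        final long ts;      // event-time timestamp in milliseconds
        final String value;

        Rec(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Emits "(left,right)" for every ordered pair of records with equal keys
    // whose timestamps differ by at most windowMs. Each record also pairs with itself.
    static List<String> selfJoin(List<Rec> input, long windowMs) {
        List<Rec> buffer = new ArrayList<>(); // the single "window store"
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            for (Rec old : buffer) {
                if (old.key.equals(r.key) && Math.abs(old.ts - r.ts) <= windowMs) {
                    out.add("(" + r.value + "," + old.value + ")"); // r as left side
                    out.add("(" + old.value + "," + r.value + ")"); // r as right side
                }
            }
            out.add("(" + r.value + "," + r.value + ")"); // self-pair
            buffer.add(r); // a real store would also drop records older than the window
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
            new Rec("k", 0L, "a"), new Rec("k", 5L, "b"),
            new Rec("k", 100L, "c"), new Rec("j", 3L, "d"));
        System.out.println(selfJoin(input, 10L));
    }
}
```

Because both sides of the join read the same records, every lookup that 
would go to the "other" store can be served from the one buffer.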

If I understand this scoping correctly, there's no public API change, just a 
behavior change. Also, since it's currently not possible to start a topology 
with a stream self-join, there's no deprecation or migration plan needed. 
Therefore no KIP is required.

Sound good?



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2019-01-14 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742733#comment-16742733
 ] 

Matthias J. Sax commented on KAFKA-7497:


I disagree "that you have a stream of unique events": the join condition is 
defined on the record key, but the record key is not a primary key for streams. 
For example, you can have a stream of clicks using the page-id as key. Also 
note that each record might join multiple times, not just once.
{quote}one side of a pair may be arbitrarily delayed or disordered, which leads 
to the need for memory on one or both sides of the join.{quote}
Not sure what you mean by this. If you refer to the join window, I think these 
are two different things. "Delay" or "disorder" seem to refer to wall-clock 
time, but the join is defined on event-time. Thus, the semantics is to join 
events that happen temporally close to each other.

This can be translated to the self-join case too: consider the clickstream 
example with page-id as key. It means returning all pages for which there is 
more than one click within the time window.

I don't think this is related to similarity joins at all.
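
The clickstream reading of a self-join described above can be sketched in 
plain Java. The names `Click` and `pagesWithRepeatedClicks` are hypothetical, 
and the sketch deliberately skips self-pairs, on the assumption that "more 
than one click" means two distinct records.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: finds pages with at least two distinct clicks close together in event time.
// Click and pagesWithRepeatedClicks are hypothetical names, not Kafka Streams API.
final class RepeatedClicks {

    static final class Click {
        final String pageId; // the record key
        final long ts;       // event-time timestamp in milliseconds

        Click(String pageId, long ts) {
            this.pageId = pageId;
            this.ts = ts;
        }
    }

    // Returns the page ids for which two different clicks are at most windowMs apart.
    static Set<String> pagesWithRepeatedClicks(List<Click> clicks, long windowMs) {
        Set<String> pages = new TreeSet<>();
        for (int i = 0; i < clicks.size(); i++) {
            for (int j = i + 1; j < clicks.size(); j++) { // j > i skips self-pairs
                Click a = clicks.get(i);
                Click b = clicks.get(j);
                if (a.pageId.equals(b.pageId) && Math.abs(a.ts - b.ts) <= windowMs) {
                    pages.add(a.pageId);
                }
            }
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Click> clicks = Arrays.asList(
            new Click("home", 0L), new Click("about", 1L), new Click("home", 4L),
            new Click("pricing", 7L), new Click("about", 50L));
        System.out.println(pagesWithRepeatedClicks(clicks, 5L));
    }
}
```

The comparison uses the timestamps carried by the records, not arrival 
order, which matches the point that the join is defined on event-time.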

 

 



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2019-01-14 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742729#comment-16742729
 ] 

John Roesler commented on KAFKA-7497:
-

Note, the use-case reference above is now at: 
[https://github.com/confluentinc/demo-scene/blob/master/ksql-atm-fraud-detection/ksql-atm-fraud-detection-README.adoc]

 

I _think_ I see the rationale of this ask:

I guess the difference between a windowed aggregation and a self-join is that 
the windowed aggregation would require you to save all the occurrences of the 
key in the window and do your computation (which is a pairwise computation) 
over the collection, whereas the self-join naturally gives you a stream of all 
the pairwise matches when the same key re-occurs within the window in the 
stream.

IIUC, you can do the same computation either way, but it's more naturally 
expressed as a pairwise comparison, so the self-join is more ergonomic? 
(Although, if there are two fraudulent transactions on the same account, they 
would show up in the existing program as two independent potential frauds, 
whereas the aggregation method gives you the opportunity to generate just one 
fraud report with two occurrences)

 

On the other hand, I'm struggling to see the semantics of this feature clearly.

As I understand it, the semantics of a stream-stream join in general is that 
you have a stream of unique events U=<u1, u2, u3> and another stream of unique 
events V=<v1, v2, v3>, and you effectively want to "zip" them to produce 
J=<(u1,v1), (u2,v2), (u3,v3)>. Note that the indices are the identifying key, 
and they are unique. The standard stream processing complications apply: one 
side of a pair may be arbitrarily delayed or disordered, which leads to the 
need for memory on one or both sides of the join.

The use case described in the document linked above seems different; the 
identifying information isn't unique, in that the same account id appears on an 
arbitrary number of events in the stream, but the account id is what we use as 
the joining key. It sounds similar to the "streaming similarity self-join" 
problem, which I found researching this issue: 
[http://www.vldb.org/pvldb/vol9/p792-defranciscimorales.pdf] . Under that 
definition, the "join" is actually just a cartesian product of every record in 
both streams, and for each pair, you compute a similarity, keeping (or 
discarding) only the pairs that have an above-threshold similarity score. There 
are a number of relaxations/optimizations required to do this efficiently, 
which in our case means limiting the product to only those records that already 
share the same account id, and of course limiting the join temporally.

 

I suppose my question, after looking into this is (despite the shared name and 
the fact that the existing implementation happens to suit both), are these two 
operations really the same concept? If we say "yes", for example, then we may 
prohibit future optimizations. For example, in the former stream-stream join, 
you know that the events' keys are unique, so once you produce a pair, you can 
immediately forget both of the input events for it. But for the 
similarity-join, you have to remember the input events until the pre-defined 
join window closes.

 



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2018-10-12 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648105#comment-16648105
 ] 

Guozhang Wang commented on KAFKA-7497:
--

[~mjsax] You're right, I was only thinking about use cases where the stream 
needs some enrichment that is independent of any other streams / tables; I 
also agree with you that, even with sliding windows, it seems windowed 
aggregations should still be sufficient.

[~rmoff] I looked at the use case, and it seems to be a better fit with 
"session window" aggregations of KStream than using KStream self-join: 
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/SessionWindows.html



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2018-10-12 Thread Robin Moffatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647671#comment-16647671
 ] 

Robin Moffatt commented on KAFKA-7497:
--

Here's the use case, inspecting a stream of transactions looking for possible 
fraud: 
https://github.com/confluentinc/demo-scene/blob/master/ksql-atm-fraud-detection/ksql-atm-fraud-detection.adoc



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2018-10-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647138#comment-16647138
 ] 

Matthias J. Sax commented on KAFKA-7497:


[~guozhang] I cannot follow. A self-join would still have a 
sliding join window, and thus all records with the same key within the window 
would be joined. How can a stateless map achieve this? I want to add that a 
sliding-window aggregation might allow computing the same thing. Note, 
though, that Kafka Streams only supports hopping/tumbling windows for 
aggregations at the moment, but no sliding windows.

As for the use case: to be honest, this is unclear to me as well.



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2018-10-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647044#comment-16647044
 ] 

Guozhang Wang commented on KAFKA-7497:
--

[~rmoff] Before we dive into the implementation details, could you list the 
motivations for self-joins in Streams? Note that since stream-stream joins only 
support key-based joins for now, a self-join should be well covered by just 
"enriching" the stream record via `map` etc.



[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2018-10-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646892#comment-16646892
 ] 

Matthias J. Sax commented on KAFKA-7497:


It is correct that a topic cannot be consumed twice (cf. 
https://issues.apache.org/jira/browse/KAFKA-6687), and I also agree that a 
self-join operator would be useful.

One could express a self-join like this:
{noformat}
KStream stream = builder.stream(...);
stream.join(stream, ...);{noformat}
However, the execution of the join would not be efficient, as two state stores 
with two changelog topics would be created (both containing the exact same 
data). Also, and this seems to be the most severe issue, each record would join 
with itself, which is actually not desired...

I marked this as "needs-kip", but I am not 100% sure that we would need a KIP. 
Maybe Kafka Streams could detect internally that the left-hand side KStream 
and the right-hand side KStream are the same object and just use a different 
operator implementation (i.e., a dedicated self-join processor). This way, no 
public API change would be required.
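
The internal detection idea can be sketched as a reference-equality check 
made when the join is planned. Everything in this sketch is hypothetical: 
`JoinPlannerSketch`, `Plan`, and `choosePlan` are illustrative names, plain 
`Object` parameters stand in for the two `KStream` arguments, and the sketch 
says nothing about how Kafka Streams actually builds its topology.

```java
// Sketch only: picks an operator based on whether both join inputs are the same object.
// All names are hypothetical; Object stands in for the two KStream arguments.
final class JoinPlannerSketch {

    enum Plan { SELF_JOIN_ONE_STORE, REGULAR_JOIN_TWO_STORES }

    // Reference equality (==) is deliberate: "the same object" means the caller
    // passed one stream as both the left and the right input of the join.
    static Plan choosePlan(Object left, Object right) {
        return left == right ? Plan.SELF_JOIN_ONE_STORE : Plan.REGULAR_JOIN_TWO_STORES;
    }

    public static void main(String[] args) {
        Object stream = new Object();
        System.out.println(choosePlan(stream, stream));       // same object on both sides
        System.out.println(choosePlan(stream, new Object())); // two different objects
    }
}
```

Since the choice is made behind the existing `join` call, a check of this 
kind would not by itself require any new public method.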
