[ 
https://issues.apache.org/jira/browse/KAFKA-18191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17905597#comment-17905597
 ] 

Bill Bejeck commented on KAFKA-18191:
-------------------------------------

Hey [~ableegoldman], I looked into this and couldn't reproduce the error.

I updated the `KStreamKStreamIntegrationTest` topology like so:
{code:java}
final StreamJoined<String, String, String> streamJoined =
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                .withName("joining-streams")
                .withStoreName("joining-store");

stream1.outerJoin(stream2,
                  joiner,
                  JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10)),
                  streamJoined).to(OUTPUT);
{code}
In the resulting topology, every join processor name is prefixed with the name 
supplied in `StreamJoined`, and every store name with the supplied store name:
{code:java}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [leftStream])
      --> joining-streams-this-windowed
    Source: KSTREAM-SOURCE-0000000001 (topics: [rightStream])
      --> joining-streams-other-windowed
    Processor: joining-streams-other-windowed (stores: [joining-store-outer-other-join-store])
      --> joining-streams-outer-other-join
      <-- KSTREAM-SOURCE-0000000001
    Processor: joining-streams-this-windowed (stores: [joining-store-outer-this-join-store])
      --> joining-streams-outer-this-join
      <-- KSTREAM-SOURCE-0000000000
    Processor: joining-streams-outer-other-join (stores: [joining-store-outer-this-join-store, joining-store-outer-shared-join-store])
      --> joining-streams-merge
      <-- joining-streams-other-windowed
    Processor: joining-streams-outer-this-join (stores: [joining-store-outer-other-join-store, joining-store-outer-shared-join-store])
      --> joining-streams-merge
      <-- joining-streams-this-windowed
    Processor: joining-streams-merge (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- joining-streams-outer-this-join, joining-streams-outer-other-join
    Sink: KSTREAM-SINK-0000000007 (topic: output)
      <-- joining-streams-merge
{code}
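To make the naming pattern in that output easier to see, here is a minimal sketch in plain Java. It is not Kafka Streams code and does not reflect how the library builds names internally: `JoinNameSketch`, `processorNames`, and `storeNames` are hypothetical helpers that only reproduce the suffixes observed in the topology description above for this outer join.

```java
import java.util.List;

// Hypothetical helper, NOT part of Kafka Streams. It only mirrors the suffixes
// visible in the topology description above for an outer stream-stream join.
class JoinNameSketch {

    // Processor names observed when the join is named via StreamJoined#withName.
    static List<String> processorNames(String joinName) {
        return List.of(
            joinName + "-this-windowed",
            joinName + "-other-windowed",
            joinName + "-outer-this-join",
            joinName + "-outer-other-join",
            joinName + "-merge");
    }

    // Store names observed when the stores are named via StreamJoined#withStoreName.
    static List<String> storeNames(String storeName) {
        return List.of(
            storeName + "-outer-this-join-store",
            storeName + "-outer-other-join-store",
            storeName + "-outer-shared-join-store");
    }

    public static void main(String[] args) {
        // Prints the five processor names and three store names seen above.
        processorNames("joining-streams").forEach(System.out::println);
        storeNames("joining-store").forEach(System.out::println);
    }
}
```

Only the two source nodes and the sink keep generated `KSTREAM-...` names in the output above; none of the join processors do.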
I even tried adding a `selectKey` to force a repartition, but the naming was 
the same.

> StreamJoined name is not used for processor names
> -------------------------------------------------
>
>                 Key: KAFKA-18191
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18191
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Priority: Major
>
> The StreamJoined#as API allows you to set a name for a stream-stream join 
> operator. The intention is to allow one to name the stores, and therefore 
> changelogs, resulting in an upgradeable topology, but it is a bit strange 
> that the name isn't also used for the processors themselves.
> Of course, the stream-stream join is a bit of an edge case compared to, say, 
> a count or filter operator, where the operator is a 1:1 mapping to the 
> processor node and the user can name the processor exactly, because the 
> stream-stream join operator actually results in multiple processors which 
> each need a unique name. However, we could at least use the specified 
> StreamJoined name as the basis for the resulting processor names, to avoid 
> getting stuck with names like "KSTREAM-JOINTHIS-0000000004" and 
> "KSTREAM-WINDOWED-0000000003", which are difficult to interpret and make it 
> hard to read a topology.
> Note that there is some existing precedent for this: with cogroups, the 
> individual processors inherit the base name of the cogroup's aggregate 
> operator. For example, this code
>  
> {code:java}
> grouped1
> .cogroup((k, v, a) -> a + v) // wrapped 1
> .cogroup(grouped2, (k, v, a) -> a + v) // wrapped 2
> .aggregate(() -> "", Named.as("myName"), Materialized.as("store")) {code}
>  
> produces processors with these names: "myName-cogroup-agg-0", 
> "myName-cogroup-agg-1", "myName-cogroup-merge"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
