[ https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690159#comment-16690159 ]

Eugen Feller commented on KAFKA-7634:
-------------------------------------

Sure, let me see what I can do. The code looks like this:

 
{code:java}
val table =
  bldr
    .table[Key, Value1](
      keySerde,
      valueSerde1,
      topicA,
      stateStoreName
    )

val stream1 =
  bldr
    .stream[Key, Value2](
      keySerde,
      valueSerde2,
      topicB
    )
    .filterNot((k: Key, s: Value2) => s == null)

val enrichedStream = stream1
  .leftJoin[Value1, Value3](
    table,
    joiner,
    keySerde,
    valueSerde2
  )

val explodedStream =
  bldr
    .stream[Key, Value4](
      keySerde,
      valueSerde4,
      topicC
    )
    .flatMapValues[Value3]()

val mergedStream = bldr.merge[Key, Value3](enrichedStream, explodedStream)
mergedStream
  .transform[Key, Value3](transformer)
  .to(keySerde, valueSerde3, outputTopic)
{code}
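
The transformer passed to transform() is not shown above. As a rough sketch only, assuming the 0.11 Transformer interface, it has roughly the following shape. The class name PunctuatingTransformer and the 1000 ms interval are hypothetical, picked for illustration, and the real transformer does more than pass records through:

{code:java}
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext

// Hypothetical pass-through transformer that only logs punctuations.
class PunctuatingTransformer[K, V] extends Transformer[K, V, KeyValue[K, V]] {
  private var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    // In 0.11 this schedules punctuate() on stream time (derived from
    // record timestamps), not on wall-clock time. Interval is an assumption.
    context.schedule(1000L)
  }

  // Forward each record unchanged.
  override def transform(key: K, value: V): KeyValue[K, V] =
    new KeyValue[K, V](key, value)

  // Expected to fire periodically; returning null forwards nothing downstream.
  override def punctuate(timestamp: Long): KeyValue[K, V] = {
    println(s"punctuate called at stream time $timestamp")
    null
  }

  override def close(): Unit = ()
}
{code}

With a transformer of this shape, the log line in punctuate() is what never shows up once merge() sits in front of transform().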
 

> Punctuate not being called with merge() and/or outerJoin()
> ----------------------------------------------------------
>
>                 Key: KAFKA-7634
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7634
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3
>            Reporter: Eugen Feller
>            Priority: Major
>
> Hi all,
> I am using the Processor API and am having trouble getting Kafka Streams 
> v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I have a topology that does flatMapValues() -> 
> merge() and/or outerJoin() -> transform(). If I don't call merge() and/or 
> outerJoin() before transform(), punctuate() is called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
