Hi Mathieu,

Join semantics are tricky. We are still working on better
documentation for them.

For the current state and your question:

Each time a record is processed, the join looks up the other KTable to
see if there is a matching record. If none is found, the join result is
empty and a tombstone record <key:null> is sent downstream. This is done
to delete any (possibly existing) previous join result for this key.
Keep in mind that the result is a KTable containing the current state
of the join.

This happens in both directions. Thus, if the first records of your two
inputs do not match on the key, each of them results in a <key:null>
message to delete any possibly existing join tuple in the result KTable.
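
To make the lookup-and-tombstone behavior described above concrete, here
is a minimal in-memory sketch. It is not the Kafka Streams API and not how
the library is implemented; JoinModel, updateLeft, updateRight and emitted
are hypothetical names used only for illustration, and the model assumes
nothing beyond the rule stated above (no match on the other side means a
<key:null> result).

```kotlin
// Simplified model of the inner KTable-KTable join semantics described above.
// All names are hypothetical; this does not use or mirror Kafka Streams classes.
class JoinModel<K, L : Any, R : Any, V : Any>(private val joiner: (L, R) -> V) {
    private val left = mutableMapOf<K, L>()
    private val right = mutableMapOf<K, R>()

    // Every result sent "downstream"; a null value stands for a tombstone.
    val emitted = mutableListOf<Pair<K, V?>>()

    fun updateLeft(key: K, value: L) {
        left[key] = value
        emit(key)
    }

    fun updateRight(key: K, value: R) {
        right[key] = value
        emit(key)
    }

    private fun emit(key: K) {
        val l = left[key]
        val r = right[key]
        // If the other side has no record for this key, the result is <key:null>,
        // which deletes any earlier join result for the key.
        val result = if (l != null && r != null) joiner(l, r) else null
        emitted.add(key to result)
    }
}

fun main() {
    val join = JoinModel<String, String, String, String> { status, action -> "$status/$action" }
    join.updateLeft("ts-1", "open")      // right side has no "ts-1" yet
    join.updateRight("ts-2", "submit")   // left side has no "ts-2" yet
    join.updateRight("ts-1", "approve")  // both sides now have "ts-1"
    join.emitted.forEach { (key, value) -> println("$key -> $value") }
}
```

In this model the first two updates each emit a null result because the
other side has nothing for that key, and only the third update calls the
joiner. A consumer of the result therefore has to expect null values.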

Does this make sense to you?

-Matthias

On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> Hello Kafka users,
> 
> I'm seeing some unexpected results when using Kafka Streams, and I was
> hoping someone could explain them to me.  I have two streams, which I've
> converted KStream->KTable, and then I am joining them together with a
> "join" (not an outer join, not a full join).  With the resulting KTable
> from the join, I am performing a foreach.
> 
> When I start up my Streams application, my foreach receives two records with
> valid keys but null values *before* my ValueJoiner ever gets executed.  Why
> would that be?
> 
> Code excerpt; please excuse the use of Kotlin here:
> 
> val builder = KStreamBuilder()
> 
> val approvalStatus = builder.table(
>         Serdes.String(),
>         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
>         "TimesheetApprovalStatusChanged"
> )
> 
> val timesheetLastApprovalAction = builder.table(
>         Serdes.String(),
>         JsonSerde(Map::class.java),
>         "TimesheetApprovalActionPerformed"
> )
> 
> val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, { approvalStatus, lastApprovalAction ->
>     println("EXECUTING ValueJoiner")
>     computeTimesheetStatus(approvalStatus.approvalStatus!!, lastApprovalAction as Map<String, Any?>)
> })
> 
> timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
>     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
>     if (timesheetStatus == null) {
>         println("SKIPPING NULL I DON'T UNDERSTAND")
>     }
> })
> 
> 
> Resulting console output:
> 
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ValueJoiner
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> 
> 
> Any explanation on why the foreach would be executing for data that hasn't
> been generated by my join?
> 
> Thanks,
> 
> Mathieu
> 
