Hello Kafka users,

I'm seeing some unexpected results when using Kafka Streams, and I was
hoping someone could explain them to me.  I have two streams, which I've
converted KStream->KTable, and then I am joining them together with a
"join" (not an outer join, not a full join).  With the resulting KTable
from the join, I am performing a foreach.

When I startup my Streams application, my foreach receives two records with
valid keys but null values *before* my ValueJoiner ever gets executed.  Why
would that be?

Code excerpt; please excuse the use of Kotlin here:

val builder = KStreamBuilder()

val approvalStatus = builder.table(
        Serdes.String(),
        JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
        "TimesheetApprovalStatusChanged"
)

val timesheetLastApprovalAction = builder.table(
        Serdes.String(),
        JsonSerde(Map::class.java),
        "TimesheetApprovalActionPerformed"
)

val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
approvalStatus, lastApprovalAction ->
    println("EXECUTING ValueJoiner")
    computeTimesheetStatus(approvalStatus.approvalStatus!!, lastApprovalAction
as Map<String, Any?>)
})

timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
    println("EXECUTING ForeachAction: $timesheetKey, status:
$timesheetStatus")
    if (timesheetStatus == null) {
        println("SKIPPING NULL I DON'T UNDERSTAND")
    }
})


Resulting console output:

EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
status: null
SKIPPING NULL I DON'T UNDERSTAND
EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
status: null
SKIPPING NULL I DON'T UNDERSTAND
EXECUTING ValueJoiner
EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
status: urn:replicon:timesheet-status:submitting


Any explanation on why the foreach would be executing for data that hasn't
been generated by my join?

Thanks,

Mathieu

Reply via email to