Try converting the joined KTable to a KStream first, and filter there:

.toStream().filter(...).foreach(...)
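
To illustrate why filtering on the stream side helps, here is a rough sketch in plain Java. It is a toy model of the behaviour described further down in this thread, not the Kafka Streams implementation: the class TombstoneFilterSketch and its methods updateLeft, updateRight and dropTombstones are made up for this example, and the "+" joiner is arbitrary. Against the real API the equivalent would be something along the lines of timesheetStatus.toStream().filter((key, value) -> value != null).foreach(...).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model only: hypothetical names, no Kafka dependency.
public class TombstoneFilterSketch {
    // Current state of the two input tables.
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();
    // Every update the modelled join sends downstream, in order.
    public final List<Map.Entry<String, String>> changelog = new ArrayList<>();

    public void updateLeft(String key, String value) {
        left.put(key, value);
        emitJoinResult(key);
    }

    public void updateRight(String key, String value) {
        right.put(key, value);
        emitJoinResult(key);
    }

    // Look up both sides; with no match, emit <key:null> (a tombstone)
    // so that any previous join result for this key is deleted.
    private void emitJoinResult(String key) {
        String l = left.get(key);
        String r = right.get(key);
        String joined = (l == null || r == null) ? null : l + "+" + r;
        changelog.add(new SimpleEntry<>(key, joined));
    }

    // Stream-side filter: keep only records that carry a value.
    public static List<Map.Entry<String, String>> dropTombstones(
            List<Map.Entry<String, String>> records) {
        return records.stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TombstoneFilterSketch join = new TombstoneFilterSketch();
        join.updateLeft("a", "L1");   // no right-side match yet: tombstone
        join.updateRight("b", "R1");  // no left-side match yet: tombstone
        join.updateRight("a", "R2");  // both sides present: joined value
        System.out.println("unfiltered: " + join.changelog);
        System.out.println("filtered:   " + dropTombstones(join.changelog));
    }
}
```

In this model the first two updates reach the downstream consumer as null values, and only the stream-side filter keeps them away from the foreach.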


-Matthias


On 07/20/2016 08:11 PM, Guozhang Wang wrote:
> Are you using the 0.10.0.0 release or from trunk?
> 
> On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
> 
>> Hi Guozhang,
>>
>> Yes, I tried to apply the filter on the KTable that came from join, and
>> then the foreach on the KTable that came from filter.  I was still getting
>> the nulls through to my foreach.
>>
>> It is easy to work around, but the behaviour was especially surprising when
>> the filter didn't prevent it.
>>
>> Mathieu
>>
>>
>> On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>> Hi Mathieu,
>>>
>>> As Matthias said, we are working on improving the current join semantics:
>>>
>>>
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
>>>
>>> and will keep you updated.
>>>
>>>
>>> As for KTable.filter(), I think it can actually achieve what you want: not
>>> forwarding nulls to the downstream operators. Have you tried it and found
>>> that it does not work?
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
>>> mathieu.fenn...@replicon.com> wrote:
>>>
>>>> Hm... OK, I think that makes sense.
>>>>
>>>> It seems like I can't filter out those tombstone records; is that expected
>>>> as well?  If I throw in a .filter operation before my foreach, its
>>>> Predicate is not invoked, and the foreach's ForeachAction is invoked with
>>>> a null value still.
>>>>
>>>> Mathieu
>>>>
>>>>
>>>> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Hi Mathieu,
>>>>>
>>>>> Join semantics are tricky. We are still working on better
>>>>> documentation for them...
>>>>>
>>>>> For the current state and your question:
>>>>>
>>>>> Each time a record is processed, the join looks up the other KTable to
>>>>> see if there is a matching record. If none is found, the join result is
>>>>> empty and a tombstone record with <key:null> is sent downstream. This is
>>>>> done to delete any (possibly existing) previous join result for this
>>>>> key. Keep in mind that the result is a KTable containing the current
>>>>> state of the join.
>>>>>
>>>>> This happens both ways. Thus, if the first records of each stream do
>>>>> not match on the key, both result in a <key:null> message to delete
>>>>> possibly existing join tuples in the result KTable.
>>>>>
>>>>> Does this make sense to you?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
>>>>>> Hello Kafka users,
>>>>>>
>>>>>> I'm seeing some unexpected results when using Kafka Streams, and I was
>>>>>> hoping someone could explain them to me.  I have two streams, which I've
>>>>>> converted KStream->KTable, and then I am joining them together with a
>>>>>> "join" (not an outer join, not a full join).  With the resulting KTable
>>>>>> from the join, I am performing a foreach.
>>>>>>
>>>>>> When I start up my Streams application, my foreach receives two records
>>>>>> with valid keys but null values *before* my ValueJoiner ever gets
>>>>>> executed.  Why would that be?
>>>>>>
>>>>>> Code excerpt; please excuse the use of Kotlin here:
>>>>>>
>>>>>> val builder = KStreamBuilder()
>>>>>>
>>>>>> val approvalStatus = builder.table(
>>>>>>         Serdes.String(),
>>>>>>         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
>>>>>>         "TimesheetApprovalStatusChanged"
>>>>>> )
>>>>>>
>>>>>> val timesheetLastApprovalAction = builder.table(
>>>>>>         Serdes.String(),
>>>>>>         JsonSerde(Map::class.java),
>>>>>>         "TimesheetApprovalActionPerformed"
>>>>>> )
>>>>>>
>>>>>> val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
>>>>>>         approvalStatus, lastApprovalAction ->
>>>>>>     println("EXECUTING ValueJoiner")
>>>>>>     computeTimesheetStatus(approvalStatus.approvalStatus!!,
>>>>>>             lastApprovalAction as Map<String, Any?>)
>>>>>> })
>>>>>>
>>>>>> timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
>>>>>>     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
>>>>>>     if (timesheetStatus == null) {
>>>>>>         println("SKIPPING NULL I DON'T UNDERSTAND")
>>>>>>     }
>>>>>> })
>>>>>>
>>>>>>
>>>>>> Resulting console output:
>>>>>>
>>>>>> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
>>>>>> SKIPPING NULL I DON'T UNDERSTAND
>>>>>> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
>>>>>> SKIPPING NULL I DON'T UNDERSTAND
>>>>>> EXECUTING ValueJoiner
>>>>>> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
>>>>>>
>>>>>>
>>>>>> Any explanation of why the foreach would be executing for data that
>>>>>> hasn't been generated by my join?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mathieu
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 
> 
> 
