It's unclear to me where the `null` value comes from. Also, on restart
it should try to reprocess the same record and fail again.

There seems to be some bug, but it's unclear to me what it might be.
I'll try to reproduce the issue.

For your application, I would recommend handling `null` in your
deserializer though. As an alternative, you can also specify a different
deserialization exception handler that skips records on a
deserialization error.

Cf.
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-deserialization-exception-handler
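
For example, a rough sketch of both options (the `parseJson` helper is a
placeholder for your own JSON parsing, and the handler config goes into
your `streamProperties`):

  import org.apache.kafka.common.serialization.Deserializer
  import org.apache.kafka.streams.StreamsConfig
  import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler

  // Option 1: make the value deserializer tolerate null/empty payloads
  class EventDeserializer extends Deserializer[Event] {
    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): Event =
      if (data == null) null   // tombstone or empty payload -> null value
      else parseJson(data)     // placeholder for your existing JSON parsing
  }

  // Option 2: skip records that fail deserialization instead of crashing
  settings.put(
    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    classOf[LogAndContinueExceptionHandler].getName)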

-Matthias

On 5/18/18 5:07 AM, Daniele Tria wrote:
> val stream: KStream[String, Event] = eventStream.selectKey(new
> KeyValueMapper[Array[Byte], Event, String] {
>   override def apply(key: Array[Byte], value: Event): String = {
>     value.id
>   }
> })
> 
> stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(),
> serdeEvent))
> 
> globalKTable match {
>   case None => {
>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
>       Materialized.as[String, Event, KeyValueStore[Bytes,
> Array[Byte]]]("eventStore")))
>   }
>   case _ => ()
> }
> 
>>> What do you mean by "the records written formerly are not null" ?
> 
> In the same job I expect the GlobalKTable to read from the topic
> 'eventTopic' which is filled with not-null records from the KStream
> 'stream' after being key-mapped.
> Then, this is the job routine and the above code implements it.
> 
> So, consider that I package my application and I run it with an empty
> environment (new topics): the globalKTable reads a null record from the
> 'eventTopic' and my application breaks because the null record keeps my
> deserializer failing (since I know that I'm filling this topic with
> not-null records, I don't handle null values).
> Then, if I try to run my job again (after the failure) without cleaning my
> environment (so on the same topics), my GlobalKTable is going to read the
> records as expected (so not-null records).
> 
> 
> 
> 
> 2018-05-18 0:04 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> 
>>>> So, are you implying that `GlobalKTable` cannot consume records from a
>>>> topic populated at runtime by the same job,
>>>> but instead needs a pre-populated topic or some other application
>>>> populating it?
>>
>> No, I did not intend to imply this. During runtime, GlobalKTable will
>> still consume data from its input topic and update itself accordingly.
>>
>> I am still unclear what you mean by
>>
>>> The problem is in the first run,  where GlobalKTable reads null records
>>
>> What are `null records`?
>>
>>> (I have a json serializer and it reads a record with null value and, of
>>> course, the records written formerly are not null)
>>
>> What do you mean by "the records written formerly are not null" ?
>>
>> Records with `<key,null>` are so-called tombstone records and are
>> interpreted as deletes. Thus, if you have tombstones in the input topic,
>> it's by design that they delete data from the table.
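>>
>> For illustration only (topic name taken from your snippet, producer
>> hypothetical), a tombstone is simply a record whose value is null:
>>
>>   // sending <key, null> into the table's input topic deletes that key
>>   producer.send(new ProducerRecord[String, Event]("eventTopicGlobalTable", "some-id", null))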
>>
>>> In the next runs,
>>> records are correctly read from the topic and then saved in the store.
>>
>> You mean, you stop the application and restart it? This should not be
>> required. How long did you run the application in the first run? During
>> the first run, can you confirm that the app writes data into the
>> GlobalKTable topic? (I want to understand whether the write or the read
>> path is the issue.)
>>
>>
>> -Matthias
>>
>> On 5/17/18 2:04 AM, Daniele Tria wrote:
>>> Hi Matthias, thanks for the reply.
>>>
>>>>> Can you elaborate? What is the behavior you expect?
>>>
>>> I want my GlobalKTable to read records from a topic A that is populated
>>> within the same job
>>> (so I want my table to change at runtime). The producer of A takes a
>>> key-mapped stream.
>>> Next I perform a left join between a KStream and the GlobalKTable on
>>> multiple partitions.
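>>>
>>> Roughly like this (the joined value type `EnrichedEvent` and the joiner
>>> logic are placeholders for my real code; `globalKTable` is the
>>> Option[GlobalKTable[String, Event]] from my snippet):
>>>
>>>   val joined: KStream[String, EnrichedEvent] = stream.leftJoin(
>>>     globalKTable.get,
>>>     // map each stream record to the GlobalKTable key to look up
>>>     new KeyValueMapper[String, Event, String] {
>>>       override def apply(key: String, value: Event): String = value.id
>>>     },
>>>     // combine the stream value with the table value (null if no match)
>>>     new ValueJoiner[Event, Event, EnrichedEvent] {
>>>       override def apply(streamValue: Event, tableValue: Event): EnrichedEvent =
>>>         EnrichedEvent(streamValue, Option(tableValue))
>>>     })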
>>>
>>>>> On startup, GlobalKTable consumes the specified topic to the current
>>>>> end-offsets to pre-populate itself before any other processing starts.
>>>
>>> So, are you implying that `GlobalKTable` cannot consume records from a
>>> topic populated at runtime by the same job,
>>> but instead needs a pre-populated topic or some other application
>>> populating it?
>>> If so, I cannot execute the next left join because the globalKTable records
>>> must be consumed at runtime.
>>>
>>> Is there any possibility of using another method to obtain the expected
>>> result?
>>>
>>> Thank you,
>>>
>>> Daniele
>>>
>>> 2018-05-16 17:47 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>>>
>>>> Should be ok to read the topic. I cannot spot any error in your
>>>> configs/program either.
>>>>
>>>> However, I am not entirely sure if I understand the problem correctly.
>>>>
>>>>>> The problem is in the first run, where GlobalKTable reads null records
>>>>>> (I have a json serializer and it reads a record with null value and, of
>>>>>> course, the records written formerly are not null).
>>>>
>>>> Can you elaborate? What is the behavior you expect?
>>>>
>>>> On startup, GlobalKTable consumes the specified topic to the current
>>>> end-offsets to pre-populate itself before any other processing starts.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 5/16/18 6:30 AM, Daniele Tria wrote:
>>>>> Dear community, I'm struggling with an issue regarding `GlobalKTable`:
>>>>> it is supposed to read from a topic A, where I add key-mapped records from
>>>>> another stream. This topic A is populated within the same application by
>>>>> the `stream.to` construct with some events that I need to manipulate
>>>>> before they can be sinked. So, the `GlobalKTable` has to materialize the
>>>>> records read in its store.
>>>>> The problem is in the first run, where GlobalKTable reads null records
>>>>> (I have a json serializer and it reads a record with null value and, of
>>>>> course, the records written formerly are not null). In the next runs,
>>>>> records are correctly read from the topic and then saved in the store.
>>>>> Here is the snippet code:
>>>>>
>>>>> val streamProperties = {
>>>>>   val settings = new Properties
>>>>>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
>>>>>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>>>>>   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>>>> Serdes.Bytes.getClass.getName)
>>>>>   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>>>>> Serdes.Bytes.getClass.getName)
>>>>>   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
>>>>>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>>>>   settings.put(StreamsConfig.STATE_DIR_CONFIG,
>>>>> "in-memory-avg-store/myApp/global")
>>>>>   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>>>>> StreamsConfig.AT_LEAST_ONCE)
>>>>>   settings
>>>>> }
>>>>>
>>>>> var globalKTable: Option[GlobalKTable[String, Event]] = None
>>>>>
>>>>> val serdeEvent: Serde[Event] = JsonSerde[Event]
>>>>> val eventTopic = "eventTopicGlobalTable"
>>>>>
>>>>> val builder: StreamsBuilder = new StreamsBuilder()
>>>>>
>>>>> val stream: KStream[String, Event] = eventStream.selectKey(new
>>>>> KeyValueMapper[Array[Byte], Event, String] {
>>>>>   override def apply(key: Array[Byte], value: Event): String = {
>>>>>     value.id
>>>>>   }
>>>>> })
>>>>>
>>>>> stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(),
>>>>> serdeEvent))
>>>>>
>>>>> globalKTable match {
>>>>>   case None => {
>>>>>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
>>>>>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
>>>>>       Materialized.as[String, Event, KeyValueStore[Bytes,
>>>>> Array[Byte]]]("eventStore")))
>>>>>   }
>>>>>   case _ => ()
>>>>> }
>>>>>
>>>>> val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)
>>>>>
>>>>> streams.start()
>>>>>
>>>>>
>>>>> So my question is: why is this happening? Am I not allowed to read from
>>>>> a sinked topic in the same run? Or is it something related to my
>>>>> configuration?
>>>>>
>>>>> Thank you so much for your work.
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
