On Fri, Jan 4, 2019 at 12:54 PM rahul patwari <[email protected]> wrote:
> Hi Raghu,
> Thanks for the response.
>
> I used withTopics() and withCreateTime() to read records from multiple
> topics, "topic1" with message.timestamp.type=CreateTime and "topic2"
> with message.timestamp.type=LogAppendTime.
> And I got the Exception: java.lang.IllegalArgumentException: Kafka
> record's timestamp is not 'CREATE_TIME' (topic: topic2, partition 0, offset
> 0, timestamp type 'LogAppendTime').
>

If you are reading with 'withCreateTime()', all the records from all the
topics are expected to have a CREATE_TIME timestamp. I don't think it makes
much logical sense for the built-in timestamp factories to support mixed
special cases like this.

A couple of options for you, depending on your exact use case:

- Read topic1 and topic2 as two separate PCollections (i.e. two instances
  of KafkaIO.read()).
- Or implement a timestamp factory that handles the mixed case the way you
  prefer (as you described below). It is pretty simple to implement one.
  You can start with one of the built-in factories.

> So, my understanding is that, only when withLogAppendTime() is used, we
> don't want users to get stuck if some topics have multiple timestamp types.
> In this case, where two topics have two different timestamp types, the
> watermark will be calculated only based on the records which belong to the
> topic with timestamp as LogAppendTime. I am thinking that we can calculate
> the watermark more accurately if we also consider the records in the topic
> with timestamp type CreateTime. So, instead of directly returning
> currentWatermark, we can update currentWatermark with the record's
> timestamp (either CreateTime or LogAppendTime) always and then return
> currentWatermark.
>
> I would like to contribute if a fix is needed.
>
> Regards,
> Rahul
>
> On Sat, Jan 5, 2019 at 12:40 AM Raghu Angadi <[email protected]> wrote:
>
>> The intention was to assert on 'timestamp_type' on the first record only.
>> I was not entirely sure if there are situations in Kafka where a timestamp
>> type could be different or the timestamp itself could be missing for some
>> records. The assertion on the first record was just to sanity check a
>> common misconfiguration. The way this policy checks for the first record
>> is itself incorrect in the case of idle partitions, since the watermark
>> advances even without any records read; this is the issue you encountered.
>> When the timestamp type does not match, its timestamp is not used for the
>> watermark.
>>
>> As you suggested, a simpler fix might be to just require every record's
>> timestamp_type to be LOG_APPEND_TIME (i.e. replace 'else if' with 'else').
>> Is that safe? We don't want users to get stuck if some topics are expected
>> to have multiple timestamp types.
>>
>> Raghu.
>>
>> On Thu, Jan 3, 2019 at 11:22 PM rahul patwari <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> We are using KafkaIO.read() with LogAppendTimePolicy. When the topic is
>>> idle at the beginning of the pipeline, IllegalStateException is NOT thrown
>>> even when log.message.timestamp.type = CreateTime.
>>>
>>> This happens due to the statement:
>>> else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) in
>>> getTimestampForRecord() method in TimestampPolicyFactory Interface.
>>>
>>> As the topic is idle at the beginning of the pipeline, the
>>> currentWatermark is advanced (backlog==0), because of which
>>> currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE) is False and
>>> the timestamp of the records are taken as currentWatermark.
>>>
>>> If we change else if() to else, IllegalStateException is thrown when the
>>> first record from the Kafka topic is read, which is expected.
>>> Is there any specific reason behind using else if() instead of else?
>>>
>>> Thanks and Regards,
>>> Rahul
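
For reference, the 'else if' versus 'else' behaviour discussed in this thread can be reproduced with a small standalone model. This is only a sketch, not the KafkaIO source: it uses java.time.Instant rather than Beam's Instant type, Instant.MIN stands in for BoundedWindow.TIMESTAMP_MIN_VALUE, and the names LogAppendTimePolicySketch, Policy, onIdle and checkEveryRecord are hypothetical, introduced only for illustration.

```java
import java.time.Instant;

/** Standalone model of the check discussed in this thread; not Beam code. */
public class LogAppendTimePolicySketch {

  public enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

  // Assumption: stand-in sentinel for BoundedWindow.TIMESTAMP_MIN_VALUE.
  static final Instant MIN_WATERMARK = Instant.MIN;

  public static final class Policy {
    // false models the original 'else if'; true models the proposed 'else'.
    private final boolean checkEveryRecord;
    private Instant currentWatermark = MIN_WATERMARK;

    public Policy(boolean checkEveryRecord) {
      this.checkEveryRecord = checkEveryRecord;
    }

    /** Models the watermark advancing on an idle partition (backlog == 0). */
    public void onIdle(Instant now) {
      currentWatermark = now;
    }

    public Instant getTimestampForRecord(TimestampType type, Instant recordTimestamp) {
      if (type == TimestampType.LOG_APPEND_TIME) {
        currentWatermark = recordTimestamp;
      } else if (checkEveryRecord || currentWatermark.equals(MIN_WATERMARK)) {
        // With 'else if', this branch is skipped once the watermark has moved,
        // even if no record has been read yet.
        throw new IllegalStateException(
            "Record timestamp type is " + type + ", expected LOG_APPEND_TIME");
      }
      // For a mismatched type, the record silently gets the current watermark.
      return currentWatermark;
    }
  }

  public static void main(String[] args) {
    Instant idleTime = Instant.ofEpochMilli(1_000);
    Instant createTime = Instant.ofEpochMilli(5);

    // 'else if': the idle period moved the watermark, so the check never fires.
    Policy firstRecordOnly = new Policy(false);
    firstRecordOnly.onIdle(idleTime);
    System.out.println("else if returns: "
        + firstRecordOnly.getTimestampForRecord(TimestampType.CREATE_TIME, createTime));

    // 'else': the first CREATE_TIME record is rejected regardless of idleness.
    Policy everyRecord = new Policy(true);
    everyRecord.onIdle(idleTime);
    try {
      everyRecord.getTimestampForRecord(TimestampType.CREATE_TIME, createTime);
    } catch (IllegalStateException e) {
      System.out.println("else throws: " + e.getMessage());
    }
  }
}
```

The flag makes the trade-off from the thread visible: with checkEveryRecord set to true, a pipeline that reads topics with mixed timestamp types fails on the first CREATE_TIME record, which is the "users get stuck" concern raised above.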
