Hi Gordon,

Thanks for confirming my understanding that a timestamp extractor should
not need to be defined for 0.10. However, I still get zero window triggers
when I don't use an extractor.

I've verified the timestamps in the Kafka records with the following
command:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic input --from-beginning --property print.timestamp=true

The 'input' topic happens to consist of Strings representing the time
the record was created. I get output such as the following:

CreateTime:1494998828813        1494998828813
CreateTime:1494998828901        1494998828901
CreateTime:1494998828914        1494998828914
CreateTime:1494998828915        1494998828914
CreateTime:1494998828915        1494998828915
CreateTime:1494998829004        1494998829003
CreateTime:1494998829016        1494998829016
CreateTime:1494998829016        1494998829016

where CreateTime is the timestamp generated by Kafka and the second value
is the record value. In this particular case the value happens to be the
time the record was created in the producer, which resides on the same
machine as the Kafka broker (hence the values match to within 1 ms).
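
To make the comparison less manual, here is a minimal sketch (plain Java,
standard library only) that parses lines in the print.timestamp format shown
above and reports the largest gap between CreateTime and the record value.
The class and method names are hypothetical, not part of Kafka or Flink, and
it assumes every record value is an epoch-millisecond timestamp as in my
'input' topic:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for sanity-checking console-consumer output;
// assumes each line looks like "CreateTime:<millis><whitespace><millis>".
final class TimestampSkewCheck {

    private static final String PREFIX = "CreateTime:";

    // Returns (CreateTime - record value) in milliseconds for one line.
    static long skewMillis(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2 || !parts[0].startsWith(PREFIX)) {
            throw new IllegalArgumentException("Unexpected line: " + line);
        }
        long createTime = Long.parseLong(parts[0].substring(PREFIX.length()));
        long recordValue = Long.parseLong(parts[1]);
        return createTime - recordValue;
    }

    // Returns the largest absolute skew over all lines (0 for an empty list).
    static long maxAbsSkewMillis(List<String> lines) {
        long max = 0;
        for (String line : lines) {
            max = Math.max(max, Math.abs(skewMillis(line)));
        }
        return max;
    }

    public static void main(String[] args) {
        // Three lines copied from the output above.
        List<String> sample = Arrays.asList(
            "CreateTime:1494998828813        1494998828813",
            "CreateTime:1494998828915        1494998828914",
            "CreateTime:1494998829004        1494998829003");
        System.out.println("max skew (ms): " + maxAbsSkewMillis(sample));
    }
}
```

On the sample lines the largest gap is 1 ms, so the Kafka timestamps look
sound to me.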

-Jia

On Tue, May 16, 2017 at 9:30 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Jia!
>
> This sounds a bit fishy. The docs mention that there is no need for a
> timestamp / watermark extractor because with 0.10, the timestamps that come
> with Kafka records can be used directly to produce watermarks for event
> time.
>
> One quick clarification: did you also check whether the timestamps that
> come with the Kafka 0.10 records are sound and reasonable?
>
> Cheers,
> Gordon
>
>
> On 17 May 2017 at 4:45:19 AM, Jia Teoh (jiat...@gmail.com) wrote:
>
> Hi,
>
> I'm trying to use FlinkKafkaConsumer010 as a source for a windowing job on
> event time, as provided by Kafka. Following the Kafka connector doc (link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>),
> I've set the time characteristic to event time
> (streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime))
> and am using FlinkKafkaConsumer010 along with Kafka 0.10.2. I've also set up
> windowing:
> stream.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowMillis)))
> (using timeWindowAll appears to be equivalent as well).
>
> Using these configurations I can verify that data is read from Kafka.
> However, the event time windows never trigger even when data is loaded for
> much longer than the window size. Is there an additional configuration I am
> missing?
>
> I have verified that the Kafka messages have timestamps. The docs mention
> that there is no need for a timestamp extractor, but using one to
> explicitly assign the current time does result in windows being triggered.
>
> Thanks,
> Jia Teoh
>
>
