That's what I thought Dian.

The problem is that setting the watermark strategy like that didn't work
either, the method on_event_time is never called.

I did some reading of
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
as Nicolaus suggested, and there it says it's recommended to set the
watermark directly in the Kafka connector and not the stream, but
unfortunately this cannot be done in Python.
The FlinkKafkaConsumer doesn't have an assign_timestamps_and_watermarks
method.

I also tried using from_source instead of add_source, like this:

    stream = env.from_source(
        FlinkKafkaConsumer(
            'flink_read_topic',
            SimpleStringSchema(),
            {'bootstrap.servers': 'eas-kafka.eclypsium.dev:9092'},
        ),

WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30)),
        'kafka_source',
        Types.STRING(),
    )

But it fails with the error:

py4j.protocol.Py4JError: An error occurred while calling o0.fromSource.
Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method
fromSource([class
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer, class
org.apache.flink.api.common.eventtime.WatermarkStrategy$$Lambda$143/0x0000000800246840,
class java.lang.String, class
org.apache.flink.api.common.typeinfo.BasicTypeInfo]) does not exist

So, it seems that the only way to set the watermark strategy is to do:
    stream = env.add_source(
        FlinkKafkaConsumer(
            'flink_read_topic',
            SimpleStringSchema(),
            {'bootstrap.servers': 'eas-kafka.eclypsium.dev:9092'},
        ),
    )

    stream = stream.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(
            Duration.of_seconds(30),
        )
    )

Which is not working for me

On Tue, Aug 3, 2021 at 10:58 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Ignacio,
>
> Yes, you are right that you need to define the watermark strategy
> explicitly in case of event time processing.
>
> Regarding to *with_timestamp_assigner*, this is optional. If you don’t
> define it, it will generate watermark according to the timestamp extracted
> from the Kafka record (ConsumerRecord). If you want to generate watermark
> according to some column from the data, you need to define it explicitly.
>
> Regards,
> Dian
>
> 2021年8月4日 上午2:10,Ignacio Taranto <ignacio.tara...@eclypsium.com> 写道:
>
> I assumed that the event time and watermarks were already handled by the
> Kafka connector.
>
> So, basically, I need to do something like:
>
> stream.assign_timestamps_and_watermarks(
>
> WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))
> )
>
> Do I also need to set the timestamps myself by calling
> with_timestamp_assigner ?
> Wasn't the event timestamps already written by the Kafka source?
>
>
> On Tue, Aug 3, 2021 at 12:31 PM Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Ignacio,
>>
>> I have no experience with the Python API, but just to make sure: Your
>> events do contain some timestamp, and you defined how to assign a timestamp
>> to an element and generate watermarks? I can't find this in the Python API
>> docs, so [1] is a link to the Java Datastream API docs.
>>
>> Best,
>> Nico
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/
>>
>> On Tue, Aug 3, 2021 at 2:34 PM Ignacio Taranto <
>> ignacio.tara...@eclypsium.com> wrote:
>>
>>> I'm trying to so a simple word count example,  here's the main function:
>>>
>>> def main():
>>>     parser = argparse.ArgumentParser()
>>>     parser.add_argument('--kafka-clients-jar', required=True)
>>>     parser.add_argument('--flink-connector-kafka-jar', required=True)
>>>     args = parser.parse_args()
>>>
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.add_jars(
>>>         f'file://{args.kafka_clients_jar}',
>>>         f'file://{args.flink_connector_kafka_jar}',
>>>     )
>>>
>>>     stream = env.add_source(
>>>         FlinkKafkaConsumer(
>>>             'flink_read_topic',
>>>             SimpleStringSchema(),
>>>             {'bootstrap.servers': 'f{broker_url}'},
>>>         ),
>>>     )
>>>
>>>     stream = stream \
>>>         .flat_map(lambda x: [word for word in x.split(' ')],
>>> Types.STRING()) \
>>>         .key_by(lambda x: x) \
>>>         .window(TumblingEventWindowAssigner(10000, 0, True)) \
>>>         .process(CountWordsFunction(), Types.TUPLE([Types.STRING(),
>>> Types.INT()])) \
>>>         .map(lambda x: f'word: \'{x[0]}\' count: \'{x[1]}\'',
>>> Types.STRING())
>>>
>>>     stream.add_sink(
>>>         FlinkKafkaProducer(
>>>             'flink_write_topic',
>>>             SimpleStringSchema(),
>>>             {'bootstrap.servers': 'f{broker_url}'},
>>>         ),
>>>     )
>>>
>>>     env.execute('word_count')
>>>
>>> And here's my window function:
>>>
>>> class CountWordsFunction(ProcessWindowFunction[str, Tuple[str, int],
>>> str, TimeWindow]):
>>>     def process(
>>>         self,
>>>         key: str,
>>>         content: ProcessWindowFunction.Context,
>>>         elements: Iterable[str],
>>>     ) -> Iterable[Tuple[str, int]]:
>>>         yield (key, sum(1 for elem in elements))
>>>
>>>     def clear(self, context: ProcessWindowFunction.Context) -> None:
>>>         pass
>>>
>>>
>>> I put some prints to debug and it seems that the assign_windows method
>>> is never called.
>>>
>>> On Mon, Aug 2, 2021 at 11:09 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Regarding "Kafka consumer doesn’t read any message”: I’m wondering
>>>> about this. Usually the processing logic should not affect the Kafka
>>>> consumer. Did you judge this as there is no output for the job? If so, I’m
>>>> guessing that it’s because the window wasn’t triggered in case of
>>>> event-time.
>>>>
>>>> Could you share more code snippet?
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> 2021年8月2日 下午11:40,Ignacio Taranto <ignacio.tara...@eclypsium.com> 写道:
>>>>
>>>> I'm trying to use *FlinkKafkaConsumer* and a custom Trigger like
>>>> explained here:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge
>>>>
>>>> This my *window assigner* implementation:
>>>>
>>>> class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]):
>>>>     def __init__(self, size: int, offset: int, is_event_time: bool):
>>>>         self._size = size
>>>>         self._offset = offset
>>>>         self._is_event_time = is_event_time
>>>>
>>>>     def assign_windows(
>>>>         self,
>>>>         element: str,
>>>>         timestamp: int,
>>>>         context: WindowAssigner.WindowAssignerContext,
>>>>     ) -> Collection[TimeWindow]:
>>>>         start = TimeWindow.get_window_start_with_offset(timestamp,
>>>> self._offset, self._size)
>>>>         return [TimeWindow(start, start + self._size)]
>>>>
>>>>     def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
>>>>         return EventTimeTrigger()
>>>>
>>>>     def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
>>>>         return TimeWindowSerializer()
>>>>
>>>>     def is_event_time(self) -> bool:
>>>>         return self._is_event_time
>>>>
>>>>
>>>> And this is my *trigger* implementation:
>>>>
>>>> class EventTimeTrigger(Trigger[str, TimeWindow]):
>>>>     def on_element(
>>>>         self,
>>>>         element: str,
>>>>         timestamp: int,
>>>>         window: TimeWindow,
>>>>         ctx: Trigger.TriggerContext,
>>>>     ) -> TriggerResult:
>>>>         return TriggerResult.CONTINUE
>>>>
>>>>     def on_processing_time(
>>>>         self,
>>>>         timestamp: int,
>>>>         window: TimeWindow,
>>>>         ctx: Trigger.TriggerContext,
>>>>     ) -> TriggerResult:
>>>>         return TriggerResult.CONTINUE
>>>>
>>>>     def on_event_time(
>>>>         self,
>>>>         timestamp: int,
>>>>         window: TimeWindow,
>>>>         ctx: Trigger.TriggerContext,
>>>>     ) -> TriggerResult:
>>>>         if timestamp >= window.max_timestamp():
>>>>             return TriggerResult.FIRE_AND_PURGE
>>>>         else:
>>>>             return TriggerResult.CONTINUE
>>>>
>>>>     def on_merge(
>>>>         self,
>>>>         window: TimeWindow,
>>>>         ctx: Trigger.OnMergeContext,
>>>>     ) -> None:
>>>>         pass
>>>>
>>>>     def clear(
>>>>         self,
>>>>         window: TimeWindow,
>>>>         ctx: Trigger.TriggerContext,
>>>>     ) -> None:
>>>>         pass
>>>>
>>>> But the problem is, the Kafka consumer does not read any message unless
>>>> I use process time instead and change the *on_processing_time*
>>>> implementation to be the same as *on_event_time*.
>>>> I'm I doing anything wrong here? How can I use event time properly?
>>>>
>>>> This e-mail and any attachments may contain information that is
>>>> privileged, confidential,  and/or exempt from disclosure under applicable
>>>> law.  If you are not the intended recipient, you are hereby notified
>>>> that any disclosure, copying, distribution or use of any information
>>>> contained herein is strictly prohibited. If you have received this
>>>> transmission in error, please immediately notify the sender and destroy the
>>>> original transmission and any attachments, whether in electronic or hard
>>>> copy format, without reading or saving.
>>>>
>>>>
>>>>
>>> This e-mail and any attachments may contain information that is
>>> privileged, confidential,  and/or exempt from disclosure under applicable
>>> law.  If you are not the intended recipient, you are hereby notified
>>> that any disclosure, copying, distribution or use of any information
>>> contained herein is strictly prohibited. If you have received this
>>> transmission in error, please immediately notify the sender and destroy the
>>> original transmission and any attachments, whether in electronic or hard
>>> copy format, without reading or saving.
>>>
>>>
> This e-mail and any attachments may contain information that is
> privileged, confidential,  and/or exempt from disclosure under applicable
> law.  If you are not the intended recipient, you are hereby notified
> that any disclosure, copying, distribution or use of any information
> contained herein is strictly prohibited. If you have received this
> transmission in error, please immediately notify the sender and destroy the
> original transmission and any attachments, whether in electronic or hard
> copy format, without reading or saving.
>
>
>

-- 


This e-mail and any attachments may contain information that is 
privileged, confidential,  and/or exempt from disclosure under applicable 
law.  If you are not the intended recipient, you are hereby notified that 
any disclosure, copying, distribution or use of any information contained 
herein is strictly prohibited. If you have received this transmission in 
error, please immediately notify the sender and destroy the original 
transmission and any attachments, whether in electronic or hard copy 
format, without reading or saving.












Reply via email to