Hi Harshit,

I guess you are using the reference code from the master branch rather than
from Flink 1.14, which is the version you have installed.
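One quick way to confirm this kind of mismatch is to check which PyFlink release is installed and whether its `pyflink.datastream.window` module exports the built-in assigner that the master-branch example imports. The sketch below assumes PyFlink was installed from PyPI under the distribution name `apache-flink`. The helper names `pyflink_version` and `has_builtin_tumbling_event_time_windows` are hypothetical and only used for illustration.

```python
# Sketch: report the installed PyFlink release and whether the built-in
# TumblingEventTimeWindows assigner can be imported from it.
# Assumes the PyPI distribution name "apache-flink".
from importlib.metadata import PackageNotFoundError, version


def pyflink_version():
    """Return the installed apache-flink version string, or None if absent."""
    try:
        return version("apache-flink")
    except PackageNotFoundError:
        return None


def has_builtin_tumbling_event_time_windows():
    """True if pyflink.datastream.window exports TumblingEventTimeWindows."""
    try:
        from pyflink.datastream.window import TumblingEventTimeWindows  # noqa: F401
    except ImportError:  # also covers ModuleNotFoundError when PyFlink is missing
        return False
    return True


if __name__ == "__main__":
    print("apache-flink version:", pyflink_version())
    print("built-in TumblingEventTimeWindows:",
          has_builtin_tumbling_event_time_windows())
```

On a 1.14.x installation the second check should return False, which matches the ImportError in the quoted message.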

TumblingEventTimeWindows was added for Flink 1.16, which has not been
released yet. However, it is essentially a utility class, so I think you
could simply copy it into your own project if you need to use it with Flink
1.14. You can refer to the PR [1] or to the code in master [2] for the
implementation of TumblingEventTimeWindows.
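If you do copy the class, it may help to know that the core of a tumbling event-time assigner is one piece of arithmetic: a record with timestamp t is assigned to the window [start, start + size), where start = t - (t - offset) mod size. This is the same formula Flink's Java `TimeWindow.getWindowStartWithOffset` uses. The pure-Python sketch below is not PyFlink API. `window_start` and `count_per_window` are hypothetical helpers, and the offset is assumed to be 0. It applies that rule to the sample data from the question with a 5 ms window, so the rows it produces should correspond to the (key, start, end, count) tuples that CountWindowProcessFunction emits once all windows have fired.

```python
# Pure-Python sketch (not PyFlink API) of how a tumbling event-time
# window assigner buckets records by timestamp.
from collections import Counter
from typing import Iterable, List, Tuple


def window_start(timestamp: int, size: int, offset: int = 0) -> int:
    """Start of the tumbling window of length `size` containing `timestamp`."""
    # Python's % is non-negative for a positive divisor, so this also
    # handles timestamps that lie before the offset.
    return timestamp - (timestamp - offset) % size


def count_per_window(records: Iterable[Tuple[str, int]],
                     size: int) -> List[Tuple[str, int, int, int]]:
    """Return (key, window_start, window_end, count), sorted by key and start."""
    counts = Counter()
    for key, ts in records:
        counts[(key, window_start(ts, size))] += 1
    # Window end is exclusive, as in Flink's TimeWindow.
    return [(key, start, start + size, n)
            for (key, start), n in sorted(counts.items())]


if __name__ == "__main__":
    data = [('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4),
            ('hi', 5), ('hi', 8), ('hi', 9), ('hi', 15)]
    for row in count_per_window(data, size=5):
        print(row)
```

Running it prints ('hi', 0, 5, 4), ('hi', 5, 10, 3) and ('hi', 15, 20, 1). No record falls in [10, 15), so no window is created for that range.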

Regards,
Dian

[1] https://github.com/apache/flink/pull/18957/files#diff-b20bd7951f36c43ecab0f8b38e366c9f9fe371f3e0ca7011c5172db1cfe87061
[2] https://github.com/apache/flink/blob/6253cb143da0adb13581ed5bd1d3edce483eb8b3/flink-python/pyflink/datastream/window.py#L1129



On Tue, Apr 19, 2022 at 9:22 PM harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
> I am new to PyFlink and would like to ask for your support with an issue I
> am facing. I am using PyFlink version 1.14.4 and reference code from the
> PyFlink getting-started pages.
>
> I am getting the following error:
>
> ImportError: cannot import name 'TumblingEventTimeWindows' from 'pyflink.datastream.window' (C:\Users\Admin\PycharmProjects\pythonProject8\venv\lib\site-packages\pyflink\datastream\window.py)
>
> Below is my code for reference:
>
> import sys
> import argparse
> from typing import Iterable
>
> from pyflink.datastream.connectors import (FileSink, OutputFileConfig,
>                                            RollingPolicy)
> from pyflink.common import Types, WatermarkStrategy, Time, Encoder
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream import (StreamExecutionEnvironment,
>                                 ProcessWindowFunction)
> from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow
>
>
> class MyTimestampAssigner(TimestampAssigner):
>
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[1])
>
>
> class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str,
>                                                        TimeWindow]):
>
>     def process(self,
>                 key: str,
>                 context: ProcessWindowFunction.Context[TimeWindow],
>                 elements: Iterable[tuple]) -> Iterable[tuple]:
>         return [(key, context.window().start, context.window().end,
>                  len([e for e in elements]))]
>
>     def clear(self, context: ProcessWindowFunction.Context) -> None:
>         pass
>
>
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>     output_path = known_args.output
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # write all the data to one file
>     env.set_parallelism(1)
>
>     # define the source
>     data_stream = env.from_collection([
>         ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8),
>         ('hi', 9), ('hi', 15)],
>         type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
>
>     # define the watermark strategy
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(MyTimestampAssigner())
>
>     ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: x[0], key_type=Types.STRING()) \
>         .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
>         .process(CountWindowProcessFunction(),
>                  Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(),
>                               Types.INT()]))
>
>     # define the sink
>     if output_path is not None:
>         ds.sink_to(
>             sink=FileSink.for_row_format(
>                 base_path=output_path,
>                 encoder=Encoder.simple_string_encoder())
>             .with_output_file_config(
>                 OutputFileConfig.builder()
>                 .with_part_prefix("prefix")
>                 .with_part_suffix(".ext")
>                 .build())
>             .with_rolling_policy(RollingPolicy.default_rolling_policy())
>             .build()
>         )
>     else:
>         print("Printing result to stdout. Use --output to specify output "
>               "path.")
>         ds.print()
>
>     # submit for execution
>     env.execute()
>
> Thanks and regards,
> Harshit
