After digging into the flink-python code, it seems that if the
`PYFLINK_GATEWAY_DISABLED` environment variable is set to false, then
using Types.LIST(Types.ROW([...])) works without any issue once the Java
Gateway is launched.

It was unexpected to have to set this flag to false explicitly for a local Flink run.

This is a workaround for this issue:

 def open(self, runtime_context: RuntimeContext):
        state_ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )
        import os
        os.environ["PYFLINK_GATEWAY_DISABLED"] = "0"

On Wed, Oct 4, 2023 at 1:48 PM Elkhan Dadashov <elkhan.dadas...@gmail.com>
wrote:

> Hi Flinkers,
>
> I'm trying to use MapState, where the value will be a list of <class
> 'pyflink.common.types.Row'> type elements.
>
> Wanted to check if anyone else faced the same issue while trying to use
> MapState in PyFlink with complex types.
>
> Here is the code:
>
> from pyflink.common import Time
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import (
>     KeyedCoProcessFunction,
>     KeySelector,
>     RuntimeContext,
> )
> from pyflink.datastream.state import (
>     MapStateDescriptor,
>     StateTtlConfig,
>     ValueStateDescriptor,
>     ListStateDescriptor
> )
> from pyflink.table import DataTypes, StreamTableEnvironment
>
>
> class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
>     def __init__(self):
>         self.my_map_state = None
>
>     def open(self, runtime_context: RuntimeContext):
>         state_ttl_config = (
>             StateTtlConfig.new_builder(Time.seconds(1))
>             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
>             .disable_cleanup_in_background()
>             .build()
>         )
>
>         my_map_state_descriptor = MapStateDescriptor(
>             "my_map_state",
>             Types.SQL_TIMESTAMP(),
>             Types.LIST(Types.ROW([
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.SQL_TIMESTAMP(),
>                 Types.SQL_TIMESTAMP(),
>                 Types.SQL_TIMESTAMP(),
>                 Types.BIG_INT()
>             ]))
>         )
>         my_map_state_descriptor.enable_time_to_live(state_ttl_config)
>         self.my_map_state =
> runtime_context.get_map_state(my_map_state_descriptor)
>
> But while running this code, it fails with this exception at job startup
> (at runtime_context.get_map_state(my_map_state_descriptor)), even without
> trying to add anything to the state.
>
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in
> pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation
> .__init__
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
> File
> "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
> line 127, in open
> self.open_func()
> File
> "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
> line 296, in open_func
> process_function.open(runtime_context)
> File "/tmp/ipykernel_83481/1603226134.py", line 57, in open
> File
> "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py",
> line 125, in get_map_state
> map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
> File
> "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
> line 812, in from_type_info
> from_type_info(type_info._key_type_info),
> from_type_info(type_info._value_type_info))
> File
> "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
> line 809, in from_type_info
> return GenericArrayCoder(from_type_info(type_info.elem_type))
> File
> "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
> line 819, in from_type_info
> [f for f in type_info.get_field_names()])
> File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py",
> line 377, in get_field_names
> j_field_names = self.get_java_type_info().getFieldNames()
> File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py",
> line 391, in get_java_type_info
> j_types_array = get_gateway()\
> File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py",
> line 62, in get_gateway
> _gateway = launch_gateway()
> File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py",
> line 86, in launch_gateway
> raise Exception("It's launching the PythonGatewayServer during Python UDF
> execution "
> Exception: It's launching the PythonGatewayServer during Python UDF
> execution which is unexpected. It usually happens when the job codes are in
> the top level of the Python script file and are not enclosed in a `if name
> == 'main'` statement.
>
> If I switch from Types.ROW to Types.TUPLE() it works without any exception.
>
> This works:
>
> my_map_state_descriptor = MapStateDescriptor(
>             "my_map_state",
>             Types.SQL_TIMESTAMP(),
>             Types.LIST(Types.TUPLE([
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.STRING(),
>                 Types.SQL_TIMESTAMP(),
>                 Types.SQL_TIMESTAMP(),
>                 Types.SQL_TIMESTAMP(),
>                 Types.BIG_INT()
>             ]))
>         )
>
> I also created a Jira ticket, FLINK-33188:
> <https://issues.apache.org/jira/browse/FLINK-33188>
>
> Thanks.
>

Reply via email to