Dian Fu created FLINK-34985:
-------------------------------

             Summary: It doesn't support to access fields by name for map 
function in thread mode
                 Key: FLINK-34985
                 URL: https://issues.apache.org/jira/browse/FLINK-34985
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Dian Fu


Reported in the Slack channel: 
[https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589]

Hi all, I seem to be running into an issue when switching to thread mode in 
PyFlink. In a UDF, the {{Row}} seems to get converted into a tuple, and you 
can no longer access fields by name. In process mode it works fine. This 
bug can easily be reproduced using this minimal example, which is close to the 
PyFlink docs:
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")


# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)
table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
 
The exception I get in thread mode is:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10     at <string>.<lambda>(<string>:1)
2024-03-28 16:32:10     at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
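
Until this is fixed, one possible workaround is to make the map function tolerate both input shapes. The sketch below is not part of the original report and is not PyFlink's API. It assumes that thread mode passes a plain tuple whose elements follow the schema order (a, b), as the AttributeError above suggests. The names NamedRow, FIELD_INDEX and get_field are hypothetical helpers introduced for illustration, and NamedRow only stands in for pyflink.common.Row so the example runs without Flink installed.

```python
from collections import namedtuple

# Hypothetical stand-in for pyflink.common.Row (supports access by name).
NamedRow = namedtuple("NamedRow", ["a", "b"])

# Assumption: positions mirror the order of the fields in the table schema.
FIELD_INDEX = {"a": 0, "b": 1}


def get_field(row, name):
    """Return a field by name if the row supports it, else by position.

    Caveat: a field named like a tuple method (e.g. "count" or "index")
    would resolve to that method on a plain tuple, so this sketch only
    suits field names that do not collide with tuple attributes.
    """
    try:
        return getattr(row, name)
    except AttributeError:
        return row[FIELD_INDEX[name]]


def map_function(row):
    # Same arithmetic as the reproducer: (a + 1, b * b).
    a = get_field(row, "a")
    b = get_field(row, "b")
    return (a + 1, b * b)


if __name__ == "__main__":
    print(map_function(NamedRow(2, 4)))  # row with named fields
    print(map_function((2, 4)))          # plain tuple input
```

In an actual job the function would presumably return a Row, as in the reproducer; a plain tuple is returned here only to keep the sketch free of Flink imports.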



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
