Hi mates!
I'm very new to PyFlink and am trying to register a custom UDF using the
Python API.
I'm currently hitting an issue in both the server environment and my local IDE.
When I try to execute the example below, I get this error message: *The
configured Task Off-Heap Memory 0 bytes is less than the least required
Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
using the configuration key 'taskmanager.memory.task.off-heap.size'.*
Of course, I've added the required property to *flink-conf.yaml* and checked
that *pyflink-shell.sh* initializes the environment using the specified
configuration, but it makes no difference and I still get the error.
I've also attached my flink-conf.yaml file.
Thx for your help!
*Here is an example:*
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [
            ("user-1", "http://url/1"),
            ("user-2", "http://url/2"),
            ("user-1", "http://url/3"),
            ("user-3", "http://url/4"),
            ("user-1", "http://url/3")
        ],
        [
            "uid", "url"
        ]
    )

    my_table_grouped_by_uid = my_table.group_by("uid").select(
        "uid, collect(url) as urls")
    bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

    bt_env.execute_sql(
        "select test_udf(uid) as uid, urls from my_temp_table").print()
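
Separately from the script above, here is a minimal sketch of how the key named in the error message could be checked in a flink-conf.yaml file using plain Python, with no PyFlink required. It assumes the file uses flat `key: value` lines with `#` comments. The helper names `read_flink_conf` and `size_in_mb` and the sample contents are hypothetical (this is not the attached file), and only the m/mb and g/gb size suffixes are handled.

```python
import re

# Key named in the error message.
REQUIRED_KEY = "taskmanager.memory.task.off-heap.size"


def read_flink_conf(text):
    """Parse flat 'key: value' lines, skipping blank lines and '#' comments."""
    conf = {}
    for line in text.splitlines():
        # Drop anything after '#', then surrounding whitespace.
        line = line.split("#", 1)[0].strip()
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def size_in_mb(value):
    """Convert a size such as '80m' or '128 mb' to megabytes.

    Assumption: only m/mb and g/gb suffixes are supported in this sketch.
    """
    match = re.fullmatch(r"(\d+)\s*(m|mb|g|gb)", value.strip().lower())
    if match is None:
        raise ValueError(f"unsupported size format: {value!r}")
    number, unit = int(match.group(1)), match.group(2)
    return number * 1024 if unit.startswith("g") else number


# Hypothetical excerpt for illustration, not the attached file.
sample = """
# hypothetical excerpt
jobmanager.rpc.address: localhost
taskmanager.memory.task.off-heap.size: 80m
"""

conf = read_flink_conf(sample)
off_heap_mb = size_in_mb(conf.get(REQUIRED_KEY, "0m"))
# The error message asks for at least 79 mb.
print(off_heap_mb >= 79)
```

This only confirms what is written in the file. It does not show whether the environment that runs the job actually loads that file.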