Re: pyflink1.11 UDF result printing issue: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Post by Xingbo Huang
Hi,
The error message says at least 79m is required, but your code configures it as 0m, so of course it keeps failing.
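For example, something along these lines should clear the error (a minimal sketch based on the setup quoted below; 80m is just an illustrative value at or above the reported 79 mb minimum):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

s_env = StreamExecutionEnvironment.get_execution_environment()
env = StreamTableEnvironment.create(
    s_env,
    environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())
# Reserve enough Task Off-Heap Memory for the Python worker; the exception
# reports a minimum of 79 mb, so any value at or above that should work.
env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", '80m')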
Best,
Xingbo

肖越 <18242988...@163.com> wrote on Wed, Feb 3, 2021 at 10:24 AM:

> Testing a UDF in pyflink1.11: it takes a Double column defined in the table as input, computes the cumulative product of that column's
> values, and returns a single Double value. I have already configured taskmanager.memory.task.off-heap.size, but it made no difference.
> Printing the result raises this error:
> Traceback (most recent call last):
>   File "C:*/udtf_test.py", line 42, in <module>
>     env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
>   File "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 543, in execute_sql
>     return TableResult(self._j_tenv.executeSql(stmt))
>   File "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco
>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'."
>
>
> The code is as follows:
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> # s_env.set_parallelism(8)
> env = StreamTableEnvironment.create(s_env,
>     environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
> env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m')
> # register the source tables
> env.execute_sql(get_table_ddl('TP_GL_DAY'))
> env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))
>
>
> # register the print sink table
> out_ddl = '''
> CREATE TABLE print_result (
>  yldrate1 DOUBLE
> ) WITH (
>  'connector' = 'print'
> )
> '''
> env.execute_sql(out_ddl)
> # define and run the SQL query
> log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
> view_table = env.sql_query(log_query)
> env.register_table('query_result', view_table)
>
>
> # define the computation logic as a pandas UDF
> @udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), udf_type="pandas")
> def multi_production(yldrate):
>     yldrate_1 = yldrate + 1
>     return np.prod(yldrate_1) - 1
>
>
> # register the function
> env.register_function('multi_production', multi_production)
> env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
> query_result.print_schema()
> env.execute('my_udf_job')
>
>

