抱歉,报错信息理解错误,问题已经解决,感谢大佬。






在 2021-02-03 10:16:38,"肖越" <18242988...@163.com> 写道:

pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
结果print报错:
Traceback (most recent call last):
  File "C:*****/udtf_test.py", line 42, in <module>
    env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) 
FROM query_result')
  File 
"C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
 line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File 
"C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 
bytes is less than the least required Python worker Memory 79 mb. The Task 
Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'."


【代码如下】:
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# s_env.set_parallelism(8)
env = StreamTableEnvironment.create(s_env,
    
environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
 '0m')
# 注册源表
env.execute_sql(get_table_ddl('TP_GL_DAY'))
env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))


# 注册输出表
out_ddl = '''
        CREATE TABLE print_result (
         yldrate1 DOUBLE
        ) WITH (
         'connector' = 'print'
        )
'''
env.execute_sql(out_ddl)
# 定义及执行SQL
log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN 
TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
view_table = env.sql_query(log_query)
env.register_table('query_result', view_table)


# 定义计算逻辑函数
@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), 
udf_type="pandas")
def multi_production(yldrate):
    yldrate_1 = yldrate + 1
    return np.prod(yldrate_1) - 1


# 注册函数
env.register_function('multi_production', multi_production)
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM 
query_result')
query_result.print_schema()
env.execute('my_udf_job')





 

回复