在flink1.17.1中java调用pythonUDF奇怪问题[maximum recursion depth][python函数正常]

2023-08-10 文章 1
HI
我简化了我python代码
只要udft方法有外部方法,都会有递归问题, 比如 
agan_add_iig(),尽管我的agan_add_iig()实现很简单,flink难道不能外部import自定义方法吗??
def agan_add_iig():
return 2
@udtf(input_types=DataTypes.STRING(),
result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()])
def run(data_str):
try:
logger.info("input param is %s", data_str)
data = [{'name': data_str}]
start_time = time.time()

agan_add_iig()
end_time = time.time()

print("入参耗时:", end_time - start_time)
# extractEngine = init_info(data)
return 'success', 'success', 'success', 'success'

except Exception as e:
err = e
logger.error(e)
return str(err), '', '', ''

在flink1.17.1中java调用pythonUDF奇怪问题[maximum recursion depth][python函数正常]

2023-08-10 文章 1
各位老师好:


背景是这样的[flink1.17.1],我在window机器,本地单机调用自定义的pythonUDF,下面是我python代码


err=None
@udtf(input_types=DataTypes.STRING(),
result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()])
def run(data_str):
try:
logger.info("input param is ",  data_str)
data = [{'name': data_str}]
extractEngine = init_info(data)
for row in extractEngine.extract():
return row.content.get('content'), list_tran_json(row.content.get('tok')), 
list_tran_json(
row.content.get('pos')), list_tran_json(row.content.get('dep'))
except Exception as e:
err = e
logger.error(e)
return str(err), '', '', ''
这段代码我是定义了一个切词能力,使用hanlp的,进来把语句进行切词和依存处理,并返回,
我能保证并验证这个方法没有任何问题,都能执行;
现在我通过@udtf包装成udf,通过java代码去调用
下面是java代码:
@Test
@SneakyThrows
public void testTableMiningFunc() {
registerTable();
String registerSql = "CREATE TEMPORARY FUNCTION mining AS 
'py_bian_func.mining_w.run' LANGUAGE PYTHON";
tableEnv.executeSql(
registerSql);
String sql = "SELECT * from t_a001 ,LATERAL TABLE(mining(name)) as 
alias(content, pos, top, des) ";
TableResult tableResult = tableEnv.executeSql(sql);
tableResult.print();
}
返回数据是
这个我一直没搞懂,为什么会显示递归问题;


PS: 
补充解析下,python代码之所为通过全局变量去接受异常,因为我之前发现我如果不进行异常捕获,flink程序会卡住,日志提示会显示:org.apache.beam.runners.fnexecution.logging.GrpcLoggingService
  - Logging client hanged up.然后卡住不动
我去跟踪了算子图:,发现数据发送到python的算子,但是没有输出,对应日志为:org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator
  - Closing streams for instruction 1 and outbound data 
{fn/read/input:0=Byte size: 239, Element count: 4} and timers {}.




现在我在怀疑是flink在@udtf 加载装饰器过程中可能出现了问题,导致我run方法没有进去执行;因为我在run定义了log,但是在控制台没看到任何日志输出;
由于官网的docs对于这块底层加载逻辑没有太多介绍,这边请教下各位老师,应该怎么处理


我用pydev-pycharm去debug pyflink代码,发现在java_gateway.py中run方法有正常跑,而且


能正常发送,但是没有执行到我自定义的udf中的run方法;


困扰2天,望各位老师指点!