退订
退订
退订
在flink1.17.1中java调用pythonUDF奇怪问题[maximum recursion depth][python函数正常]
HI 我简化了我python代码 只要udft方法有外部方法,都会有递归问题, 比如 agan_add_iig(),尽管我的agan_add_iig()实现很简单,flink难道不能外部import自定义方法吗?? def agan_add_iig(): return 2 @udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]) def run(data_str): try: logger.info("input param is %s", data_str) data = [{'name': data_str}] start_time = time.time() agan_add_iig() end_time = time.time() print("入参耗时:", end_time - start_time) # extractEngine = init_info(data) return 'success', 'success', 'success', 'success' except Exception as e: err = e logger.error(e) return str(err), '', '', ''
在flink1.17.1中java调用pythonUDF奇怪问题[maximum recursion depth][python函数正常]
各位老师好: 背景是这样的[flink1.17.1],我在window机器,本地单机调用自定义的pythonUDF,下面是我python代码 err=None @udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]) def run(data_str): try: logger.info("input param is ", data_str) data = [{'name': data_str}] extractEngine = init_info(data) for row in extractEngine.extract(): return row.content.get('content'), list_tran_json(row.content.get('tok')), list_tran_json( row.content.get('pos')), list_tran_json(row.content.get('dep')) except Exception as e: err = e logger.error(e) return str(err), '', '', '' 这段代码我是定义了一个切词能力,使用hanlp的,进来把语句进行切词和依存处理,并返回, 我能保证并验证这个方法没有任何问题,都能执行; 现在我通过@udtf包装成udf,通过java代码去调用 下面是java代码: @Test @SneakyThrows public void testTableMiningFunc() { registerTable(); String registerSql = "CREATE TEMPORARY FUNCTION mining AS 'py_bian_func.mining_w.run' LANGUAGE PYTHON"; tableEnv.executeSql( registerSql); String sql = "SELECT * from t_a001 ,LATERAL TABLE(mining(name)) as alias(content, pos, top, des) "; TableResult tableResult = tableEnv.executeSql(sql); tableResult.print(); } 返回数据是 这个我一直没搞懂,为什么会显示递归问题; PS: 补充解析下,python代码之所为通过全局变量去接受异常,因为我之前发现我如果不进行异常捕获,flink程序会卡住,日志提示会显示:org.apache.beam.runners.fnexecution.logging.GrpcLoggingService - Logging client hanged up.然后卡住不动 我去跟踪了算子图:,发现数据发送到python的算子,但是没有输出,对应日志为:org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator - Closing streams for instruction 1 and outbound data {fn/read/input:0=Byte size: 239, Element count: 4} and timers {}. 现在我在怀疑是flink在@udtf 加载装饰器过程中可能出现了问题,导致我run方法没有进去执行;因为我在run定义了log,但是在控制台没看到任何日志输出; 由于官网的docs对于这块底层加载逻辑没有太多介绍,这边请教下各位老师,应该怎么处理 我用pydev-pycharm去debug pyflink代码,发现在java_gateway.py中run方法有正常跑,而且 能正常发送,但是没有执行到我自定义的udf中的run方法; 困扰2天,望各位老师指点!
flink sql 传参数问题
Hello: 请教2个问题。 1、flink 使用sql-client.sh -f xx.sql 怎么传递参数修改sql里面的文件。比如MySQL,Kafka的连接地址。 2、flink sql消费Kafka 设置group-offset,group.id之前没提交过,会直接报错。怎么设置成没提交过从earliest消费等等。 感谢大家
回复:pyflink1.17 中文乱码
thanks, 设置成GBK是可以的 | | 赵兵杰 | 回复的原邮件 | 发件人 | Leo | | 日期 | 2023年06月08日 15:16 | | 收件人 | user-zh@flink.apache.org、Shammon FY | | 抄送至 | | | 主题 | Re: pyflink1.17 中文乱码 | Hi, 这个问题我在1.16版本测试了一下, Pycharm 和 Windows命令行都测试了,结论如下: 1) Windows命令行没有任何问题,因为默认的终端窗口的字符集编码就是GBK; 2) Pycharm运行时必须先修改设置, Project Encoding设成GBK (WIndows上默认就是GBK), 控制台的字符集编码依赖于project的字符集 编码设置 3) Linux下没有测试,估计也要修改Project Encoding,改成UTF-8。 Thanks, Leo 在 2023/6/8 13:50, Shammon FY 写道: Hi, 你是怎么运行的?是不是中文的文件编码格式不对? Best, Shammon FY On Thu, Jun 8, 2023 at 10:07 AM yidan zhao wrote: 可以描述再详细点 1 于2023年6月7日周三 19:55写道: 老师们好,pyflink运行官网例子 wordcount 。把单词改成中文 乱码
pyflink1.17 中文乱码
老师们好,pyflink运行官网例子 wordcount 。把单词改成中文 乱码
????
---- ??: "user-zh"
1.10 SqlClient启动报错
Hi,all 我在linux上启动1.10的sql-client,却遇到了如下错误信息: 看了一下sql-client.sh脚本需要在FLINK_OPT_DIR路径中找到Sql-client.jar,我就手动export了一下FLINK_OPT_DIR,但还是报上面的错,与此同时,我启动1.9版本的sql-client则可以正常启动,感觉这个问题很诡异,麻烦请大家帮忙看看