退订

2024-03-19 文章 1
退订

退订

2023-10-03 文章 1



在flink1.17.1中java调用pythonUDF奇怪问题[maximum recursion depth][python函数正常]

2023-08-10 文章 1
HI
我简化了我python代码
只要udft方法有外部方法,都会有递归问题, 比如 
agan_add_iig(),尽管我的agan_add_iig()实现很简单,flink难道不能外部import自定义方法吗??
def agan_add_iig():
return 2
@udtf(input_types=DataTypes.STRING(),
result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()])
def run(data_str):
try:
logger.info("input param is %s", data_str)
data = [{'name': data_str}]
start_time = time.time()

agan_add_iig()
end_time = time.time()

print("入参耗时:", end_time - start_time)
# extractEngine = init_info(data)
return 'success', 'success', 'success', 'success'

except Exception as e:
err = e
logger.error(e)
return str(err), '', '', ''

在flink1.17.1中java调用pythonUDF奇怪问题[maximum recursion depth][python函数正常]

2023-08-10 文章 1
各位老师好:


背景是这样的[flink1.17.1],我在window机器,本地单机调用自定义的pythonUDF,下面是我python代码


err=None
@udtf(input_types=DataTypes.STRING(),
result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()])
def run(data_str):
try:
logger.info("input param is ",  data_str)
data = [{'name': data_str}]
extractEngine = init_info(data)
for row in extractEngine.extract():
return row.content.get('content'), list_tran_json(row.content.get('tok')), 
list_tran_json(
row.content.get('pos')), list_tran_json(row.content.get('dep'))
except Exception as e:
err = e
logger.error(e)
return str(err), '', '', ''
这段代码我是定义了一个切词能力,使用hanlp的,进来把语句进行切词和依存处理,并返回,
我能保证并验证这个方法没有任何问题,都能执行;
现在我通过@udtf包装成udf,通过java代码去调用
下面是java代码:
@Test
@SneakyThrows
public void testTableMiningFunc() {
registerTable();
String registerSql = "CREATE TEMPORARY FUNCTION mining AS 
'py_bian_func.mining_w.run' LANGUAGE PYTHON";
tableEnv.executeSql(
registerSql);
String sql = "SELECT * from t_a001 ,LATERAL TABLE(mining(name)) as 
alias(content, pos, top, des) ";
TableResult tableResult = tableEnv.executeSql(sql);
tableResult.print();
}
返回数据是
这个我一直没搞懂,为什么会显示递归问题;


PS: 
补充解析下,python代码之所为通过全局变量去接受异常,因为我之前发现我如果不进行异常捕获,flink程序会卡住,日志提示会显示:org.apache.beam.runners.fnexecution.logging.GrpcLoggingService
  - Logging client hanged up.然后卡住不动
我去跟踪了算子图:,发现数据发送到python的算子,但是没有输出,对应日志为:org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator
  - Closing streams for instruction 1 and outbound data 
{fn/read/input:0=Byte size: 239, Element count: 4} and timers {}.




现在我在怀疑是flink在@udtf 加载装饰器过程中可能出现了问题,导致我run方法没有进去执行;因为我在run定义了log,但是在控制台没看到任何日志输出;
由于官网的docs对于这块底层加载逻辑没有太多介绍,这边请教下各位老师,应该怎么处理


我用pydev-pycharm去debug pyflink代码,发现在java_gateway.py中run方法有正常跑,而且


能正常发送,但是没有执行到我自定义的udf中的run方法;


困扰2天,望各位老师指点!







flink sql 传参数问题

2023-07-12 文章 1
Hello:
  请教2个问题。
   1、flink 使用sql-client.sh -f xx.sql 怎么传递参数修改sql里面的文件。比如MySQL,Kafka的连接地址。
   2、flink sql消费Kafka 
设置group-offset,group.id之前没提交过,会直接报错。怎么设置成没提交过从earliest消费等等。
 感谢大家








回复:pyflink1.17 中文乱码

2023-06-09 文章 1
thanks, 设置成GBK是可以的



| |
赵兵杰
|




 回复的原邮件 
| 发件人 | Leo |
| 日期 | 2023年06月08日 15:16 |
| 收件人 | user-zh@flink.apache.org、Shammon FY |
| 抄送至 | |
| 主题 | Re: pyflink1.17 中文乱码 |

Hi,




这个问题我在1.16版本测试了一下, Pycharm 和 Windows命令行都测试了,结论如下:

1) Windows命令行没有任何问题,因为默认的终端窗口的字符集编码就是GBK;




2) Pycharm运行时必须先修改设置, Project Encoding设成GBK

(WIndows上默认就是GBK), 控制台的字符集编码依赖于project的字符集

编码设置





3) Linux下没有测试,估计也要修改Project Encoding,改成UTF-8。




Thanks,

Leo




在 2023/6/8 13:50, Shammon FY 写道:

Hi,

你是怎么运行的?是不是中文的文件编码格式不对?

Best,
Shammon FY


On Thu, Jun 8, 2023 at 10:07 AM yidan zhao  wrote:


可以描述再详细点

1  于2023年6月7日周三 19:55写道:


老师们好,pyflink运行官网例子 wordcount 。把单词改成中文 乱码







pyflink1.17 中文乱码

2023-06-07 文章 1
老师们好,pyflink运行官网例子 wordcount 。把单词改成中文 乱码







????

2021-03-18 文章 ?0?1?0?4
----
??: 
   "user-zh"



1.10 SqlClient启动报错

2020-02-24 文章 1
Hi,all

我在linux上启动1.10的sql-client,却遇到了如下错误信息:




看了一下sql-client.sh脚本需要在FLINK_OPT_DIR路径中找到Sql-client.jar,我就手动export了一下FLINK_OPT_DIR,但还是报上面的错,与此同时,我启动1.9版本的sql-client则可以正常启动,感觉这个问题很诡异,麻烦请大家帮忙看看

求助帖:flink tpc-ds中加入blink的runtime filter问题

2020-01-16 文章 zhaoyunpython . d . 1