Could you share the complete log file?
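
In the meantime, one thing worth checking (not certain it is the cause here): "Cannot instantiate user-defined function" often means the path in the AS clause does not resolve to a Python UDF object. In Flink 1.13 the DDL for LANGUAGE PYTHON expects AS '<module>.<object>', where <object> is typically created with udf(...), not the bare class. A sketch of what myReplace.py might look like (the STRING result type is an assumption):

```python
# myReplace.py -- sketch, assuming apache-flink==1.13.6 is installed
from pyflink.table import DataTypes, ScalarFunction
from pyflink.table.udf import udf

class MyReplace(ScalarFunction):
    def eval(self, s):
        return s.replace('buy', 'sale')

# The object the DDL's AS clause should point at, i.e. 'myReplace.myreplace':
myreplace = udf(MyReplace(), result_type=DataTypes.STRING())
```

with the DDL then written as CREATE FUNCTION IF NOT EXISTS myhive.tetris.myreplace AS 'myReplace.myreplace' LANGUAGE PYTHON.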

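Independent of Flink, the replacement logic itself can also be smoke-tested in plain Python (the class body below is copied from your file, minus the pyflink base class, so it is only a stand-in):

```python
# Plain-Python smoke test of the UDF's eval logic (no pyflink needed).
# MyReplace here is a stand-in without the ScalarFunction base class.
class MyReplace:
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return s.replace('buy', 'sale')

print(MyReplace().eval('buy low'))  # -> sale low
```

If that behaves as expected, the problem is in the Flink wiring rather than the string logic.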
On Tue, Apr 19, 2022 at 10:53 AM 799590...@qq.com.INVALID
<799590...@qq.com.invalid> wrote:

> Software versions
>
> flink-1.13.6
> python3.6.8
> Local win10 machine also has python3.6.8 installed, with the python
> environment variable set, and `python -m pip install apache-flink==1.13.6`
> ran successfully.
> Deployed in standalonesession mode, one JM and two TMs; python3.6.8 and
> pyflink-1.13.6 are installed on all 3 cluster machines.
>
> Problems:
>
> 1. Calling the python udf raises the following error:
> Servlet.service() for servlet [dispatcherServlet] in context with path []
> threw exception [Request processing failed; nested exception is
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with
> root cause
> java.lang.RuntimeException: Python callback server start failed!
>
> 2. Are there any rules for how to write the class path after AS in a SQL
> CREATE FUNCTION statement for the python environment?
>
> Contents of the python udf file myReplace.py:
>
> # ScalarFunction must be imported for the class below; `call` was imported but unused
> from pyflink.table import ScalarFunction
>
> class MyReplace(ScalarFunction):
>   def __init__(self):
>     self.factor = 12
>
>   def eval(self, s):
>     return s.replace('buy', 'sale')
>
> Obtaining the remote cluster environment, where catalogName=myhive and
> defaultDatabase=tetris:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
> env.setStateBackend(new HashMapStateBackend());
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(60000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
> env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
>
> tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader",true);
>
> tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled",true);
> if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
>     HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
> defaultDatabase, hiveConfDir);
>     tableEnv.registerCatalog(catalogName, hiveCatalog);
> }
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> List<String> jars = new ArrayList<>();
> List<String> pys = new ArrayList<>();
> log.info("Start loading UDFs from HDFS");
> String prefix = "/file/function/flink/";
> try {
>     EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
>     for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
>         if (hdf.endsWith(".jar")) {
>             jars.add(hdf);
>             EnvUtil.loadJar(URLUtil.url(hdf));
>         } else if (hdf.endsWith(".py")) {
>             pys.add(hdf);
>         }
>     }
>     tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
>     tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
>     log.info("Finished loading UDFs from HDFS");
> } catch (Exception e) {
>     e.printStackTrace();
> }
>
> The python files are stored under the specified path on HDFS.
>
> After uploading the py files, the following was executed via tableEnv.executeSql:
>
> CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS
> 'myReplace.MyReplace' LANGUAGE PYTHON
>
> Thanks in advance to the Flink team for their hard work.
>
>
>
> 799590...@qq.com
>
