Could you send the complete log file?

On Tue, Apr 19, 2022 at 10:53 AM 799590...@qq.com.INVALID <799590...@qq.com.invalid> wrote:
> Versions:
>
> flink-1.13.6
> python 3.6.8
>
> Python 3.6.8 is also installed on my local Win10 machine, the Python
> environment variable is set, and `python -m pip install apache-flink==1.13.6`
> ran successfully. The cluster is deployed in standalone-session mode with
> one JM and two TMs; all three machines have Python 3.6.8 and
> pyflink-1.13.6 installed.
>
> Questions:
>
> 1. Calling the Python UDF fails with the following error:
>
> Servlet.service() for servlet [dispatcherServlet] in context with path []
> threw exception [Request processing failed; nested exception is
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with
> root cause
> java.lang.RuntimeException: Python callback server start failed!
>
> 2. Are there any rules for how to fill in the class path after AS in
> CREATE FUNCTION when the function is written in Python?
>
> Content of the Python UDF file myReplace.py:
>
> from pyflink.table.udf import ScalarFunction
>
> class MyReplace(ScalarFunction):
>     def __init__(self):
>         self.factor = 12
>
>     def eval(self, s):
>         return s.replace('buy', 'sale')
>
> Obtaining the remote cluster environment, where catalogName=myhive and
> defaultDatabase=tetris:
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createRemoteEnvironment(host, port);
> env.setStateBackend(new HashMapStateBackend());
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(60000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
> env.getCheckpointConfig().setExternalizedCheckpointCleanup(
>     CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
> EnvironmentSettings bsSettings =
>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
> tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader", true);
> tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
> if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
>     HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
>     tableEnv.registerCatalog(catalogName, hiveCatalog);
> }
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> List<String> jars = new ArrayList<>();
> List<String> pys = new ArrayList<>();
> log.info("Start loading UDFs from HDFS");
> String prefix = "/file/function/flink/";
> try {
>     EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
>     for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
>         if (hdf.endsWith(".jar")) {
>             jars.add(hdf);
>             EnvUtil.loadJar(URLUtil.url(hdf));
>         } else if (hdf.endsWith(".py")) {
>             pys.add(hdf);
>         }
>     }
>     tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
>     tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
>     log.info("Finished loading UDFs from HDFS");
> } catch (Exception e) {
>     e.printStackTrace();
> }
>
> The Python files are stored under the specified path on HDFS.
>
> After uploading the .py file, I ran the following via tableEnv.executeSql:
>
> CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS
>     'myReplace.MyReplace' LANGUAGE PYTHON
>
> Thanks in advance to the Flink team for their hard work.
>
> 799590...@qq.com
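
On question 2: for `CREATE FUNCTION ... LANGUAGE PYTHON`, the path after AS is generally `<module>.<name>`, i.e. the .py file name without the extension, then a dot, then the attribute inside that module (so `'myReplace.MyReplace'` matches a file myReplace.py shipped via `python.files`). The sketch below only illustrates that "import module, then look up attribute" resolution shape; the in-memory module is a hypothetical stand-in for the uploaded file, not Flink's actual loader.

```python
import importlib
import sys
import types

# Hypothetical stand-in for the uploaded myReplace.py, created in memory
# so this sketch is self-contained.
my_replace_py = """
class MyReplace:
    def eval(self, s):
        return s.replace('buy', 'sale')
"""
mod = types.ModuleType("myReplace")
exec(my_replace_py, mod.__dict__)
sys.modules["myReplace"] = mod

# The AS path '<module>.<name>' splits into a module to import and an
# attribute to look up -- the same shape as 'myReplace.MyReplace' in the DDL.
module_name, attr_name = "myReplace.MyReplace".rsplit(".", 1)
cls = getattr(importlib.import_module(module_name), attr_name)
print(cls().eval("buy low"))  # sale low
```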