Re: Re: flink1.14: error registering a MySQL connector

2022-02-24 Thread xiaoyue
Great, the data was written to the database successfully. Thank you very much!



xiao...@ysstech.com
 
From: Tony Wei
Sent: 2022-02-25 14:57
To: user-zh
Subject: Re: Re: flink1.14: error registering a MySQL connector
Hi xiaoyue,
 
It looks like this line is the cause: tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
You probably need to switch the SqlDialect back to SqlDialect.DEFAULT before running the sink operation.
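For illustration, a minimal sketch of that dialect switch. The sketch uses the PyFlink Table API; the poster's code is Java, where the equivalent call is tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT):

from pyflink.table import EnvironmentSettings, TableEnvironment, SqlDialect

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Use the Hive dialect only while issuing Hive-specific statements ...
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)

# ... then restore the default dialect before creating the JDBC sink table;
# it is the Hive parser that rejects "primary key ... not enforced" below.
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)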
 
best regards,
 
xiaoyue wrote on Fri, Feb 25, 2022 at 2:36 PM:
 
> Hi Tony,
> Here is the complete code: it reads data from Hive, applies flatMap and
> aggregate operations, and then sinks the result to MySQL. For brevity, the
> UDF definitions in the middle are not included in full; you can use it as a
> reference. Thanks a lot for your help!
>
> Code:
> // execution environment
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
> // Hive source
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> String confSite = "src\\main\\resources";
>
> String version = "3.1.2";
>
> String defaultDatabase = "fund_analysis";
>
> HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase,
> confSite, confSite, version);
>
> tEnv.registerCatalog("hive", hiveCat);
>
> tEnv.useCatalog("hive");
> // Hive query SQL
> String biz_date = "20211130";
> String tblSource = String.format("select " +
> "coalesce(a.rate,0) as yldrate, " +
> "coalesce(c.rate,0) as riskless_yldrate, " +
> "a.ccy_type, " +
> "a.biz_date, " +
> "b.is_exch_dt, " +
> "a.pf_id " +
> "from " +
> "ts_pf_yldrate a " +
> "inner join td_gl_day b on b.dt = a.biz_date " +
> "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date
> and c.pf_id = a.pf_id " +
> "where a.biz_date <= '%s'", biz_date);
> Table table = tEnv.sqlQuery(tblSource);
>
> // register the flatMap function
> tEnv.createTemporarySystemFunction("RowFlatMap",
> SharpeRatioFlatMap.class);
> // register the aggregate function
> tEnv.createTemporarySystemFunction("SharpeRatioAgg",
> SharpeRatioAggregate.class);
>
> // apply the flatMap operation
> Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
> $("riskless_yldrate"),$("ccy_type"),$("biz_date"),
> $("is_exch_dt"),$("pf_id"), biz_date));
>
>  // switch catalog and register the view
> tEnv.useCatalog("default_catalog");
> tEnv.createTemporaryView("tagTable",tagTbl);
>
> // call SharpeRatioAgg to compute the result
>  Table result = tEnv.sqlQuery(String.format("select '%s' as
> biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless,
> dmo_index_code) as index_value from tagTable group by dmo_index_code",
> biz_date));
> // result.execute().print(); (--> at this step, result prints successfully)
>
> // sink operation
> String mysqlSink = "create table bulk_index_sink(" +
> "  biz_date string, " +
> "  dmo_index_code string, " +
> "  index_value string" +
> ") with (" +
> "   'connector' = 'jdbc', " +
>     "   'username' = 'root', " +
> "   'password' = 'xxx', " +
> "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
> "   'url' =
> 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
> "   'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink);
>
>
> result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
> tEnv.execute("mysql_sink_test");
>
>
> xiao...@ysstech.com
>
> From: Tony Wei
> Sent: 2022-02-25 14:13
> To: user-zh
> Subject: Re: flink1.14: error registering a MySQL connector
> Hi xiaoyue,
>
> Does your test code include any other code or configuration? Could you
> share the complete code and the config files?
> I tried running the code below in my IDE and it ran successfully.
>
> public static void main(String[] args) {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(env, Settings);
>
> String mysqlSink = "create table bulk_index_sink(" +
> "  biz_date string, " +
> "  dmo

Re: Re: flink1.14: error registering a MySQL connector

2022-02-24 Thread xiaoyue
Hi Tony,
Here is the complete code: it reads data from Hive, applies flatMap and
aggregate operations, and then sinks the result to MySQL. For brevity, the
UDF definitions in the middle are not included in full; you can use it as a
reference. Thanks a lot for your help!

Code:
// execution environment
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);

// Hive source
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

String confSite = "src\\main\\resources";

String version = "3.1.2";

String defaultDatabase = "fund_analysis";

HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, 
confSite, confSite, version);

tEnv.registerCatalog("hive", hiveCat);

tEnv.useCatalog("hive");
// Hive query SQL
String biz_date = "20211130";
String tblSource = String.format("select " +
"coalesce(a.rate,0) as yldrate, " +
"coalesce(c.rate,0) as riskless_yldrate, " +
"a.ccy_type, " +
"a.biz_date, " +
"b.is_exch_dt, " +
"a.pf_id " +
"from " +
"ts_pf_yldrate a " +
"inner join td_gl_day b on b.dt = a.biz_date " +
"inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date and 
c.pf_id = a.pf_id " +
"where a.biz_date <= '%s'", biz_date);
Table table = tEnv.sqlQuery(tblSource);

// register the flatMap function
tEnv.createTemporarySystemFunction("RowFlatMap", 
SharpeRatioFlatMap.class);
// register the aggregate function
tEnv.createTemporarySystemFunction("SharpeRatioAgg", 
SharpeRatioAggregate.class);

// apply the flatMap operation
Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
$("riskless_yldrate"),$("ccy_type"),$("biz_date"),
$("is_exch_dt"),$("pf_id"), biz_date));
   
 // switch catalog and register the view
tEnv.useCatalog("default_catalog");
tEnv.createTemporaryView("tagTable",tagTbl);

// call SharpeRatioAgg to compute the result
 Table result = tEnv.sqlQuery(String.format("select '%s' as biz_date, 
dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, dmo_index_code) as 
index_value from tagTable group by dmo_index_code", biz_date));
// result.execute().print(); (--> at this step, result prints successfully)

// sink operation
String mysqlSink = "create table bulk_index_sink(" +
"  biz_date string, " +
"  dmo_index_code string, " +
"  index_value string" +
") with (" +
"   'connector' = 'jdbc', " +
"   'username' = 'root', " +
"   'password' = 'xxx', " +
"   'driver' = 'com.mysql.cj.jdbc.Driver', " +
"   'url' = 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
"   'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink);


result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
tEnv.execute("mysql_sink_test");


xiao...@ysstech.com
 
From: Tony Wei
Sent: 2022-02-25 14:13
To: user-zh
Subject: Re: flink1.14: error registering a MySQL connector
Hi xiaoyue,
 
Does your test code include any other code or configuration? Could you share the complete code and the config files?
I tried running the code below in my IDE and it ran successfully.
 
public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings =
EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, Settings);
 
String mysqlSink = "create table bulk_index_sink(" +
"  biz_date string, " +
"  dmo_index_code string, " +
"  index_value string, " +
"  primary key(dmo_index_code) not enforced) " +
"  with (" +
"   'connector' = 'jdbc', " +
"   'username' = 'root', " +
"   'password' = 'yss300377@ZT', " +
"   'driver' = 'com.mysql.cj.jdbc.Driver', " +
"   'url' =
'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
"   'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink).print();
//tEnv.execute("mysql_sink_test");
}
 
The output was:
++
| result |

flink1.14: error registering a MySQL connector

2022-02-24 Thread xiaoyue
Registering a MySQL sink connector in flink1.14 fails; I've checked several times and cannot find a syntax error. Help, please!

Code:
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);

   String mysqlSink = "create table bulk_index_sink(" +
"  biz_date string, " +
"  dmo_index_code string, " +
"  index_value string, " +
"  primary key(dmo_index_code) not enforced) " +
"  with (" +
"   'connector' = 'jdbc', " +
"   'username' = 'root', " +
"   'password' = 'yss300377@ZT', " +
"   'driver' = 'com.mysql.cj.jdbc.Driver', " +
"   'url' = 
'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
"   'table-name' = 'bulk_index_sink')";   
 tEnv.executeSql(mysqlSink);
 tEnv.execute("mysql_sink_test");

Error:
org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Encountered "not" at line 1, column 126.
Was expecting one of:
"DISABLE" ...
"ENABLE" ...
"NORELY" ...
"NOVALIDATE" ...
"RELY" ...
"VALIDATE" ...
")" ...
"," ...


at 
org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at 
com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "not" 
at line 1, column 126.



xiao...@ysstech.com


flink1.12.0 Python UDF job runs fine on the cluster but fails locally: java.lang.RuntimeException: Failed to create stage bundle factory!

2021-03-31 Thread xiaoyue
I wrote a UDAF with Python Flink 1.12, and running it locally fails with the error below.
I have confirmed that apache-flink 1.12.0 is installed in the current Python 3 environment.
Hoping someone passing by can help analyze this. Thanks!
Traceback (most recent call last):
  File "C:/projects/virtual_pyflink1.12/TestScript/udaf_timeWeightedReturn.py", 
line 199, in udaf_p_case
env.execute('UDAF_timeWeightReturn_p')
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\table\table_environment.py",
 line 1276, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\java_gateway.py", line 
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\util\exceptions.py", 
line 147, in deco
return f(*a, **kw)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at 

pyflink1.12 error: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0

2021-03-30 Thread xiaoyue
When running a PyFlink UDAF script I get the error:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalStateException: Process died with exit code 0.
At the moment the UDAF results cannot be sunk. Has anyone passing by run into this problem too?
The exception is as follows:
Traceback (most recent call last):
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 114, in <module>
csv_source_udaf(csv_source)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 45, in wrapper
func(*args, **kw)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 103, in csv_source_udaf
print(result.to_pandas())
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\table\table.py", 
line 808, in to_pandas
if batches.hasNext():
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\java_gateway.py", line 
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\util\exceptions.py", 
line 147, in deco
return f(*a, **kw)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o101.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
at 
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
at 
org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
at 
org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
... 16 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
... 18 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 

Re: Re: flink1.12 standalone mode: submitting a Python script job fails: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat

2021-03-22 Thread xiaoyue
Great, the problem is solved. Thank you!


On 2021-03-22 16:50:03, "Dian Fu" wrote:
>Take a look at:
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program
>
>flink-connector-jdbc_2.11-1.12.0.jar and mysql-connector-java-8.0.12.jar need to be placed somewhere PyFlink can find them.
>
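A minimal sketch of one option from that page: shipping the jars with the job through the pipeline.jars configuration instead of copying them into the cluster's lib directory. The jar paths below are placeholders, not the poster's actual paths.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Jars listed in 'pipeline.jars' are shipped with the job, so the cluster
# classpath does not need to be modified. Use absolute file:// URLs,
# separated by ';'.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-connector-jdbc_2.11-1.12.0.jar;"
    "file:///path/to/mysql-connector-java-8.0.12.jar")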
>On Mon, Mar 22, 2021 at 1:43 PM xiaoyue <18242988...@163.com> wrote:
>
>> flink 1.12.2 deployed as a standalone cluster; the job uses PyFlink to connect to a MySQL database and run computations:
>>
>> 1. The following dependencies were added under /user/local/flink-1.12.2/lib:
>>
>> mysql-connector-java-8.0.12.jar,
>>
>> flink-connector-jdbc_2.11-1.12.0.jar,
>>
>> flink-table-api-java-1.12.0.jar
>>
>> 2. Job submission command:
>>
>>bin/flink run  -py ../test.py -p 8
>>
>> 3. The error message is attached below; waiting online for anyone who has deployed this setup to give some pointers. Thanks!
>>
>> Traceback (most recent call last):
>>
>>   File "../test.py", line 57, in <module>
>>
>> env.execute('Test')
>>
>>   File
>> "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>> line 1276, in execute
>>
>>   File
>> "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>> line 1286, in __call__
>>
>>   File
>> "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>> line 147, in deco
>>
>>   File
>> "/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.
>>
>> : org.apache.flink.util.FlinkException: Failed to execute job
>> 'Pyflink1.12_Query_Time_Test'.
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
>>
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>>
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>>
>> at
>> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1277)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>
>> at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>
>> at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.RuntimeException:
>> org.apache.flink.runtime.client.JobInitializationException: Could not
>> instantiate JobManager.
>>
>> at
>> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>>
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>
>> at
>> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
>>
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>
>> at
>> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>
>>

flink1.12 standalone mode: submitting a Python script job fails: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat

2021-03-21 Thread xiaoyue
flink 1.12.2 deployed as a standalone cluster; the job uses PyFlink to connect to a MySQL database and run computations:

1. The following dependencies were added under /user/local/flink-1.12.2/lib:

mysql-connector-java-8.0.12.jar,

flink-connector-jdbc_2.11-1.12.0.jar,

flink-table-api-java-1.12.0.jar

2. Job submission command:

   bin/flink run  -py ../test.py -p 8

3. The error message is attached below; waiting online for anyone who has deployed this setup to give some pointers. Thanks!

Traceback (most recent call last):

  File "../test.py", line 57, in 

env.execute('Test')

  File 
"/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
 line 1276, in execute

  File 
"/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__

  File 
"/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco

  File 
"/usr/local/env/flink1.12_py3_env/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.

: org.apache.flink.util.FlinkException: Failed to execute job 
'Pyflink1.12_Query_Time_Test'.

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)

at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)

at 
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1277)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not 
instantiate JobManager.

at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)

at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)

at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)

at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)

at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)

at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not instantiate JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'Source: TableSourceScan(table=[[default_catalog, 
default_database, TP_GL_DAY, project=[DAY_ID]]], fields=[DAY_ID])': Loading the 
input/output formats failed: 

at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:239)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:249)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:133)

at 

Re: Re: Some questions about using pyflink

2021-03-16 Thread xiaoyue
Hi, Xingbo
I'd like to ask about a detail of sql_query execution: when Flink 1.12 executes SQL statements under the hood, is there a predicate-pushdown optimization?
From my code tests:
1. The pyflink 1.11 connector definition supported a read.query parameter for fetching data, and execution was very fast; I suspect that part of the work was handed off to the database;
2. pyflink 1.12 removed the read.query parameter, and when several sources are defined and joined, the job is noticeably slow (pyflink).
So, given the above, may I ask whether this overhead is due to Python's language limitations or IPC costs, or is it caused by the underlying implementation design?
Thanks!
On 2021-03-16 14:27:22, "Xingbo Huang" wrote:
>Hi,
>
>Two points in addition:
>1. Tables already support Pandas UDAFs over sliding windows and tumbling
>windows [1]; session-window UDAF support will land in 1.13. For windows on
>DataStream: for the window types above you can switch to the Table API, and
>custom windows on DataStream will be supported in 1.13.
>
>2. On performance: if you don't use Python UDFs, what runs is essentially
>Java code; Python's only role is compiling the JobGraph on the client side,
>so Python sql_update cannot run slower than Java, because the code that
>actually runs is identical. If you do use Python UDFs, then compared with
>Java UDFs there is extra IPC overhead, and Python itself is slower than Java
>code; the gap is currently about 6 to 7x. We keep working on performance,
>and the goal is to fully catch up with Java code, or even C code.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
>
>Best,
>Xingbo
>
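A minimal sketch of such a vectorized (Pandas) UDAF over a tumbling window on the Table API, following the documentation linked above; the table, column names, and window size here are made up for illustration:

from pyflink.table import DataTypes
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udaf
from pyflink.table.window import Tumble

# A Pandas UDAF receives whole pandas.Series batches and returns one scalar,
# which avoids per-row traffic between the JVM and the Python worker.
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    return v.mean()

# Hypothetical usage: my_table has columns (ts, v), with ts a rowtime attribute.
result = my_table \
    .window(Tumble.over(lit(1).hours).on(col("ts")).alias("w")) \
    .group_by(col("w")) \
    .select(mean_udaf(col("v")))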
>xiaoyue wrote on Tue, Mar 16, 2021 at 11:42 AM:
>
>> Hello,
>> I'm also researching and experimenting with pyflink combined with pandas
>> for distributed computation, so for your questions I can only share some
>> experience.
>> pyflink will most likely consolidate on DataStream going forward, so
>> DataSet will probably not be supported;
>> I don't recommend converting between Table and DataFrame mid-computation
>> via table.to_pandas(); it hurts performance;
>> The most efficient approach I've found is defining pandas-type udf/udaf
>> functions, but compared with the same approach through the Java API,
>> pyflink is still much slower;
>> Personally, I find the most time-consuming part of pyflink is the
>> sql_query operation; for the same SQL statement, execution is still much
>> slower than in Java.
>> This is just my personal experience; corrections from passing experts are
>> welcome.
>> Also, since we're researching the same area, I'd love to exchange new
>> findings. Thanks, and best wishes!
>>
>>
>>
>>
>> xiao...@ysstech.com
>>
>> From: qian he
>> Sent: 2021-03-14 18:59
>> To: user-zh-flink
>> Subject: Some questions about using pyflink
>> Hello,
>>
>> Our project recently decided to try Flink for distributed computation. The
>> existing project is a Python pandas project, and we want to migrate it to
>> pyflink. When using DataSet for batch processing, there are no map/reduce
>> functions like in the Java version, so I have the following questions:
>> 1. Does the Python Flink SDK not support DataSet yet?
>> 2. Is there an alternative?
>> 3. If it is not supported yet, is there a planned timeline?
>> 4. Why doesn't the Flink Table API support map/reduce operations?
>> 5. Our project processes data with DataFrames; can that be run on Flink
>> for distributed computation? Converting a DataFrame directly to a Table,
>> the Table doesn't support map/reduce operations. Any suggestions for
>> migrating a pandas project to Flink?
>> 6. Why doesn't the DataStream API implement window methods? Will later
>> versions support them?
>>
>> Thanks a lot; I'm very optimistic about Flink. Hope the community keeps
>> growing. Thanks for the hard work!
>>


Re: Some questions about using pyflink

2021-03-15 Thread xiaoyue
Hello,
I'm also researching and experimenting with pyflink combined with pandas for distributed computation, so for your questions I can only share some experience.
pyflink will most likely consolidate on DataStream going forward, so DataSet will probably not be supported;
I don't recommend converting between Table and DataFrame mid-computation via table.to_pandas(); it hurts performance;
The most efficient approach I've found is defining pandas-type udf/udaf functions, but compared with the same approach through the Java API, pyflink is still much slower;
Personally, I find the most time-consuming part of pyflink is the sql_query operation; for the same SQL statement, execution is still much slower than in Java.
This is just my personal experience; corrections from passing experts are welcome.
Also, since we're researching the same area, I'd love to exchange new findings. Thanks, and best wishes!




xiao...@ysstech.com
 
From: qian he
Sent: 2021-03-14 18:59
To: user-zh-flink
Subject: Some questions about using pyflink
Hello,

Our project recently decided to try Flink for distributed computation. The existing project is a Python pandas project, and we want to migrate it to pyflink. When using DataSet for batch processing, there are no map/reduce functions like in the Java version, so I have the following questions:
1. Does the Python Flink SDK not support DataSet yet?
2. Is there an alternative?
3. If it is not supported yet, is there a planned timeline?
4. Why doesn't the Flink Table API support map/reduce operations?
5. Our project processes data with DataFrames; can that be run on Flink for distributed computation? Converting a DataFrame directly to a Table, the Table doesn't support map/reduce operations. Any suggestions for migrating a pandas project to Flink?
6. Why doesn't the DataStream API implement window methods? Will later versions support them?

Thanks a lot; I'm very optimistic about Flink. Hope the community keeps growing. Thanks for the hard work!


Re: Re: flink1.12 sql_query(): same source tables, pyflink takes 9 min, Java takes 3 s

2021-02-28 Thread xiaoyue
Hi, Xingbo
Thanks a lot for your reply. I convert to pandas because I want to use the matrix operations in pandas.
The project requires computing with Flink and returning the result directly to the frontend, so the pipeline has a source but no sink.
I also tried defining a pandas-type UDAF and returning the aggregated result, but the execution time didn't change much.

So may I ask: for this kind of business scenario, is there a recommended way to run it? I'd like to keep performance while using the matrix computation in pandas or numpy. Thanks a lot!


On 2021-03-01 09:54:49, "Xingbo Huang" wrote:
>Hi,
>
>The difference is that you used to_pandas(), which is slow: it has to collect all the data back to the client and then build a Python DataFrame. to_pandas() is mostly for debugging; it is convenient but performs poorly. If you care about performance, just use a different sink.
>
>Best
>Xingbo
>
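A minimal sketch of that suggestion: replace the to_pandas() call with a sink table. The print connector is used here only as a stand-in for a real sink, and the column list assumes the four columns selected in the query quoted below.

# Declare a sink table and write the query result into it, instead of
# collecting everything back to the client with to_pandas().
t_env.execute_sql("""
    CREATE TABLE result_sink (
        ID STRING,
        NAME STRING,
        IP STRING,
        PHONE STRING
    ) WITH (
        'connector' = 'print'
    )
""")

query_table.execute_insert("result_sink").wait()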
>xiaoyue <18242988...@163.com> 于2021年2月26日周五 下午12:38写道:
>
>> Has anyone run into this? In a streaming environment connected to a MySQL
>> database, I use DDL to define two source tables, source1 and source2.
>>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # get the query result
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> The same processing runs much slower in Python than in Java; what could
>> cause this?
>> Since Python is just a wrapper around the Flink API, could it be the GIL?
>> Hoping for an expert answer; anyone who has hit the same problem is
>> welcome to discuss. Thx!
>>
>>



flink1.12 sql_query(): same source tables, pyflink takes 9 min, Java takes 3 s

2021-02-25 Thread xiaoyue
Has anyone run into this? In a streaming environment connected to a MySQL database, I use DDL to define two source tables, source1 and source2.
 sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID = 
source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND '20170307'"
# get the query result
query_table = env.sql_query(sql)
query_table.to_pandas()
The same processing runs much slower in Python than in Java; what could cause this?
Since Python is just a wrapper around the Flink API, could it be the GIL?
Hoping for an expert answer; anyone who has hit the same problem is welcome to discuss. Thx!



Re: SqlValidatorException: No match found for function signature prod()

2021-02-21 Thread xiaoyue
Bumping my own thread; waiting online for replies from the experts _(:з」∠)_


On 2021-02-20 13:14:18, "xiaoyue" <18242988...@163.com> wrote:

While using a UDAF in Flink SQL 1.11 I get SqlValidatorException: No match found for function signature prod(). Could someone please take a look? _(:з」∠)_

Here is the code:
-
...
  stableEnv.createTemporarySystemFunction("prod", 
ProductAggregateFunction.class);
  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
yldrate from queryData group by pf_id");
...
-
@FunctionHint(
input = @DataTypeHint("Double"),
output = @DataTypeHint("Double")
)
public class ProductAggregateFunction extends AggregateFunction<Double, Product> {


@Override
public Double getValue(Product acc) {
return acc.prod;
}
@Override
public Product createAccumulator() {
return new Product();
}
public void accumulate(Product acc, Double iValue) {
acc.prod *= iValue;
}
public void retract(Product acc, Double iValue) {
acc.prod /= iValue;
}
public void merge(Product acc, Iterable<Product> it) {
for (Product p : it) {
accumulate(acc, p.prod);
}
}
public void resetAccumulator(Product acc) {
acc.prod = 1D;
}
}





 

SqlValidatorException: No match found for function signature prod()

2021-02-19 Thread xiaoyue
While using a UDAF in Flink SQL 1.11 I get SqlValidatorException: No match found for function signature prod(). Could someone please take a look? _(:з」∠)_

Here is the code:
-
...
  stableEnv.createTemporarySystemFunction("prod", 
ProductAggregateFunction.class);
  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
yldrate from queryData group by pf_id");
...
-
@FunctionHint(
input = @DataTypeHint("Double"),
output = @DataTypeHint("Double")
)
public class ProductAggregateFunction extends AggregateFunction<Double, Product> {


@Override
public Double getValue(Product acc) {
return acc.prod;
}
@Override
public Product createAccumulator() {
return new Product();
}
public void accumulate(Product acc, Double iValue) {
acc.prod *= iValue;
}
public void retract(Product acc, Double iValue) {
acc.prod /= iValue;
}
public void merge(Product acc, Iterable<Product> it) {
for (Product p : it) {
accumulate(acc, p.prod);
}
}
public void resetAccumulator(Product acc) {
acc.prod = 1D;
}
}