Emails from chenxuying

2021-06-17 Thread chenxuying



Re:Re: The wrong options of the Kafka connector make the cluster unable to run any job

2021-04-27 Thread chenxuying

at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)

at 
java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)

at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)

at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)

at java.util.Iterator.forEachRemaining(Iterator.java:115)

at 
org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:194)

at 
org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:164)

at 
org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:122)

at 
org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:50)

at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:167)

... 22 more

At 2021-04-27 02:03:05, "Robert Metzger"  wrote:

Thanks a lot for your message. This could be a bug in Flink. It seems that the 
archival of the execution graph is failing because some classes are unloaded. 


What I observe from your stack traces is that some classes are loaded from 
flink-dist_2.11-1.11.2.jar, while other classes are loaded from 
template-common-jar-0.0.1. Maybe Flink is closing the usercode classloader, and 
this is causing the exception during the archival of the execution graph. Can 
you make sure that the core Flink classes are only in your classpath once (in 
flink-dist), and the template-common-jar-0.0.1 doesn't contain the runtime 
Flink classes? (for example by setting the Flink dependencies to provided when 
using the maven-shade-plugin).
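For illustration, a minimal sketch of what marking a core Flink dependency as provided could look like in the user jar's pom.xml; the artifact below is only an example, adjust it to whatever Flink modules the job actually compiles against:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.11.2</version>
    <!-- provided: these classes come from flink-dist on the cluster and are not bundled into the user jar -->
    <scope>provided</scope>
</dependency>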


For the issue while submitting the job, I can not provide you any further help, 
because you haven't posted the exception that occurred in the REST handler. 
Could you post this exception here as well?


Best wishes,
Robert






On Sun, Apr 25, 2021 at 2:44 PM chenxuying  wrote:


environment:

flinksql 1.12.2

k8s session mode

description:

I got the following error log when my Kafka connector port was wrong:

>>>>>

2021-04-25 16:49:50

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition filebeat_json_install_log-3 could be 
determined

>>>>>




I got the following error log when my Kafka connector IP was wrong:

>>>>>

2021-04-25 20:12:53

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.sca

The wrong options of the Kafka connector make the cluster unable to run any job

2021-04-25 Thread chenxuying
environment:

flinksql 1.12.2

k8s session mode

description:

I got the following error log when my Kafka connector port was wrong:

>

2021-04-25 16:49:50

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition filebeat_json_install_log-3 could be 
determined

>




I got the following error log when my Kafka connector IP was wrong:

>

2021-04-25 20:12:53

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
while fetching topic metadata

>




When the job was cancelled, there was the following error log:

>

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state 
CANCELLING to CANCELED.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping 
checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
Shutting down

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 1 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1'
 not discarded.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 2 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2'
 not discarded.

2021-04-25 08:53:41,116 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 3 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3'
 not discarded.

2021-04-25 08:53:41,137 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.

2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 

flinksql: when one job's Kafka connection info is wrong, the whole session cluster can no longer deploy jobs properly

2021-04-25 Thread chenxuying
Environment:

flinksql 1.12.2

k8s session mode

Description:

When the Kafka port is wrong, the following error appears after a while:

2021-04-25 16:49:50

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition filebeat_json_install_log-3 could be 
determined

When the Kafka IP is wrong, the following error appears after a while:

2021-04-25 20:12:53

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
while fetching topic metadata







Then, after stopping/cancelling the job, the following log appears:

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state 
CANCELLING to CANCELED.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping 
checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
Shutting down

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 1 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1'
 not discarded.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 2 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2'
 not discarded.

2021-04-25 08:53:41,116 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 3 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3'
 not discarded.

2021-04-25 08:53:41,137 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.

2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 
v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf).

2021-04-25 08:53:41,151 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending 
SlotPool.

2021-04-25 08:53:41,151 

How to configure Flink to load libs from my own path

2021-04-19 Thread chenxuying
Hi all, I deployed Flink on K8s as a session cluster [1].
The default plugin path is /opt/flink/plugins,


the default lib path is /opt/flink/lib,
and the default usrlib path is /opt/flink/usrlib.
I wonder whether it is possible to change these default paths.
For example, I would like Flink not to load libs from /opt/flink/lib but to load
them from /data/flink/lib instead, and I can't move /data/flink/lib to
/opt/flink/lib.

So how can I configure Flink to load libs from my own path? (One possible workaround is sketched below.)


[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#session-cluster-resource-definitions
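One possible workaround, sketched here only as an illustration (it is not from the original thread): instead of changing Flink's default paths, mount the host directory over /opt/flink/lib in the pod spec, in the same hostPath style used in a later thread of this archive. This assumes a hostPath volume is acceptable in your cluster and that /data/flink/lib contains everything the cluster needs, since the mount hides the image's own /opt/flink/lib:

"volumes": [
  {
    "name": "libs-volume",
    "hostPath": {
      "path": "/data/flink/lib",
      "type": ""
    }
  }
],
...
"volumeMounts": [
  {
    "name": "libs-volume",
    "mountPath": "/opt/flink/lib"
  }
]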

How to set the default number of retained checkpoints in Flink 1.11

2020-10-23 Thread chenxuying
According to the official docs [1] the option should be state.checkpoints.num-retained, whose default is 1, but setting it had no effect. The docs say the default is 1,
yet in practice it looks more like 10.
Other properties I set at the same time, for example
execution.checkpointing.externalized-checkpoint-retention: 
RETAIN_ON_CANCELLATION
do take effect, so the way I set options should be fine. (A minimal config sketch is added below.)


[1]:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
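For reference, a minimal flink-conf.yaml sketch of the two options discussed above; the retained count of 5 is only an example value, and this assumes the options are picked up from the cluster configuration in the same way as in the thread:

state.checkpoints.num-retained: 5
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION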



Re:Re: Re: flink1.11: loading an external jar for UDF registration

2020-10-16 Thread chenxuying
1. When using the system class loader, the interfaces that the job jar itself exposes for the external UDF jar to implement throw ClassNotFoundException.
2. What is the thread context class loader?

I don't quite understand these two points; could you share a code example? (A small sketch is appended after the quoted code below.)


On 2020-10-15 19:47:20, "amen...@163.com" wrote:
>A follow-up question: when using the thread context class loader, each record is sent three times. Is that caused by adding pipeline.classpaths?
>And could this way of configuring the env cause any other problems?
>
>best,
>amenhub
> 
>From: amen...@163.com
>Sent: 2020-10-15 19:22
>To: user-zh
>Subject: Re: Re: flink1.11: loading an external jar for UDF registration
>Thank you very much for your reply!
> 
>For me this is a very good approach, but I have one question: does the jar have to be loaded with the system class loader?
>When I tried the system class loader, the interfaces that the job jar itself exposes for the external UDF jar to implement threw ClassNotFoundException,
>whereas pointing the class loader at the main class (which, I believe, means using the default thread context class loader) avoids the problem.
> 
>Looking forward to your reply, thanks!
> 
>best, 
>amenhub
>From: cxydeve...@163.com
>Sent: 2020-10-15 17:46
>To: user-zh
>Subject: Re: flink1.11: loading an external jar for UDF registration
>The approach we use is to set the env configuration via reflection and add pipeline.classpaths.
>The code is as follows:
>public static void main(final String[] args) throws Exception {
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings =
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment tableEnvironment =
>StreamTableEnvironment.create(env, settings);
>//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
>String path = "https://...xxx.jar;;
>loadJar(new URL(path));
>Field configuration =
>StreamExecutionEnvironment.class.getDeclaredField("configuration");
>configuration.setAccessible(true);
>Configuration o = (Configuration)configuration.get(env);
>Field confData = Configuration.class.getDeclaredField("confData");
>confData.setAccessible(true);
>Map temp = (Map)confData.get(o);
>List jarList = new ArrayList<>();
>jarList.add(path);
>temp.put("pipeline.classpaths",jarList);
>tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
>'flinksql.function.udf.CxyTestReturnSelf'");
>tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>" f_sequence INT,\n" +
>" f_random INT,\n" +
>" f_random_str STRING,\n" +
>" ts AS localtimestamp,\n" +
>" WATERMARK FOR ts AS ts\n" +
>") WITH (\n" +
>" 'connector' = 'datagen',\n" +
>" 'rows-per-second'='5',\n" +
>"\n" +
>" 'fields.f_sequence.kind'='sequence',\n" +
>" 'fields.f_sequence.start'='1',\n" +
>" 'fields.f_sequence.end'='1000',\n" +
>"\n" +
>" 'fields.f_random.min'='1',\n" +
>" 'fields.f_random.max'='1000',\n" +
>"\n" +
>" 'fields.f_random_str.length'='10'\n" +
>")");
>tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>"f_random_str STRING" +
>") WITH (\n" +
>"'connector' = 'print'\n" +
>")");
>tableEnvironment.executeSql(
>"insert into sinktable " +
>"select CxyTestReturnSelf(f_random_str) " +
>"from sourceTable");
>}
>//dynamically load a jar at runtime
>public static void loadJar(URL jarUrl) {
>//get the addURL method from the URLClassLoader class
>Method method = null;
>try {
>method = URLClassLoader.class.getDeclaredMethod("addURL",
>URL.class);
>} catch (NoSuchMethodException | SecurityException e1) {
>e1.printStackTrace();
>}
>// remember the method's current accessibility
>boolean accessible = method.isAccessible();
>try {
>//make the method accessible
>if (accessible == false) {
>method.setAccessible(true);
>}
>// get the system class loader
>URLClassLoader classLoader = (URLClassLoader)
>ClassLoader.getSystemClassLoader();
>//add the jar URL to the system class loader's URL path
>method.invoke(classLoader, jarUrl);
>} catch (Exception e) {
>e.printStackTrace();
>} finally {
>method.setAccessible(accessible);
>}
>}
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
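For illustration only (this sketch is not from the original thread): the two loaders the discussion refers to can be obtained as below, and the quoted loadJar above could target the thread context class loader instead of the system class loader. The helper name is made up, and the reflective addURL call assumes Java 8, where these loaders are URLClassLoader instances.

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderSketch {
    // add a jar to the thread context class loader, falling back to the system class loader
    public static void loadJarIntoContextClassLoader(URL jarUrl) throws Exception {
        ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        ClassLoader target = (contextLoader != null) ? contextLoader : systemLoader;
        if (target instanceof URLClassLoader) {
            Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
            addURL.setAccessible(true);
            addURL.invoke(target, jarUrl);
        }
    }
}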


flink1.11.2: the task detail page in the web UI shows no received/sent metrics for the job

2020-10-13 Thread chenxuying
The cluster is a local standalone cluster and the job runs fine; the sink is print and the output is visible in Stdout,
but on the task detail page the real-time metrics Bytes Received, Records Received, Bytes Sent and Records Sent are all 0.

flink1.11.2 deployed on k8s: how to start the history server

2020-10-10 Thread chenxuying
flink1.11.2 在k8s上部署,如何启动history server
之前1.10的yaml里面可以加命令,但是1.11的yaml是通过docker-entrypoint.sh
好像没发现这个入口脚本没有对应的history server参数



Re: flinksql: error when registering a UDTF that uses ROW as the input/output type

2020-09-30 Thread chenxuying
The splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
DataTypes.BIGINT()]) mentioned at the end above needs to be changed to splitStr = udtf(SplitStr(), DataTypes.STRING(),
[DataTypes.STRING(), DataTypes.STRING()]). As for the third udtf argument,
it seems it only has to match the sink field types to run; but it feels odd that the second argument runs fine even though it does not match the source field types.
On 2020-09-30 19:07:06, "chenxuying" wrote:
>Version:
>pyflink==1.0
>apache-flink==1.11.2
>The code is as follows:
>env = StreamExecutionEnvironment.get_execution_environment()
>env.set_parallelism(1)
>t_env = StreamTableEnvironment.create(env)
>t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
> 'true')
>
>
>class SplitStr(TableFunction):
>def eval(self, data):
>for row in data:
>yield row[0], row[1]
>splitStr = udtf(
>SplitStr(),
>DataTypes.ARRAY(
>DataTypes.ROW(
>[
>DataTypes.FIELD("name", DataTypes.STRING()),
>DataTypes.FIELD("id", DataTypes.STRING())
>]
>)
>),
>DataTypes.ROW(
>[
>DataTypes.FIELD("name", DataTypes.STRING()),
>DataTypes.FIELD("id", DataTypes.STRING())
>]
>)
>)
>t_env.register_function("splitStr", splitStr)
>
>
>t_env.sql_update("""
>CREATE TABLE mySource (
>
>id varchar,
>data array<row<name string, id string>> 
>) WITH ( 
>'connector' = 'kafka',
>'topic' = 'mytesttopic',
>'properties.bootstrap.servers' = '172.17.0.2:9092',
>'properties.group.id' = 'flink-test-cxy',
>'scan.startup.mode' = 'latest-offset',
>'format' = 'json' 
>) 
>""")
>t_env.sql_update("""
>CREATE TABLE mysqlsink (
>id varchar
>,name varchar
>,age  varchar
>) 
>with (
>'connector' = 'print'
>)
>""")
>t_env.sql_update("insert into mysqlsink select id,name,age from mySource 
>,LATERAL TABLE(splitStr(data)) as T(name, age)")
>t_env.execute("test")
>
>
>The final error is:
>TypeError: Invalid result_type: result_type should be DataType but contains 
>RowField(name, VARCHAR)
>The error is raised at:
>File 
>"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
> line 264, in __init__
>
>
>def __init__(self, func, input_types, result_types, deterministic=None, 
>name=None):
>super(UserDefinedTableFunctionWrapper, self).__init__(
>func, input_types, deterministic, name)
>
>
>if not isinstance(result_types, collections.Iterable):
>result_types = [result_types]
>
>
>for result_type in result_types:
>if not isinstance(result_type, DataType):
>raise TypeError(
>"Invalid result_type: result_type should be DataType but contains {}".format(
>result_type))
>
>
>self._result_types = result_types
>self._judtf_placeholder = None
>
>
>In the debugger I can see that result_types corresponds to the FIELD array inside the ROW, which is why the error is raised. Is this a bug?
>
>
>Also, if, when creating the udtf above,
>I write it like this instead:
>splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), 
>DataTypes.BIGINT()])
>then it runs fine, but obviously the types no longer match what I actually use.


flinksql: error when registering a UDTF that uses ROW as the input/output type

2020-09-30 Thread chenxuying
Version:
pyflink==1.0
apache-flink==1.11.2
The code is as follows:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
 'true')


class SplitStr(TableFunction):
def eval(self, data):
for row in data:
yield row[0], row[1]
splitStr = udtf(
SplitStr(),
DataTypes.ARRAY(
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
),
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
)
t_env.register_function("splitStr", splitStr)


t_env.sql_update("""
CREATE TABLE mySource ( 
   
id varchar,
data array<row<name string, id string>> 
) WITH ( 
'connector' = 'kafka',
'topic' = 'mytesttopic',
'properties.bootstrap.servers' = '172.17.0.2:9092',
'properties.group.id' = 'flink-test-cxy',
'scan.startup.mode' = 'latest-offset',
'format' = 'json' 
) 
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
id varchar
,name varchar
,age  varchar
) 
with (
'connector' = 'print'
)
""")
t_env.sql_update("insert into mysqlsink select id,name,age from mySource 
,LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")


The final error is:
TypeError: Invalid result_type: result_type should be DataType but contains 
RowField(name, VARCHAR)
The error is raised at:
File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
 line 264, in __init__


def __init__(self, func, input_types, result_types, deterministic=None, 
name=None):
super(UserDefinedTableFunctionWrapper, self).__init__(
func, input_types, deterministic, name)


if not isinstance(result_types, collections.Iterable):
result_types = [result_types]


for result_type in result_types:
if not isinstance(result_type, DataType):
raise TypeError(
"Invalid result_type: result_type should be DataType but contains {}".format(
result_type))


self._result_types = result_types
self._judtf_placeholder = None


In the debugger I can see that result_types corresponds to the FIELD array inside the ROW, which is why the error is raised. Is this a bug?


Also, if, when creating the udtf above,
I write it like this instead:
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), 
DataTypes.BIGINT()])
then it runs fine, but obviously the types no longer match what I actually use.

flink1.11.2 deploys fine on k8s following the official docs, but after adding a volume config it fails with Read-only file system

2020-09-28 Thread chenxuying
When I deployed on k8s I also followed the official approach [1] and it worked fine; then I added a volume config:

{

  ...

  "spec": {

...

"template": {

  ...

  "spec": {

"volumes": [

  ...

  {

"name": "libs-volume",

"hostPath": {

  "path": "/data/volumes/flink/jobmanager/cxylib",

  "type": ""

}

  },

  ...

],

"containers": [

  {

...

"volumeMounts": [

  {

"name": "flink-config-volume",

"mountPath": "/opt/flink/conf"

  },

  ...

],

...

  }

],

...

  }

},

...

  },

  ...

}

Then starting the jobmanager fails with the following errors:

Starting Job Manager

sed: couldn't open temporary file /opt/flink/conf/sedz0NYKX: Read-only file 
system

sed: couldn't open temporary file /opt/flink/conf/sede6R0BY: Read-only file 
system

/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml: Permission denied

/docker-entrypoint.sh: 91: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system

Starting standalonesession as a console application on host 
flink-jobmanager-66fb98869d-w7plb.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.

Sep 28, 2020 7:11:14 AM org.apache.hadoop.util.NativeCodeLoader 

WARNING: Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable




[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions

Re:Re: Two problems when deploying Flink in a Docker environment

2020-09-28 Thread chenxuying
Regarding the first problem, you mentioned modifying the start command; is there an example, or a document I can refer to?




On 2020-09-28 12:50:25, "Yang Wang" wrote:
>For the first problem: by default the STDOUT of the JM/TM processes already goes to the console, so there is no way to view STDOUT through the web UI.
>You can check it with docker logs, or modify the start command to redirect STDOUT to a concrete file.
>
>For the second problem: docker-entrypoint.sh [1] of the JobManager and TaskManager modifies flink-conf.yaml,
>so the file you mount in gets rewritten.
>
>[1].
>https://github.com/apache/flink-docker/blob/dev-1.11/docker-entrypoint.sh
>
>
>Best,
>Yang
>
>chenxuying wrote on Sun, Sep 27, 2020 at 7:56 PM:
>
>> Following the official docs [1], Flink is deployed with Docker in session cluster mode.
>> Environment: win10 + docker + flink1.11.2
>> cmd commands:
>> docker run ^
>> -d^
>> --rm ^
>> --name=jobmanager ^
>> --hostname=jobmanager ^
>> --network flink-network ^
>> --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
>> -p 28081:8081 ^
>> flink:1.11.2-scala_2.11 jobmanager
>> docker run ^
>> -d^
>> --rm ^
>> --name=taskmanager ^
>> --hostname=taskmanager ^
>> --network flink-network ^
>> --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
>> flink:1.11.2-scala_2.11 taskmanager
>>
>>
>> Problem 1:
>> Viewing the job's stdout in the web UI says the output file cannot be found:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: The file STDOUT does not exist on the
>> TaskExecutor.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ~[?:1.8.0_265]
>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
>> Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not
>> exist on the TaskExecutor.
>> ... 5 more
>> 2020-09-27 09:04:33,370 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
>> [] - Unhandled exception.
>> org.apache.flink.util.FlinkException: The file STDOUT does not exist on
>> the TaskExecutor.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ~[?:1.8.0_265]
>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
>>
>>
>>
>>
>> Problem 2:
>> Do I need separate copies of the mounted config files?
>> Because I replaced the env variable with a mount, as follows:
>> docker run ^
>> -d^
>> --rm ^
>> --name=jobmanager ^
>> --network flink-network ^
>> --mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf
>> ^
>> -p 28081:8081 ^
>> flink:1.11.2-scala_2.11 jobmanager
>>
>>
>> docker run ^
>> -d^
>> --rm ^
>> --name=taskmanager ^
>> --network flink-network ^
>> --mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf
>> ^
>> flink:1.11.2-scala_2.11 taskmanager
>>
>>
>> The result is that the web UI shows 0 available Task Managers.
>> Every time the command runs, jobmanager.rpc.address in the mounted flink-conf.yaml gets replaced with the new container IP.
>>
>> I guess that is why jobmanager.rpc.address ends up as the taskmanager's IP when the taskmanager starts, so no Task Managers are available.
>> Could you tell me which step I got wrong?
>>
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/docker.html


Two problems when deploying Flink in a Docker environment

2020-09-27 Thread chenxuying
Following the official docs [1], Flink is deployed with Docker in session cluster mode.
Environment: win10 + docker + flink1.11.2
cmd commands:
docker run ^
-d^
--rm ^
--name=jobmanager ^
--hostname=jobmanager ^
--network flink-network ^
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
-p 28081:8081 ^
flink:1.11.2-scala_2.11 jobmanager
docker run ^
-d^
--rm ^
--name=taskmanager ^
--hostname=taskmanager ^
--network flink-network ^
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
flink:1.11.2-scala_2.11 taskmanager


Problem 1:
Viewing the job's stdout in the web UI says the output file cannot be found:
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
The file STDOUT does not exist on the TaskExecutor.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist 
on the TaskExecutor.
... 5 more
2020-09-27 09:04:33,370 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler 
[] - Unhandled exception.
org.apache.flink.util.FlinkException: The file STDOUT does not exist on the 
TaskExecutor.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]




Problem 2:
Do I need separate copies of the mounted config files?
Because I replaced the env variable with a mount, as follows:
docker run ^
-d^
--rm ^
--name=jobmanager ^
--network flink-network ^
--mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf ^
-p 28081:8081 ^
flink:1.11.2-scala_2.11 jobmanager


docker run ^
-d^
--rm ^
--name=taskmanager ^
--network flink-network ^
--mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf ^
flink:1.11.2-scala_2.11 taskmanager


The result is that the web UI shows 0 available Task Managers.
Every time the command runs, jobmanager.rpc.address in the mounted flink-conf.yaml gets replaced with the new container IP.
I guess that is why jobmanager.rpc.address ends up as the taskmanager's IP when the taskmanager starts, so no Task Managers are available.
Could you tell me which step I got wrong?


[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/docker.html

flinksql: how to convert a field received in the format 2020-09-23T20:58:24+08:00 to TIMESTAMP

2020-09-23 Thread chenxuying
The flinksql version is 1.11.2.
The source receives the time as a string field:
CREATE TABLE sourceTable (
 `time` STRING
 ) WITH(
...
 );


The sink is as follows:
CREATE TABLE sinktable (
`time1` STRING,
`time` TIMESTAMP(3)
 ) WITH (
 'connector' = 'print'
 );


For the insert statement, I don't know how to correctly adjust the default TO_TIMESTAMP format (a sketch is added at the end of this message):
 insert into sinktable select 
`time`,TO_TIMESTAMP(`time`,'-MM-ddTHH:mm:ss+08:00') from sourceTable


The error says the format is wrong:
Caused by: java.lang.IllegalArgumentException: Unknown pattern letter: T
at 
java.time.format.DateTimeFormatterBuilder.parsePattern(DateTimeFormatterBuilder.java:1663)
at 
java.time.format.DateTimeFormatterBuilder.appendPattern(DateTimeFormatterBuilder.java:1572)
at java.time.format.DateTimeFormatter.ofPattern(DateTimeFormatter.java:534)
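A possible fix, sketched here without having been run against the original job: the stack trace shows the pattern goes through java.time's DateTimeFormatter, so the literal T has to be quoted (single quotes are doubled inside the SQL string literal) and the +08:00 offset can be matched with the XXX pattern letters, assuming TO_TIMESTAMP accepts those pattern letters:

insert into sinktable select `time`, TO_TIMESTAMP(`time`, 'yyyy-MM-dd''T''HH:mm:ssXXX') from sourceTable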

Re:Re: flinksql: the jdbc connector options have no effect

2020-09-21 Thread chenxuying
OK, understood.




On 2020-09-17 20:29:09, "Jark Wu" wrote:
>>  sink.buffer-flush.max-rows = '0' causes every received record to be inserted into the database immediately
>
>This should be a bug; I created an issue: https://issues.apache.org/jira/browse/FLINK-19280
>
>Best,
>Jark
>
>On Thu, 17 Sep 2020 at 18:15, chenxuying  wrote:
>
>> The environment is flink1.11.2 + IDEA
>> sql:
>> CREATE TABLE sourceTable (
>> platform STRING
>> ,game_id bigint
>> ) WITH (
>> ...
>> );
>> CREATE TABLE sinktable (
>> platform STRING
>> ,game_id bigint
>> ) WITH (
>> 'connector' = 'jdbc',
>> 'url' = '',
>> 'table-name' = '',
>> 'driver' = 'com.mysql.jdbc.Driver',
>> 'username' = '',
>> 'password' = '',
>> 'sink.buffer-flush.max-rows' = '2',
>> 'sink.buffer-flush.interval' = '30s'
>> );
>> insert into sinktable select platform,game_id from sourceTable;
>>
>>
>> The official docs [1] say that the two options sink.buffer-flush.max-rows and sink.buffer-flush.interval
>> can be set to '0' to disable them, but that did not work when I tried it.
>> With the following settings
>>sink.buffer-flush.max-rows = '0'
>>'sink.buffer-flush.interval' = '60s'
>> every received record is inserted into the database immediately.
>> With the following settings
>>sink.buffer-flush.max-rows = '10'
>>'sink.buffer-flush.interval' = '0'
>> nothing can be inserted into the database.
>>
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
>>
>>


flinksql: the jdbc connector options have no effect

2020-09-17 Thread chenxuying
The environment is flink1.11.2 + IDEA
sql:
CREATE TABLE sourceTable (
platform STRING
,game_id bigint
) WITH (
...
);
CREATE TABLE sinktable (
platform STRING
,game_id bigint
) WITH (
'connector' = 'jdbc',
'url' = '',
'table-name' = '',
'driver' = 'com.mysql.jdbc.Driver',
'username' = '',
'password' = '',
'sink.buffer-flush.max-rows' = '2',
'sink.buffer-flush.interval' = '30s'
);
insert into sinktable select platform,game_id from sourceTable;


The official docs [1] say that the two options sink.buffer-flush.max-rows and sink.buffer-flush.interval
can be set to '0' to disable them, but that did not work when I tried it.
With the following settings
   sink.buffer-flush.max-rows = '0'
   'sink.buffer-flush.interval' = '60s'
every received record is inserted into the database immediately.
With the following settings
   sink.buffer-flush.max-rows = '10'
   'sink.buffer-flush.interval' = '0'
nothing can be inserted into the database.


[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options



Re:Re: How to use the aggregate function LISTAGG in flinksql

2020-08-12 Thread chenxuying
OK, so it is a bug. Thanks for the answer.


On 2020-08-12 21:32:40, "Benchao Li" wrote:
>This looks like a known bug [1]; it has been fixed but not yet released.
>
>[1] https://issues.apache.org/jira/browse/FLINK-18862
>
>chenxuying wrote on Wed, Aug 12, 2020 at 9:25 PM:
>
>> Version:
>> flinksql 1.11.0
>> Requirement:
>> aggregate multiple rows into a single row
>> The code is as follows:
>> environmentSettings =
>> EnvironmentSettings.new_instance().in_streaming_mode().build()
>> t_env = StreamTableEnvironment.create(environment_settings =
>> environmentSettings)
>> t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
>> 'true')
>>
>>
>> a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
>> a_table = t_env.from_pandas(a_df,
>> DataTypes.ROW([DataTypes.FIELD("id",
>> DataTypes.STRING()),
>>  DataTypes.FIELD("uuid",
>> DataTypes.STRING())]))
>> t_env.create_temporary_view("table_a",a_table)
>>
>>
>> b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
>> table_b = t_env.from_pandas(b_df ,
>> DataTypes.ROW([DataTypes.FIELD("val",
>> DataTypes.STRING()),
>>  DataTypes.FIELD("uuid",
>> DataTypes.STRING())]))
>> t_env.create_temporary_view("table_b",table_b)
>>
>>
>> t_env.sql_update("""
>> CREATE TABLE mySink (
>>
>> b varchar ,
>> c varchar
>> ) WITH (
>> 'connector' = 'print'
>> )
>> """)
>>
>>
>> t_env.sql_update("""
>> insert into mySink
>> select t1.id ,LISTAGG(t2.val , ',')
>> from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
>> group by t1.id
>> """)
>> t_env.execute("tutorial_job")
>>
>>
>> Error:
>> Caused by: java.lang.ClassCastException:
>> org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to
>> org.apache.flink.table.data.StringData at
>> org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
>> at
>> org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139)
>> at org.apache.flink.table.data.RowData.get(RowData.java:273) at
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
>> at
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
>> at
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>> at
>> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
>> at
>> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>> at
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown
>> Source) at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at
>> java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>
>-- 
>
>Best,
>Benchao Li


How to use the aggregate function LISTAGG in flinksql

2020-08-12 Thread chenxuying
Version:
flinksql 1.11.0
Requirement:
aggregate multiple rows into a single row
The code is as follows:
environmentSettings = 
EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings = 
environmentSettings)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
 'true')


a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
a_table = t_env.from_pandas(a_df,
DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
 DataTypes.FIELD("uuid", 
DataTypes.STRING())]))
t_env.create_temporary_view("table_a",a_table)


b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
table_b = t_env.from_pandas(b_df ,
DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
 DataTypes.FIELD("uuid", 
DataTypes.STRING())]))
t_env.create_temporary_view("table_b",table_b)


t_env.sql_update("""
CREATE TABLE mySink (   
 
b varchar ,
c varchar 
) WITH ( 
'connector' = 'print'   
) 
""")


t_env.sql_update("""
insert into mySink 
select t1.id ,LISTAGG(t2.val , ',') 
from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
group by t1.id
""")
t_env.execute("tutorial_job")


Error:
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
org.apache.flink.table.data.StringData at 
org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) 
at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) 
at org.apache.flink.table.data.RowData.get(RowData.java:273) at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown
 Source) at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:745)





Re:Re: The difference between batch and stream for bounded data

2020-08-04 Thread chenxuying
Hi, my modified statement is
insert into print_sink select game_id,count(id) from mysql_source group by
game_id
and when it runs in streaming mode it prints a changelog, like this:
2> +I(12,1)
5> +I(12555,1) 1> +I(122,1) 3> +I(13,1) 6> +I(1,1) 6> -U(1,1) 6> +U(1,2) 6> 
-U(1,2) 6> +U(1,3) 6> -U(1,3) 6> +U(1,4) 6> -U(1,4)


Then, if I use batch mode instead, it fails:
org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor 
down.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
...
Caused by: java.util.concurrent.CompletionException: 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
...
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
Suppressed: org.apache.flink.util.FlinkException: Could not properly shut down 
the TaskManager services.
at 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
at...
... 21 more
Caused by: org.apache.flink.util.FlinkException: Could not close resource.
at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
at 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
... 37 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper
at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
...
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
[CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper]


Do you happen to know the reason?


On 2020-08-04 12:11:32, "godfrey he" wrote:
>Logically, batch produces a Table as its result, while streaming produces a Changelog.
>In your example the Table result and the changelog result are the same, which is why they feel alike.
>The simplest way to see the difference is to change the query to one with a group by and compare the results.
>For more on the Table and Changelog concepts see [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
>
>chenxuying wrote on Tue, Aug 4, 2020 at 11:44 AM:
>
>> hi :
>> flink table sql 1.11.0
>> EnvironmentSettings lets you choose BatchMode or StreamingMode:
>>
>>
>> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
>> //.inStreamingMode()
>> .inBatchMode()
>> .build();
>>
>>
>> With MySQL as the source, both modes run and the result is the same. I feel my understanding of batch vs. stream is still lacking and I don't see the difference between them;
>> does anyone have an example that makes it easier to understand?
>> My code:
>> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
>> //.inStreamingMode()
>> .inBatchMode()
>> .build();
>> TableEnvironment tableEnvironment =
>> TableEnvironment.create(environmentSettings);
>> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
>> " id bigint, " +
>> "  game_id varchar, " +
>> "  PRIMARY KEY (id) NOT ENFORCED  " +
>> " )  " +
>> " with ( " +
>> "'connector' = 'jdbc',  " +
>> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
>> " 'username' = 'root' , " +
>> " 'password' = 'root', " +
>> " 'table-name' = 'mysqlsink' , " +
>> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
>> " 'sink.buffer-flush.interval' = '2s', " +
>> " 'sink.buffer-flush.max-rows' = '300' " +
>> " )");
>> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
>> " id bigint, " +
>> "  game_id varchar, " +
>> "  PRIMARY KEY (id) NOT ENFORCED  " +
>> " )  " +
>> " with ( " +
>> "'connector' = 'print'  " +
>> " )");
>> tableEnvironment.executeSql("insert into print_sink select id,game_id from
>> mysql_source");


The difference between batch and stream for bounded data

2020-08-03 Thread chenxuying
hi :
flink table sql 1.11.0
EnvironmentSettings lets you choose BatchMode or StreamingMode:


EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();


With MySQL as the source, both modes run and the result is the same. I feel my understanding of batch vs. stream is still lacking and I don't see the difference between them;
does anyone have an example that makes it easier to understand?
My code:
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();
TableEnvironment tableEnvironment = 
TableEnvironment.create(environmentSettings);
tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"'connector' = 'jdbc',  " +
" 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
" 'username' = 'root' , " +
" 'password' = 'root', " +
" 'table-name' = 'mysqlsink' , " +
" 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
" 'sink.buffer-flush.interval' = '2s', " +
" 'sink.buffer-flush.max-rows' = '300' " +
" )");
tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"'connector' = 'print'  " +
" )");
tableEnvironment.executeSql("insert into print_sink select id,game_id from 
mysql_source");

Re:Re: flinksql insert overwrite fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 Thread chenxuying
Thanks, understood.








On 2020-08-03 10:42:53, "Leonard Xu" wrote:
>If the DB supports an upsert syntax, an update is executed; if it does not, it becomes delete + insert. Both MySQL and PG
>support upsert, and the underlying SQL statements are:
>
>Database   Upsert Grammar
>MySQL  INSERT .. ON DUPLICATE KEY UPDATE ..
>PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..
>
>The MySQL connector does not use replace into; it uses on duplicate key update.
>
>Best,
>Leonard 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes
> 
><https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes>
>
>
>> On Aug 3, 2020, at 10:33, chenxuying wrote:
>> 
>> Hi, I was just experimenting here. I had previously used insert into + a primary key to update DB records, and when I saw the INSERT
>> OVERWRITE syntax I gave it a try; so OVERWRITE currently only supports the Filesystem connector and Hive tables.
>> I also want to ask: with insert into + a primary key, when the key already exists, does the underlying SQL run an update, or a delete + insert?
>> We had a requirement where update was too slow and we needed replace into; does Flink support that?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2020-08-02 09:48:04, "Leonard Xu" wrote:
>>> Hi,
>>> 
>>> This error occurs because the JDBC connector does not support INSERT OVERWRITE. The doc you read lists the INSERT syntax Flink SQL
>>> currently supports, but not every connector supports INSERT OVERWRITE; right now only the Filesystem
>>> connector and Hive tables do, and such tables usually have no primary key. Other connectors such as JDBC\ES\HBase do not support
>>> INSERT OVERWRITE, but the JDBC\ES\HBase connectors all support upsert writes [1]:
>>> once a PK is defined on the connector table, results can be updated by PK, which should satisfy the business needs of DB-like systems. Could you share the scenario where you need INSERT
>>> OVERWRITE into a DB?
>>> 
>>> Best
>>> Leonard
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling>
>>> 
>>>> On Aug 1, 2020, at 19:20, chenxuying wrote:
>>>> 
>>>> Hello
>>>> In the Flink 1.11.0 docs [1] I found that INSERT OVERWRITE ... can be used. I tried it, but executing the statement "insert 
>>>> overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with the following error:
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
>>>> SupportsOverwrite interface.
>>>> Do I have to implement a custom connector, i.e. implement DynamicTableSink?
>>>> 
>>>> 
>>>> 祝好
>>>> chenxuying
>>>> [1] 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>>> 
>
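To make the upsert grammar quoted above concrete: for the example sink table used elsewhere in this archive, the statement the JDBC sink issues against MySQL looks roughly like the following (a sketch based on the grammar table, not copied from the connector source):

INSERT INTO mysqlsink (id, game_id) VALUES (?, ?)
ON DUPLICATE KEY UPDATE id = VALUES(id), game_id = VALUES(game_id)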


Re:Re: flinksql insert overwrite fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 Thread chenxuying
Hi, I was just experimenting here. I had previously used insert into + a primary key to update DB records, and when I saw the INSERT
OVERWRITE syntax I gave it a try; so OVERWRITE currently only supports the Filesystem connector and Hive tables.
I also want to ask: with insert into + a primary key, when the key already exists, does the underlying SQL run an update, or a delete + insert?
We had a requirement where update was too slow and we needed replace into; does Flink support that?

















On 2020-08-02 09:48:04, "Leonard Xu" wrote:
>Hi,
>
>This error occurs because the JDBC connector does not support INSERT OVERWRITE. The doc you read lists the INSERT syntax Flink SQL
>currently supports, but not every connector supports INSERT OVERWRITE; right now only the Filesystem
>connector and Hive tables do, and such tables usually have no primary key. Other connectors such as JDBC\ES\HBase do not support INSERT
>OVERWRITE, but the JDBC\ES\HBase connectors all support upsert writes [1]:
>once a PK is defined on the connector table, results can be updated by PK, which should satisfy the business needs of DB-like systems. Could you share the scenario where you need INSERT
>OVERWRITE into a DB?
>
>Best
>Leonard
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> 
><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling>
>
>> On Aug 1, 2020, at 19:20, chenxuying wrote:
>> 
>> Hello
>> In the Flink 1.11.0 docs [1] I found that INSERT OVERWRITE ... can be used. I tried it, but executing the statement "insert 
>> overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with the following error:
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
>> SupportsOverwrite interface.
>> Do I have to implement a custom connector, i.e. implement DynamicTableSink?
>> 
>> 
>> Best,
>> chenxuying
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>


flinksql insert overwrite fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-01 Thread chenxuying
Hello
In the Flink 1.11.0 docs [1] I found that INSERT OVERWRITE ... can be used. I tried it, but executing the statement "insert 
overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
SupportsOverwrite interface.
Do I have to implement a custom connector, i.e. implement DynamicTableSink?


Best,
chenxuying
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax

Re:Re: flinksql Kafka insert into MySQL: insert syntax issue and unique primary key issue

2020-07-31 Thread chenxuying
hi
OK, thanks, got it.














On 2020-07-31 21:27:02, "Leonard Xu" wrote:
>Hello
>
>> On Jul 31, 2020, at 21:13, chenxuying wrote:
>> 
>> But I don't quite understand what "with the old option keys the primary key still has to be derived from the query" means. What do I need to do?
>
>In short, if you use the old (1.10-style) option keys, the code takes the same execution path as in 1.10, and 1.10 does not support defining a PRIMARY KEY;
>whether the write mode is upsert or append is derived from the user's query. You can look at the 1.10 docs on deriving the write mode from the query [1];
>if you are already on 1.11, you can skip the 1.10 docs.
> 
>In 1.10 there were many cases where no key could be derived from the query, so upsert writes were impossible; in 1.11, with PRIMARY
>KEY support, this kind of problem no longer occurs. See the 1.11 docs [2].
>
>Best,
>Leonard
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
> 
><https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector>
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
> 
><https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table>


Re: flinksql Kafka insert into MySQL: insert syntax issue and unique primary key issue

2020-07-31 Thread chenxuying
Thanks for the answer.
With the new properties the records are updated successfully,
but I don't quite understand what "with the old option keys the primary key still has to be derived from the query" means. What do I need to do?

















On 2020-07-31 16:46:41, "Leonard Xu" wrote:
>Hi, chenxuying
>
>I see you are still using "  'connector.type' = 'jdbc', ….  "
>, which are the old options; with the old option keys the primary key still has to be derived from the query.
>You need the new properties [1], " 'connector' = 'jdbc', …. ", so that the declared primary key can drive the upsert mode.
> 
>Best
>Leonard
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
> 
><https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options>
>
>> On Jul 31, 2020, at 16:12, chenxuying wrote:
>> 
>> hi
>> I am using Flink 1.11.0.
>> The code is as follows:
>> StreamExecutionEnvironment streamEnv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
>> tableEnvironment.executeSql(" " +
>> " CREATE TABLE mySource ( " +
>> "  a bigint, " +
>> "  b bigint " +
>> " ) WITH ( " +
>> "  'connector.type' = 'kafka', " +
>> "  'connector.version' = 'universal', " +
>> "  'connector.topic' = 'mytesttopic', " +
>> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
>> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
>> "  'connector.properties.group.id' = 'flink-test-cxy', " +
>> "  'connector.startup-mode' = 'latest-offset', " +
>> "  'format.type' = 'json' " +
>> " ) ");
>> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
>> " id bigint, " +
>> "  game_id varchar, " +
>> "  PRIMARY KEY (id) NOT ENFORCED  " +
>> " )  " +
>> " with ( " +
>> "  'connector.type' = 'jdbc',   " +
>> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' 
>> , " +
>> "  'connector.username' = 'root' , " +
>> "  'connector.password' = 'root',  " +
>> "  'connector.table' = 'mysqlsink' , " +
>> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
>> "  'connector.write.flush.interval' = '2s',  " +
>> "  'connector.write.flush.max-rows' = '300'  " +
>> " )");
>> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
>> (select a,cast(b as varchar) b from mySource)");
>> 
>> 
>> Question 1: the INSERT statement above fails with the following error:
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
>> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RecordType(BIGINT 
>> A, VARCHAR(2147483647) B)>)'. Supported form(s): 
>> '$SCALAR_QUERY()'
>> 
>> 
>> Question 2: if the INSERT is changed to tableEnvironment.executeSql("insert into mysqlsink select 
>> a,cast(b as varchar) b from mySource"); it runs, but when a duplicate primary key occurs it fails with:
>> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate 
>> entry '1' for key 'PRIMARY'
>> 
>> 
>> 
>
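For reference, a sketch of what the new-option DDL mentioned above could look like: with the 1.11-style 'connector' = 'jdbc' keys and a declared PRIMARY KEY ... NOT ENFORCED, the sink writes in upsert mode, so a repeated id becomes an update instead of the duplicate-key exception quoted above. Option names follow the 1.11 JDBC connector documentation; the URL and credentials are placeholders.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UpsertSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);

        // Same sink table as in the quoted code, rewritten with the new (1.11) JDBC options.
        // The declared primary key switches the connector into upsert mode.
        tableEnvironment.executeSql(
                "CREATE TABLE mysqlsink ( " +
                "  id BIGINT, " +
                "  game_id VARCHAR, " +
                "  PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                "  'connector' = 'jdbc', " +
                "  'url' = 'jdbc:mysql://localhost:3306/flinksql?useSSL=false', " +
                "  'table-name' = 'mysqlsink', " +
                "  'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "  'username' = 'root', " +
                "  'password' = 'root', " +
                "  'sink.buffer-flush.interval' = '2s', " +
                "  'sink.buffer-flush.max-rows' = '300' " +
                ")");
    }
}
```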


flinksql: INSERT syntax and unique-primary-key issues when writing Kafka data into MySQL

2020-07-31 Thread chenxuying
hi
I am using Flink 1.11.0.
The code is as follows:
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
tableEnvironment.executeSql(" " +
" CREATE TABLE mySource ( " +
"  a bigint, " +
"  b bigint " +
" ) WITH ( " +
"  'connector.type' = 'kafka', " +
"  'connector.version' = 'universal', " +
"  'connector.topic' = 'mytesttopic', " +
"  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
"  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
"  'connector.properties.group.id' = 'flink-test-cxy', " +
"  'connector.startup-mode' = 'latest-offset', " +
"  'format.type' = 'json' " +
" ) ");
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"  'connector.type' = 'jdbc',   " +
"  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " 
+
"  'connector.username' = 'root' , " +
"  'connector.password' = 'root',  " +
"  'connector.table' = 'mysqlsink' , " +
"  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
"  'connector.write.flush.interval' = '2s',  " +
"  'connector.write.flush.max-rows' = '300'  " +
" )");
tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
(select a,cast(b as varchar) b from mySource)");


Question 1: the INSERT statement above fails with the following error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 
'$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RecordType(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY()'


Question 2: if the INSERT is changed to tableEnvironment.executeSql("insert into mysqlsink select 
a,cast(b as varchar) b from mySource"); it runs, but when a duplicate primary key occurs it fails with:
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
'1' for key 'PRIMARY'
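On question 1: VALUES expects literal rows, so the parenthesized SELECT is treated as a scalar subquery that must return a single column, which is why the validator rejects it; the sink has to be fed directly from a query. A minimal corrected statement is sketched below (it assumes the tableEnvironment and tables defined in the program above). The duplicate-key failure from question 2 is what the upsert-mode DDL with the new 'connector' = 'jdbc' options, sketched further up, avoids.

```
// Corrected form of the statement from question 1, appended to the program above:
// feed the sink from a query instead of wrapping a SELECT inside a VALUES clause.
tableEnvironment.executeSql(
        "INSERT INTO mysqlsink SELECT a, CAST(b AS VARCHAR) AS game_id FROM mySource");
```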





Re:Re: Problem running the official pyflink example

2020-07-20 Thread chenxuying
Hi,
Understood, thanks. I hadn't read the documentation carefully.

On 2020-07-21 11:44:23, "Xingbo Huang" wrote:
>Hi,
>You need to add a configuration setting. If you are not using RocksDB as the state backend, setting
>t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True) is enough.
>If you are using RocksDB, you need to configure off-heap memory instead:
>table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m').
>You can refer to the example in the documentation and the corresponding note [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions
>
>Best,
>Xingbo
>
>
>chenxuying wrote on Tue, Jul 21, 2020 at 11:36 AM:
>
>> Official example:
>> https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
>> I wrote the program following the example and also installed pyflink:
>> |
>> python -m pip install apache-flink
>> |
>> Code:
>> |
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>> from pyflink.table.udf import udf
>>
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> t_env = StreamTableEnvironment.create(env)
>>
>>
>> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
>> DataTypes.BIGINT())
>>
>>
>> t_env.register_function("add", add)
>>
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
>> \
>> .with_format(OldCsv()
>> .field('a', DataTypes.BIGINT())
>> .field('b', DataTypes.BIGINT())) \
>> .with_schema(Schema()
>> .field('a', DataTypes.BIGINT())
>> .field('b', DataTypes.BIGINT())) \
>> .create_temporary_table('mySource')
>>
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
>> \
>> .with_format(OldCsv()
>> .field('sum', DataTypes.BIGINT())) \
>> .with_schema(Schema()
>> .field('sum', DataTypes.BIGINT())) \
>> .create_temporary_table('mySink')
>>
>>
>> t_env.from_path('mySource')\
>> .select("add(a, b)") \
>> .insert_into('mySink')
>>
>>
>> t_env.execute("tutorial_job")
>> |
>>
>> Run:
>>
>> |
>> python test_pyflink.py
>> |
>>
>> Error:
>>
>>
>> |
>> Traceback (most recent call last):
>>   File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>> line 147, in deco
>> return f(*a, **kw)
>>   File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>> line 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.TableException: The configured Task Off-Heap
>> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
>> The Task Off-Heap Memory can be configured using the configuration key
>> 'taskmanager.memory.task.off-heap.size'.
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
>> at
>> org.apache.f

Problem running the official pyflink example

2020-07-20 Thread chenxuying
Official example:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
I wrote the program following the example and also installed pyflink:
|
python -m pip install apache-flink
|
Code:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)


add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], 
DataTypes.BIGINT())


t_env.register_function("add", add)


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
 \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
 \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')


t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')


t_env.execute("tutorial_job")
|

Run:

|
python test_pyflink.py
|

Error:


|
Traceback (most recent call last):
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.TableException: The configured Task Off-Heap 
Memory 0 bytes is less than the least required Python worker Memory 79 mb. The 
Task Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'.
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at 

Does the flink REST API support the -C option?

2020-06-24 Thread chenxuying
I am currently using Flink 1.10.0.
Background:
The REST API has an endpoint for submitting a job:
endpoint: /jars/:jarid/run

parameters: entryClass, programArgs, parallelism, jobId, allowNonRestoredState, savepointPath


When submitting a job from the command line,
flink run -C file:///usr/local/soft/flink/my-function-0.1.jar -c 
cn.xuying.flink.table.sql.ParserSqlJob 
/usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
the -C option can be used to supply additional jars, which Flink adds to the classpath.
Question:
The current REST API does not seem to offer anything equivalent to the command line's -C option, so I would like to know whether this feature will be added in the future.

Re:Re: Re: Does FlinkSQL support something like a temporary intermediate table?

2020-06-24 Thread chenxuying
Hi, just to confirm, my_parse is a UDF, right?
Also, is there any way to use a UDTF to parse out multiple fields so that they directly become columns of the source table, and then pick the time field to define the watermark,
something like the following:
CREATE TABLE sourceTable(
request_uri STRING,
(column_1,column_2,heart_time) as udtf_parse(request_uri)
)with(..);
I'm not sure whether syntax like this exists. (A sketch follows below.)
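Computed columns in DDL only accept scalar expressions, so a UDTF cannot be used there directly. One workaround is one scalar UDF per field, at the cost of parsing request_uri once per column. A rough sketch only: parse_col1/parse_col2/parse_heart_time are hypothetical scalar functions, and the connector options are left out.

```
// Sketch, assuming tableEnvironment is a StreamTableEnvironment and the three scalar UDFs have
// been registered; parse_heart_time must return TIMESTAMP(3) for the watermark to be valid.
tableEnvironment.executeSql(
        "CREATE TABLE sourceTable ( " +
        "  request_uri STRING, " +
        "  column_1 AS parse_col1(request_uri), " +
        "  column_2 AS parse_col2(request_uri), " +
        "  heart_time AS parse_heart_time(request_uri), " +
        "  WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND " +
        ") WITH ( ... )");  // fill in your actual connector options here
```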

On 2020-06-24 12:24:46, "Jark Wu" wrote:
>In the DDL you can use a computed column to derive heart_time directly from request_uri, and then define the watermark on that computed column.
>For example:
>
>CREATE TABLE sourceTable (
>  request_uri STRING,
>  heart_time AS my_parse(request_uri),
>  WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND
>) WITH ( ... );
>
>Although this does mean the field is parsed twice.
>
>
>Best,
>Jark
>
>On Wed, 24 Jun 2020 at 12:09, Weixubin <18925434...@163.com> wrote:
>
>>
>>
>>
>> Thanks for suggesting the subquery approach. Unfortunately, after trying it, it still does not seem to meet our needs.
>>
>>
>> Our scenario reads messages from Alibaba Cloud SLS. Each message has a request_uri field.
>> The first processing step parses request_uri into multiple attributes (columns), stored as one row, i.e. one record.
>> The second step declares heart_time of each record as the event-time attribute with a 5-second watermark delay and performs windowed aggregation.
>>
>>
>> // If applied to a subquery, Flink does not support this: the WATERMARK FOR declaration can only be used in DDL. For example:
>> select
>> ..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
>> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>>  from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·
>>
>>
>> // If applied to the source table, the value of heart_time is not known up front:
>> CREATE TABLE sourceTable (
>>   request_uri STRING
>> ..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
>> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>> ) WITH ( ... );
>>
>>
>> So we can only wait for Flink 1.11 and see whether a View can serve as the temporary intermediate table and carry a WATERMARK declaration.
>> Thanks
>> Bin
>>
>> On 2020-06-23 15:28:50, "Leonard Xu" wrote:
>> >Hi
>> >What I meant is: if the intermediate result needs to be output, use one sink that writes to the intermediate result table (whatever kind of table you need) and another sink that writes to your final result table. The `select
>> * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`
>> sql in front of these two sinks can be reused, which is similar to what a VIEW does.
>> >
>> >If you don't need the intermediate result table and only want the final one, a nested sql is enough: the inner query is `select * from sourceTable ,
>> LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·, the outer query is the group by,
>> and inserting into the final result table should meet the requirement.
>> >
>> >Best,
>> >Leonard Xu
>> >
>> >
>> >> On Jun 23, 2020, at 15:21, Weixubin <18925434...@163.com> wrote:
>> >>
>> >>
>> >>
>> >>
>> >> Hi,
>> >> Regarding the sentence "just insert the ` select * from sourceTable , LATERAL
>> TABLE(ParseUriRow(request_uri)) as T( )….` sql into the intermediate result table and,
>> after the group by, write into the final result table":
>> >> I don't quite understand what the intermediate result table means here. My understanding is to create an extra middleSinkTable in the database as the intermediate result table. Is that understanding wrong?
>> Could you give a simple example?
>> >> Thanks,
>> >> Bin
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> On 2020-06-23 11:57:28, "Leonard Xu" wrote:
>> >>> Hi,
>> >>> Yes, this is supported in FLINK 1.11. The FLINK 1.11 code is currently frozen and under testing, and the release should be out soon, which is why the master
>> branch version is 1.12-SNAPSHOT;
>> >>> once 1.11 is released you will be able to see the corresponding documentation.
>> >>>
>> >>> Back to the question: flink sql (blink planner) supports multiple sinks, so in 1.10
>> this can also be done in a single job. Just insert the `  select * from sourceTable , LATERAL
>> TABLE(ParseUriRow(request_uri)) as T( )….`  sql into the intermediate result table and,
>> after the group by, write into the final result table. The effect should be similar to using a VIEW, because the planner performs segmented optimization.
>> >>> I would also recommend using 1.10.1, which fixes quite a few bugs on top of 1.10; or wait for 1.11 and try it once it is released.
>> >>>
>> >>>
>> >>> Best,
>> >>> Leonard Xu
>> >
>>

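To illustrate the multi-sink suggestion in the quoted replies above: the shared parse query can be registered once as a temporary view and then feed both the intermediate and the final result table inside one job, which the Blink planner optimizes as a whole. A rough sketch on the 1.11 StatementSet API; middleSinkTable, finalSinkTable, the column names and the aggregation are purely illustrative, the tables are assumed to have been created with DDL beforehand, and the aggregating sink must accept updates for a non-windowed GROUP BY.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MultiSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        // sourceTable, middleSinkTable and finalSinkTable are assumed to exist (created via DDL),
        // and ParseUriRow is the UDTF from the thread.

        // Run the parse once and register it as a temporary view, similar to what a VIEW gives you.
        Table parsed = tEnv.sqlQuery(
                "SELECT T.* FROM sourceTable, "
                        + "LATERAL TABLE(ParseUriRow(request_uri)) AS T(column_1, column_2, heart_time)");
        tEnv.createTemporaryView("parsedView", parsed);

        // One job, two sinks: the planner optimizes the shared part only once.
        StatementSet statements = tEnv.createStatementSet();
        statements.addInsertSql("INSERT INTO middleSinkTable SELECT * FROM parsedView");
        statements.addInsertSql(
                "INSERT INTO finalSinkTable "
                        + "SELECT column_1, COUNT(*) AS cnt FROM parsedView GROUP BY column_1");
        statements.execute();
    }
}
```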

Re:Re: Re: Ways to start a flink job

2020-04-21 Thread chenxuying
Which submission method are you referring to with the jarFiles parameter? (A sketch follows this message.)
I also tried the plugins approach, but it doesn't seem to work, not even after restarting the flink cluster; maybe I'm doing something wrong.
My directory layout is:
xxx/flink/plugins/
folder1/
udf.jar


One more note: if I put udf.jar under
/flink/lib, it does work after a restart, but that is not what I want. What I want is to be able to write a udf.jar at any time and use it right away, without restarting the flink
cluster.
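A minimal sketch of the jarFiles route referred to in the quoted reply below: createRemoteEnvironment takes a varargs list of jar paths that are uploaded together with the job, so the UDF jar travels with each submission and neither a fat jar nor a cluster restart is needed (host, port and the placeholder pipeline are assumptions).

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical JobManager host/port; every jar listed here is shipped with the job.
        // Unlike -C, the paths only need to be readable on the submitting machine.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081,
                "/usr/local/soft/flink/flink-1.0-SNAPSHOT.jar",
                "/usr/local/soft/flink/function-0.1.jar");

        env.fromElements(1, 2, 3).print();  // placeholder pipeline
        env.execute("remote-submit-sketch");
    }
}
```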

On 2020-04-21 17:46:00, "Arnold Zai" wrote:
>Isn't the jarFiles parameter a list? Just pass several jars.
>
>Or deploy the dependencies into ${FLINK_HOME}/plugins in advance.
>
>chenxuying wrote on Tue, Apr 21, 2020 at 3:36 PM:
>
>> That works, but our requirements don't allow building a fat jar.
>>
>> On 2020-04-21 15:27:48, "Arnold Zai" wrote:
>> >Build a fat jar.
>> >
>> >chenxuying wrote on Tue, Apr 21, 2020 at 2:47 PM:
>> >
>> >> What ways are there to start a flink job at the moment?
>> >> 1. From the command line:
>> >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> >> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> >> 2. Upload the jar via the built-in web UI and submit it
>> >> 3. In code via createRemoteEnvironment
>> >>
>> >> Currently we mainly upload the jar and run the job through HTTP requests to the REST API, but the problem we hit is that the REST API
>> >> cannot supply additional jars the way the command line can.
>> >>
>> >>
>> >>
>> >>
>>


Re:Re: Ways to start a flink job

2020-04-21 Thread chenxuying
That works, but our requirements don't allow building a fat jar.

On 2020-04-21 15:27:48, "Arnold Zai" wrote:
>Build a fat jar.
>
>chenxuying wrote on Tue, Apr 21, 2020 at 2:47 PM:
>
>> What ways are there to start a flink job at the moment?
>> 1. From the command line:
>> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> 2. Upload the jar via the built-in web UI and submit it
>> 3. In code via createRemoteEnvironment
>>
>> Currently we mainly upload the jar and run the job through HTTP requests to the REST API, but the problem we hit is that the REST API
>> cannot supply additional jars the way the command line can.
>>
>>
>>
>>


Ways to start a flink job

2020-04-21 Thread chenxuying
What ways are there to start a flink job at the moment?
1. From the command line:
flink run -C file:///usr/local/soft/flink/function-0.1.jar -c 
cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
2. Upload the jar via the built-in web UI and submit it
3. In code via createRemoteEnvironment


Currently we mainly upload the jar and run the job through HTTP requests to the REST API, but the problem we hit is that the REST API
cannot supply additional jars the way the command line can.