Email from chenxuying
Re: Re: Wrong Kafka connector options can prevent the session cluster from running any job
```
	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
	at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
	at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
	at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
	at java.util.Iterator.forEachRemaining(Iterator.java:115)
	at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:194)
	at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:164)
	at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:122)
	at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:50)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:167)
	... 22 more
```

At 2021-04-27 02:03:05, "Robert Metzger" wrote:

> Thanks a lot for your message. This could be a bug in Flink. It seems that the archival of the execution graph is failing because some classes are unloaded.
>
> What I observe from your stack traces is that some classes are loaded from flink-dist_2.11-1.11.2.jar, while other classes are loaded from template-common-jar-0.0.1. Maybe Flink is closing the user-code classloader, and this is causing the exception during the archival of the execution graph. Can you make sure that the core Flink classes are on your classpath only once (in flink-dist), and that template-common-jar-0.0.1 does not contain the Flink runtime classes (for example by setting the Flink dependencies to provided when using the maven-shade-plugin)?
>
> For the issue while submitting the job, I cannot provide any further help, because you haven't posted the exception that occurred in the REST handler. Could you post that exception here as well?
Best wishes,
Robert

On Sun, Apr 25, 2021 at 2:44 PM chenxuying wrote:

> (quoted original message, identical to the following post in this digest, omitted)
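Robert's suggestion above — keeping the Flink runtime classes out of the user jar — is typically expressed in the job's pom.xml by giving the core Flink dependencies provided scope. A sketch (the artifact IDs and version shown are illustrative for this 1.11.2 / Scala 2.11 setup, not taken from the thread):

```xml
<!-- The Flink runtime classes come from flink-dist on the cluster,
     so mark them provided to keep them out of the shaded user jar. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.11.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.11.2</version>
  <scope>provided</scope>
</dependency>
```

With this scope, the maven-shade-plugin will not bundle those classes, so only the connector/UDF code ends up in the submitted jar.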
Wrong Kafka connector options can prevent the session cluster from running any job
environment: Flink SQL 1.12.2, Kubernetes session mode

description: I got the following error when my Kafka connector port was wrong:

```
2021-04-25 16:49:50
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition filebeat_json_install_log-3 could be determined
```

I got the following error when my Kafka connector IP was wrong:

```
2021-04-25 20:12:53
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
```

When the job was cancelled, there was the following log output:

```
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state CANCELLING to CANCELED.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint      [] - Checkpoint with ID 1 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1' not discarded.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint      [] - Checkpoint with ID 2 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2' not discarded.
2021-04-25 08:53:41,116 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint      [] - Checkpoint with ID 3 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3' not discarded.
2021-04-25 08:53:41,137 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.
2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job
```
Flink SQL: when one job's Kafka connection info is wrong, the whole session cluster can no longer deploy jobs normally
Environment: Flink SQL 1.12.2, Kubernetes session mode.

Description: when the Kafka port is wrong, after a while the following error appears:

```
2021-04-25 16:49:50
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition filebeat_json_install_log-3 could be determined
```

When the Kafka IP is wrong, after a while the following error appears (the full stack trace is identical to the one in the English post above):

```
2021-04-25 20:12:53
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
	...
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
```

Then, when stopping or cancelling the job, the following log output appears:

```
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state CANCELLING to CANCELED.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint      [] - Checkpoint with ID 1 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1' not discarded.
2021-04-25 08:53:41,115 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint      [] - Checkpoint with ID 2 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2' not discarded.
2021-04-25 08:53:41,116 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint      [] - Checkpoint with ID 3 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3' not discarded.
2021-04-25 08:53:41,137 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.
2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf).
2021-04-25 08:53:41,151 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Suspending SlotPool.
2021-04-25 08:53:41,151
```
How to configure Flink to load libs from a custom path
Hi all, I deployed Flink on Kubernetes as a session cluster [1]. The default plugins path is /opt/flink/plugins, the default lib path is /opt/flink/lib, and the default usrlib path is /opt/flink/usrlib. I wonder whether it is possible to change these defaults. For example, I would like Flink not to load libs from /opt/flink/lib, but to load them from /data/flink/lib instead, and I cannot move /data/flink/lib to /opt/flink/lib. So how can I configure Flink to load libs from my own path? [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#session-cluster-resource-definitions
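One possible way to do this without moving any files (a sketch, not from the thread; it assumes the jars live on the node's filesystem) is to mount the host directory over /opt/flink/lib in the pod spec, so the standard startup scripts still find the jars at the expected location:

```yaml
# Sketch: expose /data/flink/lib on the node as /opt/flink/lib in the container.
# The volume and container names are illustrative.
spec:
  containers:
    - name: jobmanager
      image: flink:1.12.2-scala_2.11
      volumeMounts:
        - name: custom-libs
          mountPath: /opt/flink/lib
  volumes:
    - name: custom-libs
      hostPath:
        path: /data/flink/lib
```

This keeps Flink's own path configuration untouched; the container simply sees the custom directory at the default location.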
How to set the default number of retained checkpoints in Flink 1.11
According to the official docs [1], the setting should be state.checkpoints.num-retained, with a default of 1. But setting it has no effect; the docs say the default is 1, yet in practice it looks like 10. Meanwhile, other properties I set do take effect, e.g. execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION works, so my configuration itself should be fine. [1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
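For reference, the configuration described above would be placed in flink-conf.yaml roughly like this (the values are illustrative, not the poster's actual settings):

```yaml
# flink-conf.yaml (illustrative values)
state.checkpoints.num-retained: 3
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```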
Re: Re: Re: Flink 1.11: loading an external jar for UDF registration
1. When using the system class loader, the interfaces that the job jar exposes for the external UDF jar to implement throw ClassNotFoundException. 2. What exactly is the thread context class loader? I don't quite understand these two points — could you share a code example?

At 2020-10-15 19:47:20, "amen...@163.com" wrote:

> A follow-up question: when using the thread context class loader, every record is sent three times. Is that caused by adding pipeline.classpaths? And could setting env this way cause any other problems?
>
> best,
> amenhub

From: amen...@163.com, 2020-10-15 19:22, to user-zh, subject "Re: Re: flink1.11加载外部jar包进行UDF注册":

> Thank you very much for your reply! For me this is a very good approach, but I have one question: can the jar only be loaded with the system class loader? When I tried the system class loader, the interfaces that the job jar exposes for the external UDF jar to implement threw ClassNotFoundException, whereas pointing the class loader at the main class (which here should mean using the default thread context class loader) avoids the problem. Looking forward to your reply, thanks!
>
> best,
> amenhub

From: cxydeve...@163.com, 2020-10-15 17:46, to user-zh, subject "Re: flink1.11加载外部jar包进行UDF注册":

> Our approach is to set the env configuration via reflection, adding pipeline.classpaths. The code is as follows:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class Main {

    public static void main(final String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
        //String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
        String path = "https://...xxx.jar";
        loadJar(new URL(path));

        // Set pipeline.classpaths on the environment's Configuration via reflection.
        Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
        configuration.setAccessible(true);
        Configuration o = (Configuration) configuration.get(env);
        Field confData = Configuration.class.getDeclaredField("confData");
        confData.setAccessible(true);
        Map temp = (Map) confData.get(o);
        List jarList = new ArrayList<>();
        jarList.add(path);
        temp.put("pipeline.classpaths", jarList);

        tableEnvironment.executeSql(
                "CREATE FUNCTION CxyTestReturnSelf AS 'flinksql.function.udf.CxyTestReturnSelf'");
        tableEnvironment.executeSql("CREATE TABLE sourceTable (" +
                " f_sequence INT," +
                " f_random INT," +
                " f_random_str STRING," +
                " ts AS localtimestamp," +
                " WATERMARK FOR ts AS ts" +
                ") WITH (" +
                " 'connector' = 'datagen'," +
                " 'rows-per-second'='5'," +
                " 'fields.f_sequence.kind'='sequence'," +
                " 'fields.f_sequence.start'='1'," +
                " 'fields.f_sequence.end'='1000'," +
                " 'fields.f_random.min'='1'," +
                " 'fields.f_random.max'='1000'," +
                " 'fields.f_random_str.length'='10'" +
                ")");
        tableEnvironment.executeSql("CREATE TABLE sinktable (" +
                " f_random_str STRING" +
                ") WITH (" +
                " 'connector' = 'print'" +
                ")");
        tableEnvironment.executeSql(
                "insert into sinktable " +
                "select CxyTestReturnSelf(f_random_str) " +
                "from sourceTable");
    }

    // Dynamically load a jar into the system class loader.
    public static void loadJar(URL jarUrl) {
        // Get the addURL method of URLClassLoader.
        Method method = null;
        try {
            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e1) {
            e1.printStackTrace();
        }
        // Remember the method's current accessibility.
        boolean accessible = method.isAccessible();
        try {
            // Make the method accessible.
            if (!accessible) {
                method.setAccessible(true);
            }
            // Get the system class loader.
            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
            // Add the jar URL to the system class loader's search path.
            method.invoke(classLoader, jarUrl);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            method.setAccessible(accessible);
        }
    }
}
```

> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Flink 1.11.2 web UI: the job detail page shows no received/sent data for the job
The cluster is a local standalone deployment and the job runs normally. The sink is print, and I can see the output in Stdout, but on the job detail page the realtime figures for Bytes Received, Records Received, Bytes Sent, and Records Sent are all 0.
Flink 1.11.2 on Kubernetes: how to start the history server
Flink 1.11.2 is deployed on Kubernetes — how do I start the history server? In 1.10 a command could be added to the yaml, but in 1.11 startup goes through docker-entrypoint.sh, and that entrypoint script does not seem to have a corresponding history-server argument.
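One workaround (a sketch, not from the thread) is to bypass the entrypoint's argument handling and invoke the history server script that ships in the Flink distribution directly as the container command:

```yaml
# Sketch: run the history server by overriding the container command.
# Container name and port are illustrative; historyserver.sh is the
# standard script under bin/ in the Flink distribution.
containers:
  - name: history-server
    image: flink:1.11.2-scala_2.11
    command: ["/opt/flink/bin/historyserver.sh"]
    args: ["start-foreground"]
    ports:
      - containerPort: 8082
```

The history server's storage and refresh settings (e.g. historyserver.archive.fs.dir) would still need to be provided via flink-conf.yaml, for instance through a mounted ConfigMap.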
Re: Flink SQL: error when registering a UDTF with ROW as the output type
The splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()]) mentioned at the end needs to be changed to splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]). It seems that udtf's third argument only has to match the sink's field types for the job to run; what feels odd is that the second argument can fail to match the source fields and the job still runs.

At 2020-09-30 19:07:06, "chenxuying" wrote:

> (quoted original message, identical to the following post in this digest, omitted)
Flink SQL: error when registering a UDTF with ROW as the output type
Versions: pyflink==1.0, apache-flink==1.11.2. The code is as follows:

```python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string(
    "python.fn-execution.memory.managed", 'true')


class SplitStr(TableFunction):
    def eval(self, data):
        for row in data:
            yield row[0], row[1]


splitStr = udtf(
    SplitStr(),
    DataTypes.ARRAY(
        DataTypes.ROW([
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("id", DataTypes.STRING())
        ])
    ),
    DataTypes.ROW([
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("id", DataTypes.STRING())
    ])
)
t_env.register_function("splitStr", splitStr)

t_env.sql_update("""
CREATE TABLE mySource (
    id varchar,
    data array<row<name varchar, id varchar>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'mytesttopic',
    'properties.bootstrap.servers' = '172.17.0.2:9092',
    'properties.group.id' = 'flink-test-cxy',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
    id varchar,
    name varchar,
    age varchar
) with (
    'connector' = 'print'
)
""")
t_env.sql_update("insert into mysqlsink select id,name,age from mySource"
                 " ,LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")
```

The final error is:

```
TypeError: Invalid result_type: result_type should be DataType but contains RowField(name, VARCHAR)
```

It is raised in C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py, line 264, in __init__:

```python
def __init__(self, func, input_types, result_types, deterministic=None, name=None):
    super(UserDefinedTableFunctionWrapper, self).__init__(
        func, input_types, deterministic, name)

    if not isinstance(result_types, collections.Iterable):
        result_types = [result_types]

    for result_type in result_types:
        if not isinstance(result_type, DataType):
            raise TypeError(
                "Invalid result_type: result_type should be DataType but contains {}".format(
                    result_type))

    self._result_types = result_types
    self._judtf_placeholder = None
```

In the debugger I can see that result_types ends up being the array of FIELDs inside the ROW, hence the error — is this a bug? Also, if I instead declare the udtf as splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()]), it runs normally, even though the types obviously don't match what I actually run.
Flink 1.11.2 on Kubernetes works when deployed per the official docs, but adding a volume config causes "Read-only file system" errors
When deploying on Kubernetes I followed the official approach [1], and it worked fine. Then I added a volume configuration:

```json
{
  ...
  "spec": {
    ...
    "template": {
      ...
      "spec": {
        "volumes": [
          ...
          {
            "name": "libs-volume",
            "hostPath": {
              "path": "/data/volumes/flink/jobmanager/cxylib",
              "type": ""
            }
          },
          ...
        ],
        "containers": [
          {
            ...
            "volumeMounts": [
              {
                "name": "flink-config-volume",
                "mountPath": "/opt/flink/conf"
              },
              ...
            ],
            ...
          }
        ],
        ...
      }
    },
    ...
  },
  ...
}
```

After that, starting the jobmanager fails with:

```
Starting Job Manager
sed: couldn't open temporary file /opt/flink/conf/sedz0NYKX: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sede6R0BY: Read-only file system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml: Permission denied
/docker-entrypoint.sh: 91: /docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting standalonesession as a console application on host flink-jobmanager-66fb98869d-w7plb.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sep 28, 2020 7:11:14 AM org.apache.hadoop.util.NativeCodeLoader
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
Re: Re: Two problems when deploying Flink in a Docker environment
For the first problem, could you give an example of modifying the startup command you mentioned, or point me to a document I can refer to?

At 2020-09-28 12:50:25, "Yang Wang" wrote:

> For the first problem: by default, the STDOUT of the JM/TM processes already goes to the console, so there is no way to view STDOUT output through the web UI. You can view it with docker logs, or you can modify the startup command to redirect STDOUT to a specific file.
>
> For the second problem: docker-entrypoint.sh [1] in the JobManager and TaskManager containers modifies flink-conf.yaml, so the file you mount in gets changed.
>
> [1]. https://github.com/apache/flink-docker/blob/dev-1.11/docker-entrypoint.sh
>
> Best,
> Yang
>
> chenxuying wrote on Sunday, Sep 27, 2020 at 7:56 PM:
> (quoted original message, identical to the following post in this digest, omitted)
Two problems when deploying Flink in a Docker environment
Following the official docs [1], I deployed Flink with Docker in session cluster mode. Environment: Win10 + Docker + Flink 1.11.2. The cmd commands:

```
docker run ^
  -d ^
  --rm ^
  --name=jobmanager ^
  --hostname=jobmanager ^
  --network flink-network ^
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
  -p 28081:8081 ^
  flink:1.11.2-scala_2.11 jobmanager

docker run ^
  -d ^
  --rm ^
  --name=taskmanager ^
  --hostname=taskmanager ^
  --network flink-network ^
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
  flink:1.11.2-scala_2.11 taskmanager
```

Problem 1: when viewing a job's stdout in the web UI, it reports that the output file cannot be found:

```
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_265]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_265]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
	... 5 more
2020-09-27 09:04:33,370 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Unhandled exception.
org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_265]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_265]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
```

Problem 2: do the mounted config files need separate copies? I replaced the env with a mount, as follows:

```
docker run ^
  -d ^
  --rm ^
  --name=jobmanager ^
  --network flink-network ^
  --mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf ^
  -p 28081:8081 ^
  flink:1.11.2-scala_2.11 jobmanager

docker run ^
  -d ^
  --rm ^
  --name=taskmanager ^
  --network flink-network ^
  --mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf ^
  flink:1.11.2-scala_2.11 taskmanager
```

As a result, the web UI shows 0 available Task Managers. Each time a container starts, jobmanager.rpc.address in the mounted flink-conf.yaml under src is replaced with the new container's IP. I guess that when the taskmanager starts, jobmanager.rpc.address gets replaced with the taskmanager's IP, so no Tasks are available. Could you tell me which step I got wrong?

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/docker.html
Flink SQL: a field arrives in the format 2020-09-23T20:58:24+08:00 — how to convert it to TIMESTAMP
Flink SQL version is 1.11.2. The source receives the time as a string field:

```sql
CREATE TABLE sourceTable (
    `time` STRING
) WITH (
    ...
);
```

The sink is as follows:

```sql
CREATE TABLE sinktable (
    `time1` STRING,
    `time` TIMESTAMP(3)
) WITH (
    'connector' = 'print'
);
```

In the INSERT statement, I don't know how to correctly change TO_TIMESTAMP's default format:

```sql
insert into sinktable
select `time`, TO_TIMESTAMP(`time`, '-MM-ddTHH:mm:ss+08:00')
from sourceTable
```

It reports a format error:

```
Caused by: java.lang.IllegalArgumentException: Unknown pattern letter: T
	at java.time.format.DateTimeFormatterBuilder.parsePattern(DateTimeFormatterBuilder.java:1663)
	at java.time.format.DateTimeFormatterBuilder.appendPattern(DateTimeFormatterBuilder.java:1572)
	at java.time.format.DateTimeFormatter.ofPattern(DateTimeFormatter.java:534)
```
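Judging by the stack trace, the format string is handed to java.time.DateTimeFormatter, where literal text such as the T separator must be enclosed in single quotes (doubled inside a SQL string literal), and a zone offset like +08:00 is matched by a pattern token rather than written out literally. A sketch of what the corrected call might look like, assuming a full yyyy-MM-dd date pattern and that this TO_TIMESTAMP version accepts the DateTimeFormatter XXX offset token (worth verifying against the Flink 1.11 docs):

```sql
-- ''T'' is an escaped single-quoted literal T inside the SQL string;
-- XXX matches an ISO offset of the form +08:00 in DateTimeFormatter syntax.
insert into sinktable
select `time`,
       TO_TIMESTAMP(`time`, 'yyyy-MM-dd''T''HH:mm:ssXXX')
from sourceTable;
```

If the offset token is not accepted, an alternative is to strip or replace the literal suffix first (e.g. with REPLACE or SUBSTR) and parse only the quoted-T date-time part.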
Re:Re: JDBC connector options not taking effect when using Flink SQL
OK, understood.

At 2020-09-17 20:29:09, "Jark Wu" wrote:
>> sink.buffer-flush.max-rows = '0' causes every received record to be inserted into the database
>
>This looks like a bug; I've filed an issue: https://issues.apache.org/jira/browse/FLINK-19280
>
>Best,
>Jark
>
>On Thu, 17 Sep 2020 at 18:15, chenxuying wrote:
>
>> Environment: flink1.11.2 + IDEA
>> sql:
>> CREATE TABLE sourceTable (
>>   platform STRING
>>   ,game_id bigint
>> ) WITH (
>>   ...
>> );
>> CREATE TABLE sinktable (
>>   platform STRING
>>   ,game_id bigint
>> ) WITH (
>>   'connector' = 'jdbc',
>>   'url' = '',
>>   'table-name' = '',
>>   'driver' = 'com.mysql.jdbc.Driver',
>>   'username' = '',
>>   'password' = '',
>>   'sink.buffer-flush.max-rows' = '2',
>>   'sink.buffer-flush.interval' = '30s'
>> );
>> insert into sinktable select platform,game_id from sourceTable;
>>
>> The official docs [1] say that sink.buffer-flush.max-rows and sink.buffer-flush.interval
>> can both be set to '0' to disable them, but that didn't work when I tried.
>> With this setting:
>>   sink.buffer-flush.max-rows = '0'
>>   'sink.buffer-flush.interval' = '60s'
>> every received record is inserted into the database immediately.
>> With this setting:
>>   sink.buffer-flush.max-rows = '10'
>>   'sink.buffer-flush.interval' = '0'
>> nothing can be inserted into the database.
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
>>
>>
JDBC connector options not taking effect when using Flink SQL
Environment: flink1.11.2 + IDEA. SQL:

CREATE TABLE sourceTable (
  platform STRING
  ,game_id bigint
) WITH (
  ...
);
CREATE TABLE sinktable (
  platform STRING
  ,game_id bigint
) WITH (
  'connector' = 'jdbc',
  'url' = '',
  'table-name' = '',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = '',
  'password' = '',
  'sink.buffer-flush.max-rows' = '2',
  'sink.buffer-flush.interval' = '30s'
);
insert into sinktable select platform,game_id from sourceTable;

The official docs [1] say that sink.buffer-flush.max-rows and sink.buffer-flush.interval can both be set to '0' to disable them, but that didn't work when I tried.
With this setting:
  sink.buffer-flush.max-rows = '0'
  'sink.buffer-flush.interval' = '60s'
every received record is inserted into the database immediately.
With this setting:
  sink.buffer-flush.max-rows = '10'
  'sink.buffer-flush.interval' = '0'
nothing can be inserted into the database.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
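To make the reported symptom concrete, the toy class below (illustrative only, not Flink's implementation; class and field names are made up) sketches what buffering by max-rows is supposed to mean: rows accumulate and are written in one batch once the threshold is reached. Under the bug above, max-rows = '0' behaved like flush-on-every-row instead of disabling the row threshold.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferedSinkSketch {
    // Intended 'sink.buffer-flush.max-rows' semantics: accumulate rows and
    // flush them as one batch at the threshold (a real sink also flushes
    // on a timer and on checkpoint).
    private final int maxRows;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> flushedBatches = new ArrayList<>();

    BufferedSinkSketch(int maxRows) { this.maxRows = maxRows; }

    void write(String row) {
        buffer.add(row);
        // maxRows <= 0 disables the row threshold instead of flushing per row.
        if (maxRows > 0 && buffer.size() >= maxRows) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BufferedSinkSketch sink = new BufferedSinkSketch(2);
        sink.write("a"); sink.write("b"); sink.write("c");
        System.out.println(sink.flushedBatches); // [[a, b]]
    }
}
```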
Re:Re: How to use the aggregate function LISTAGG in Flink SQL
Got it, so it was a bug. Thanks for the answer.

At 2020-08-12 21:32:40, "Benchao Li" wrote:
>This looks like a known bug [1]; it has been fixed but not yet released.
>
>[1] https://issues.apache.org/jira/browse/FLINK-18862
>
>chenxuying wrote on Wed, Aug 12, 2020 at 21:25:
>
>> Version:
>> flinksql 1.11.0
>> Requirement:
>> aggregate multiple rows into one row
>> Code:
>> environmentSettings = EnvironmentSettings.new_instance().in_streaming_mode().build()
>> t_env = StreamTableEnvironment.create(environment_settings = environmentSettings)
>> t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
>>
>> a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
>> a_table = t_env.from_pandas(a_df,
>>     DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
>>                    DataTypes.FIELD("uuid", DataTypes.STRING())]))
>> t_env.create_temporary_view("table_a",a_table)
>>
>> b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
>> table_b = t_env.from_pandas(b_df ,
>>     DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
>>                    DataTypes.FIELD("uuid", DataTypes.STRING())]))
>> t_env.create_temporary_view("table_b",table_b)
>>
>> t_env.sql_update("""
>> CREATE TABLE mySink (
>>   b varchar ,
>>   c varchar
>> ) WITH (
>>   'connector' = 'print'
>> )
>> """)
>>
>> t_env.sql_update("""
>> insert into mySink
>> select t1.id ,LISTAGG(t2.val , ',')
>> from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
>> group by t1.id
>> """)
>> t_env.execute("tutorial_job")
>>
>> Error:
>> Caused by: java.lang.ClassCastException:
>> org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to
>> org.apache.flink.table.data.StringData at
>> org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
>> at
>> org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139)
>> at org.apache.flink.table.data.RowData.get(RowData.java:273) at
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
>> at
>> 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) >> at >> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) >> at >> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) >> at >> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) >> at >> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) >> at >> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) >> at >> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) >> at >> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) >> at >> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) >> at >> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) >> at >> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) >> at >> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown >> Source) at >> 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at >> java.lang.Thread.run(Thread.java:745) >> >> >> >> > >-- > >Best, >Benchao Li
How to use the aggregate function LISTAGG in Flink SQL
Version: flinksql 1.11.0
Requirement: aggregate multiple rows into one row.
Code:

environmentSettings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings = environmentSettings)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')

a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
a_table = t_env.from_pandas(a_df,
    DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
                   DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_a",a_table)

b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
table_b = t_env.from_pandas(b_df ,
    DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
                   DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_b",table_b)

t_env.sql_update("""
CREATE TABLE mySink (
  b varchar ,
  c varchar
) WITH (
  'connector' = 'print'
)
""")

t_env.sql_update("""
insert into mySink
select t1.id ,LISTAGG(t2.val , ',')
from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
group by t1.id
""")
t_env.execute("tutorial_job")

Error:
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) at org.apache.flink.table.data.RowData.get(RowData.java:273) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown Source) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:745)
Re:Re: Difference between batch and stream for bounded data
Hi, my revised statement is:

insert into print_sink select game_id,count(id) from mysql_source group by game_id

When it runs in streamMode it prints a Changelog, like this:

2> +I(12,1)
5> +I(12555,1)
1> +I(122,1)
3> +I(13,1)
6> +I(1,1)
6> -U(1,1)
6> +U(1,2)
6> -U(1,2)
6> +U(1,3)
6> -U(1,3)
6> +U(1,4)
6> -U(1,4)

But when I use batchMode, it fails:

org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
    ...
Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    ...
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
    ... 21 more
    Suppressed: org.apache.flink.util.FlinkException: Could not properly shut down the TaskManager services.
        at org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
        at...
        ... 21 more
    Caused by: org.apache.flink.util.FlinkException: Could not close resource.
        at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
        at org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
        ... 37 more
    Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper
        at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
        at org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
        ...
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
        ...
21 more [CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper]

Do you know what might cause this?

At 2020-08-04 12:11:32, "godfrey he" wrote:
>Logically, batch execution produces a Table, while stream execution produces a Changelog.
>In your example the Table result and the changelog result look the same, which is why the two modes felt identical.
>The simplest way to see the difference is to change the query to one with GROUP BY and compare the results.
>More on the Table and Changelog concepts in [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
>
>chenxuying wrote on Tue, Aug 4, 2020 at 11:44:
>
>> hi:
>> flink table sql 1.11.0
>> EnvironmentSettings can be set to BatchMode or StreamingMode:
>>
>> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
>>     //.inStreamingMode()
>>     .inBatchMode()
>>     .build();
>>
>> With MySQL as the source, both modes run and the result is the same, so I feel my understanding of batch vs. stream is still lacking and I don't know the difference. Does anyone have an example that makes it easier to understand?
>> My code:
>> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
>>     //.inStreamingMode()
>>     .inBatchMode()
>>     .build();
>> TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
>> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
>>     " id bigint, " +
>>     " game_id varchar, " +
>>     " PRIMARY KEY (id) NOT ENFORCED " +
>>     " ) " +
>>     " with ( " +
>>     " 'connector' = 'jdbc', " +
>>     " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
>>     " 'username' = 'root' , " +
>>     " 'password' = 'root', " +
>>     " 'table-name' = 'mysqlsink' , " +
>>     " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
>>     " 'sink.buffer-flush.interval' = '2s', " +
>>     " 'sink.buffer-flush.max-rows' = '300' " +
>>     " )");
>> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
>>     " id bigint, " +
>>     " game_id varchar, " +
>>     " PRIMARY KEY (id) NOT ENFORCED " +
>>     " ) " +
>>     " with ( " +
>>     " 'connector' = 'print' " +
>>     " )");
>> tableEnvironment.executeSql("insert into print_sink select id,game_id from mysql_source");
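The +I/-U/+U rows above are retraction semantics: for each incoming record, the streaming GROUP BY first retracts the previously emitted count (-U) and then emits the updated one (+U), while a first-seen key emits an insert (+I). A small self-contained sketch of how such a changelog is produced (plain Java, no Flink; the keys are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogDemo {
    public static void main(String[] args) {
        // Running count per key, emitting changelog rows like the
        // 'print' connector does in streaming mode.
        Map<String, Long> counts = new HashMap<>();
        for (String key : new String[]{"1", "1", "12", "1"}) {
            Long old = counts.get(key);
            long next = (old == null ? 0 : old) + 1;
            counts.put(key, next);
            if (old == null) {
                System.out.println("+I(" + key + "," + next + ")");   // first result for key
            } else {
                System.out.println("-U(" + key + "," + old + ")");    // retract old count
                System.out.println("+U(" + key + "," + next + ")");   // emit updated count
            }
        }
    }
}
```

In batch mode the same query would emit only the final Table, one row per key, with no retractions.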
Difference between batch and stream for bounded data
hi:
flink table sql 1.11.0
EnvironmentSettings can be set to BatchMode or StreamingMode:

EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
    //.inStreamingMode()
    .inBatchMode()
    .build();

With MySQL as the source, both modes run and the result is the same, so I feel my understanding of batch vs. stream is still lacking and I don't know the difference. Does anyone have an example that makes it easier to understand?
My code:

EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
    //.inStreamingMode()
    .inBatchMode()
    .build();
TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
    " id bigint, " +
    " game_id varchar, " +
    " PRIMARY KEY (id) NOT ENFORCED " +
    " ) " +
    " with ( " +
    " 'connector' = 'jdbc', " +
    " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
    " 'username' = 'root' , " +
    " 'password' = 'root', " +
    " 'table-name' = 'mysqlsink' , " +
    " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
    " 'sink.buffer-flush.interval' = '2s', " +
    " 'sink.buffer-flush.max-rows' = '300' " +
    " )");
tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
    " id bigint, " +
    " game_id varchar, " +
    " PRIMARY KEY (id) NOT ENFORCED " +
    " ) " +
    " with ( " +
    " 'connector' = 'print' " +
    " )");
tableEnvironment.executeSql("insert into print_sink select id,game_id from mysql_source");
Re:Re: Flink SQL INSERT OVERWRITE fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface
Thanks, understood.

At 2020-08-03 10:42:53, "Leonard Xu" wrote:
>If the DB supports an upsert syntax, an UPDATE is executed; if it does not support upsert syntax, it is a DELETE + INSERT. Both MySQL and PG support upsert; the underlying SQL statements are:
>
>Database     Upsert Grammar
>MySQL        INSERT .. ON DUPLICATE KEY UPDATE ..
>PostgreSQL   INSERT .. ON CONFLICT .. DO UPDATE SET ..
>
>The MySQL connector does not use REPLACE INTO; it uses ON DUPLICATE KEY UPDATE.
>
>Best,
>Leonard
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes
>
>> On 2020-08-03 at 10:33, chenxuying wrote:
>>
>> Hi, I was just experimenting here: I had previously used INSERT INTO plus a primary key to update DB records, and when I saw the INSERT OVERWRITE syntax I gave it a try; so OVERWRITE currently only supports the Filesystem connector and Hive tables.
>> One more question: when using INSERT INTO plus a primary key, if the key already exists, does the underlying SQL execute an UPDATE, or a DELETE + UPDATE? We previously had a requirement where UPDATE was too inefficient and we needed REPLACE INTO; does Flink support that?
>>
>> At 2020-08-02 09:48:04, "Leonard Xu" wrote:
>>> Hi,
>>>
>>> This error occurs because the JDBC connector does not support INSERT OVERWRITE. The document you read lists the INSERT syntax that Flink SQL currently supports, but not every connector supports INSERT OVERWRITE; currently only the Filesystem connector and Hive tables do, and those tables generally have no primary key. Other connectors such as JDBC\ES\HBase do not support INSERT OVERWRITE yet, but they all support upsert writes [1]: once a PK is defined on the connector table, the result can be updated by PK, which should satisfy the business needs of DB-like systems. Could you share the scenario where you need INSERT OVERWRITE into a DB?
>>>
>>> Best
>>> Leonard
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>>
>>>> On 2020-08-01 at 19:20, chenxuying wrote:
>>>>
>>>> Hello
>>>> The Flink 1.11.0 documentation [1] shows that INSERT OVERWRITE can be used. I tried it, but executing the statement "insert overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with:
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement
>>>> SupportsOverwrite interface.
>>>> Do I need to write a custom connector that implements DynamicTableSink?
>>>>
>>>> Best,
>>>> chenxuying
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>>>
>
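Leonard's table maps to a statement shape that can be sketched mechanically. The helper below is illustrative only (not Flink's actual code; the table and column names are made up) and builds the MySQL-flavored upsert that an idempotent JDBC sink would issue when a primary key is declared:

```java
import java.util.Collections;

public class UpsertSqlSketch {
    // Build a MySQL "INSERT .. ON DUPLICATE KEY UPDATE .." statement
    // with one ? placeholder per column.
    static String mysqlUpsert(String table, String... cols) {
        String colList = String.join(", ", cols);
        String placeholders = String.join(", ", Collections.nCopies(cols.length, "?"));
        StringBuilder updates = new StringBuilder();
        for (int i = 0; i < cols.length; i++) {
            if (i > 0) updates.append(", ");
            updates.append(cols[i]).append("=VALUES(").append(cols[i]).append(")");
        }
        return "INSERT INTO " + table + " (" + colList + ") VALUES (" + placeholders
                + ") ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        // INSERT INTO mysqlsink (id, game_id) VALUES (?, ?)
        //   ON DUPLICATE KEY UPDATE id=VALUES(id), game_id=VALUES(game_id)
        System.out.println(mysqlUpsert("mysqlsink", "id", "game_id"));
    }
}
```

One INSERT statement both inserts and updates, which is why no separate REPLACE INTO path is needed for MySQL.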
Re:Re: Flink SQL INSERT OVERWRITE fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface
Hi, I was just experimenting here: I had previously used INSERT INTO plus a primary key to update DB records, and when I saw the INSERT OVERWRITE syntax I gave it a try; so OVERWRITE currently only supports the Filesystem connector and Hive tables.
One more question: when using INSERT INTO plus a primary key, if the key already exists, does the underlying SQL execute an UPDATE, or a DELETE + UPDATE? We previously had a requirement where UPDATE was too inefficient and we needed REPLACE INTO; does Flink support that?

At 2020-08-02 09:48:04, "Leonard Xu" wrote:
>Hi,
>
>This error occurs because the JDBC connector does not support INSERT OVERWRITE. The document you read lists the INSERT syntax that Flink SQL currently supports, but not every connector supports INSERT OVERWRITE; currently only the Filesystem connector and Hive tables do, and those tables generally have no primary key. Other connectors such as JDBC\ES\HBase do not support INSERT OVERWRITE yet, but they all support upsert writes [1]: once a PK is defined on the connector table, the result can be updated by PK, which should satisfy the business needs of DB-like systems. Could you share the scenario where you need INSERT OVERWRITE into a DB?
>
>Best
>Leonard
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>
>> On 2020-08-01 at 19:20, chenxuying wrote:
>>
>> Hello
>> The Flink 1.11.0 documentation [1] shows that INSERT OVERWRITE can be used. I tried it, but executing the statement "insert overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with:
>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement
>> SupportsOverwrite interface.
>> Do I need to write a custom connector that implements DynamicTableSink?
>>
>> Best,
>> chenxuying
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>
Flink SQL INSERT OVERWRITE fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface
Hello
The Flink 1.11.0 documentation [1] shows that INSERT OVERWRITE can be used. I tried it, but executing the statement "insert overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with:

Exception in thread "main" org.apache.flink.table.api.ValidationException: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface.

Do I need to write a custom connector that implements DynamicTableSink?

Best,
chenxuying
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
Re:Re: Flink SQL Kafka-to-MySQL INSERT syntax and unique primary key issues
hi
OK, thanks, got it now, haha.

At 2020-07-31 21:27:02, "Leonard Xu" wrote:
>Hello
>
>> On 2020-07-31 at 21:13, chenxuying wrote:
>>
>> But I don't quite understand what "with the old options the primary key still has to be derived from the query" means, or what I need to do.
>
>Simply put, if you use the old (1.10) options, the code path is the same as in 1.10. In 1.10 defining a PRIMARY KEY is not supported; whether the write mode is upsert or append is decided by the user's query. See the 1.10 docs on deriving the write mode from the query [1]; if you are already on 1.11, you can skip the 1.10 docs.
>
>In 1.10 it was common for the query to fail to derive a key, making upsert writes impossible. In 1.11 this is solved by supporting PRIMARY KEY definitions; see the 1.11 docs [2].
>
>Best,
>Leonard
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>[2]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
Re:Re: Flink SQL Kafka-to-MySQL INSERT syntax and unique primary key issues
Thanks for the answer. With the new options the record is updated successfully, but I don't quite understand what "with the old options the primary key still has to be derived from the query" means, or what I need to do.

At 2020-07-31 16:46:41, "Leonard Xu" wrote:
>Hi, chenxuying
>
>You are still using " 'connector.type' = 'jdbc', …. ", which are the old options; with the old options the primary key still has to be derived from the query.
>You need the new options [1] (" 'connector' = 'jdbc', …. ") for the declared primary key to drive the upsert mode.
>
>Best
>Leonard
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
>
>> On 2020-07-31 at 16:12, chenxuying wrote:
>>
>> hi
>> I'm using flink 1.11.0. The code is:
>> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
>> tableEnvironment.executeSql(" " +
>>     " CREATE TABLE mySource ( " +
>>     " a bigint, " +
>>     " b bigint " +
>>     " ) WITH ( " +
>>     " 'connector.type' = 'kafka', " +
>>     " 'connector.version' = 'universal', " +
>>     " 'connector.topic' = 'mytesttopic', " +
>>     " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
>>     " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
>>     " 'connector.properties.group.id' = 'flink-test-cxy', " +
>>     " 'connector.startup-mode' = 'latest-offset', " +
>>     " 'format.type' = 'json' " +
>>     " ) ");
>> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
>>     " id bigint, " +
>>     " game_id varchar, " +
>>     " PRIMARY KEY (id) NOT ENFORCED " +
>>     " ) " +
>>     " with ( " +
>>     " 'connector.type' = 'jdbc', " +
>>     " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
>>     " 'connector.username' = 'root' , " +
>>     " 'connector.password' = 'root', " +
>>     " 'connector.table' = 'mysqlsink' , " +
>>     " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
>>     " 'connector.write.flush.interval' = '2s', " +
>>     " 'connector.write.flush.max-rows' = '300' " +
>>     " )");
>> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)");
>>
>> Question 1: the INSERT statement above fails with:
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot
>> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RecordType(BIGINT
>> A, VARCHAR(2147483647) B)>)'. Supported form(s):
>> '$SCALAR_QUERY()'
>>
>> Question 2: if the INSERT is changed to tableEnvironment.executeSql("insert into mysqlsink select
>> a,cast(b as varchar) b from mySource"); it can run, but when a unique primary key is duplicated it fails with:
>> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate
>> entry '1' for key 'PRIMARY'
>>
>>
>
Flink SQL Kafka-to-MySQL INSERT syntax and unique primary key issues
hi
I'm using flink 1.11.0. The code is:

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
tableEnvironment.executeSql(" " +
    " CREATE TABLE mySource ( " +
    " a bigint, " +
    " b bigint " +
    " ) WITH ( " +
    " 'connector.type' = 'kafka', " +
    " 'connector.version' = 'universal', " +
    " 'connector.topic' = 'mytesttopic', " +
    " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
    " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
    " 'connector.properties.group.id' = 'flink-test-cxy', " +
    " 'connector.startup-mode' = 'latest-offset', " +
    " 'format.type' = 'json' " +
    " ) ");
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
    " id bigint, " +
    " game_id varchar, " +
    " PRIMARY KEY (id) NOT ENFORCED " +
    " ) " +
    " with ( " +
    " 'connector.type' = 'jdbc', " +
    " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
    " 'connector.username' = 'root' , " +
    " 'connector.password' = 'root', " +
    " 'connector.table' = 'mysqlsink' , " +
    " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
    " 'connector.write.flush.interval' = '2s', " +
    " 'connector.write.flush.max-rows' = '300' " +
    " )");
tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)");

Question 1: the INSERT statement above fails with:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RecordType(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY()'

Question 2: if the INSERT is changed to tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); it can run, but when a unique primary key is duplicated it fails with:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
Re:Re: Problem running the official PyFlink example
Hi, understood, thanks. I didn't read the docs carefully, haha.

At 2020-07-21 11:44:23, "Xingbo Huang" wrote:
>Hi,
>You need to add configuration. If you are not using RocksDB as the state backend, just set t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True); if you are using it, you need to configure off-heap memory instead: table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m'). See the example in the docs and the corresponding note [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions
>
>Best,
>Xingbo
>
>chenxuying wrote on Tue, Jul 21, 2020 at 11:36:
>
>> Official example:
>> https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
>> I wrote the program following the example and installed pyflink:
>>
>> python -m pip install apache-flink
>>
>> Code:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>> from pyflink.table.udf import udf
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> t_env = StreamTableEnvironment.create(env)
>>
>> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
>>
>> t_env.register_function("add", add)
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) \
>>     .with_format(OldCsv()
>>                  .field('a', DataTypes.BIGINT())
>>                  .field('b', DataTypes.BIGINT())) \
>>     .with_schema(Schema()
>>                  .field('a', DataTypes.BIGINT())
>>                  .field('b', DataTypes.BIGINT())) \
>>     .create_temporary_table('mySource')
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt')) \
>>     .with_format(OldCsv()
>>                  .field('sum', DataTypes.BIGINT())) \
>>     .with_schema(Schema()
>>                  .field('sum', DataTypes.BIGINT())) \
>>     .create_temporary_table('mySink')
>>
>> t_env.from_path('mySource')\
>>     .select("add(a, b)") \
>>     .insert_into('mySink')
>>
>> t_env.execute("tutorial_job")
>>
>> Run:
>>
>> python test_pyflink.py
>>
>> Error:
>>
>> Traceback (most recent call last):
>>   File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
>>     return f(*a, **kw)
>>   File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'.
>> at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
>> at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
>> at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
>> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
>> at org.apache.f
Problem running the official PyFlink example
Official example:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
I wrote the program following the example and installed pyflink:

python -m pip install apache-flink

Code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

t_env.register_function("add", add)

t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")

Run:

python test_pyflink.py

Error:

Traceback (most recent call last):
  File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'. 
at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158) at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119) at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at
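The exception above names the configuration key to change. A minimal sketch of one possible fix, setting that key in `flink-conf.yaml` (the value `80m` is an assumption chosen to exceed the required 79 mb; adjust it for your setup):

```yaml
# flink-conf.yaml — reserve off-heap memory for the Python worker process.
# 80m is an assumed value; the exception asks for at least 79 mb.
taskmanager.memory.task.off-heap.size: 80m
```

For a local script it should also be possible to set the same key programmatically before calling `execute`, e.g. via `t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")`.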
Does the Flink REST API support the -C option?
We are currently on Flink 1.10.0.

Background: the REST API has an endpoint for submitting a job, /jars/:jarid/run, with the parameters entryClass, programArgs, parallelism, jobId, allowNonRestoredState, and savepointPath. When submitting a job from the command line, e.g.

flink run -C file:///usr/local/soft/flink/my-function-0.1.jar -c cn.xuying.flink.table.sql.ParserSqlJob /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar

the -C option lets you supply additional jars, which Flink adds to the classpath.

Question: the REST API apparently offers no equivalent of the command line's -C option, so I would like to know whether this feature will be added in the future.
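To make the comparison concrete, here is a minimal sketch of what a submission through that REST endpoint looks like from code. The host, port, and jar id are assumptions for illustration; the request is only constructed here, not sent. Note that the body has no field that mirrors `-C`:

```python
# Sketch: build the request for POST /jars/:jarid/run (Flink 1.10 REST API).
# Host, port, and jar id below are illustrative assumptions.

def build_run_request(host, port, jar_id, entry_class, program_args=None,
                      parallelism=None):
    """Return (url, json_body) for the /jars/:jarid/run endpoint."""
    url = "http://%s:%d/jars/%s/run" % (host, port, jar_id)
    body = {"entryClass": entry_class}
    if program_args is not None:
        body["programArgs"] = program_args
    if parallelism is not None:
        body["parallelism"] = parallelism
    # There is no documented field here that corresponds to `flink run -C`,
    # i.e. no way to add extra jars to the classpath per submission.
    return url, body

url, body = build_run_request("localhost", 8081,
                              "flink-1.0-SNAPSHOT.jar",
                              "cn.xuying.flink.table.sql.ParserSqlJob",
                              parallelism=1)
print(url)
print(body)
```

Actually sending it would then be, for example, `requests.post(url, json=body)`.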
Re:Re: Re: Does Flink SQL support something like a temporary intermediate table?
Hi, a quick question: my_parse is a UDF, right? Is there any way to use a UDTF to parse out multiple fields that directly become columns of the source table, and then pick the time field among them to define the watermark on? Something like:

CREATE TABLE sourceTable(
  request_uri STRING,
  (column_1, column_2, heart_time) as udtf_parse(request_uri)
) with (..);

Haha, not sure whether such syntax exists.

At 2020-06-24 12:24:46, "Jark Wu" wrote:
>You can use a computed column directly in the DDL to derive heart_time from request_uri, and then define the watermark on that computed column.
>For example:
>
>CREATE TABLE sourceTable (
> request_uri STRING,
> heart_time AS my_parse(request_uri),
> WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND
>) WITH ( ... );
>
>Although this does mean request_uri gets parsed twice.
>
>
>Best,
>Jark
>
>On Wed, 24 Jun 2020 at 12:09, Weixubin <18925434...@163.com> wrote:
>
>> Thanks for the subquery idea, but unfortunately, after trying it out, it still doesn't meet our needs.
>>
>> Our scenario is reading messages from Alibaba Cloud SLS. Each message has a request_uri field.
>> The first processing step parses request_uri into multiple attributes (columns), stored as one row, i.e. one record.
>> The second processing step declares heart_time of each record as the event-time attribute with a 5-second watermark delay strategy, and then performs windowed aggregation.
>>
>> // Flink does not support applying this in a subquery; a WATERMARK FOR declaration can only appear in DDL. For example:
>> select
>> ..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') ,
>> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>> from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….
>>
>> // If applied to the source table, heart_time is not known up front:
>> CREATE TABLE sourceTable (
>> request_uri STRING
>> ..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') ,
>> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>> ) WITH ( ...
>> );
>>
>> So we can only wait for Flink 1.11 and try whether a View can serve as the intermediate temporary table, with the WATERMARK declared on the View.
>> Thanks
>> Bin
>>
>> At 2020-06-23 15:28:50, "Leonard Xu" wrote:
>> >Hi
>> >What I meant is: if you need the intermediate result to be written out, use one sink for the intermediate result table (whatever table suits your needs) and another sink for the final result table. The `select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` part in front of those two sinks can be shared, which is similar to what a VIEW does.
>> >
>> >If you don't need the intermediate result table and only want the final result, a nested query is enough: the inner query is `select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`, and the outer query does the group by and inserts into the final result table. That should satisfy the requirement.
>> >
>> >Best,
>> >Leonard Xu
>> >
>> >
>> >> On Jun 23, 2020, at 15:21, Weixubin <18925434...@163.com> wrote:
>> >>
>> >> Hi,
>> >> About "insert the `select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` sql into an intermediate result table, and write into the final result table after the group by":
>> >> I'm not sure what the intermediate result table means here. My understanding is to create one more table, middleSinkTable, in the database to serve as the intermediate result table. Is that understanding wrong? Could you give a simple example?
>> >> Thanks,
>> >> Bin
>> >>
>> >> At 2020-06-23 11:57:28, "Leonard Xu" wrote:
>> >>> Hi,
>> >>> Yes, this is supported in Flink 1.11. The Flink 1.11 code has been frozen and is being tested; it should be released soon, which is why the version on the master branch is 1.12-SNAPSHOT. Once 1.11 is released you will see the corresponding documentation.
>> >>>
>> >>> Back to the question: Flink SQL (blink planner) supports multiple sinks, so in 1.10 this can also be done in a single job. Insert the `select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` sql into the intermediate result table, and write into the final result table after the group by. The effect should be similar to using a VIEW, because the planner performs segmented optimization.
>> >>> Also, I recommend using 1.10.1, which fixes quite a few bugs on top of 1.10; or try 1.11 once it is released.
>> >>>
>> >>> Best,
>> >>> Leonard Xu
>> > >>
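Since the syntax asked about (expanding one UDTF call into several source columns in DDL) is exactly what is in doubt, one workaround consistent with Jark's computed-column suggestion is one scalar UDF per derived column. The sketch below assumes hypothetical scalar functions parse_col1 / parse_col2 / parse_time, each extracting a single field from request_uri; this is not from the thread, just an illustration:

```sql
-- Sketch (assumed scalar UDFs): one computed column per derived field,
-- with the watermark defined on the derived time column.
CREATE TABLE sourceTable (
  request_uri STRING,
  column_1   AS parse_col1(request_uri),
  column_2   AS parse_col2(request_uri),
  heart_time AS parse_time(request_uri),
  WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND
) WITH ( ... );
```

As Jark notes for the single-column case, each computed column re-parses request_uri, so this trades repeated parsing for the ability to declare the watermark in DDL.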
Re:Re: Re: Ways to start a Flink job
Which submission method do you mean by jarFiles? I also tried the plugin approach, but it doesn't seem to work, even after restarting the Flink cluster; maybe I'm doing it wrong. My directory layout is xxx/flink/plugins/folder1/udf.jar. By the way, if I put udf.jar under /flink/lib and restart, it does work, but that's not what I want. I want to be able to write a udf.jar at any time and use it right away, without restarting the Flink cluster.

At 2020-04-21 17:46:00, "Arnold Zai" wrote:
>Isn't the jarFiles parameter a list? Pass several jars.
>
>Or deploy the dependencies to ${FLINK_HOME}/plugins in advance.
>
>chenxuying wrote on Tue, Apr 21, 2020 at 15:36:
>> That works, but our requirements don't allow a fat jar.
>>
>> At 2020-04-21 15:27:48, "Arnold Zai" wrote:
>> >Build a fat jar.
>> >
>> >chenxuying wrote on Tue, Apr 21, 2020 at 14:47:
>> >> What are the current ways to start a Flink job?
>> >> 1. From the command line:
>> >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> >> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> >> 2. Upload and submit the jar through the built-in web UI.
>> >> 3. In code, via createRemoteEnvironment.
>> >>
>> >> What we mainly use is submitting HTTP requests against the REST API from code to upload the jar and run the job, but the problem is that the REST API offers no way to supply additional jars the way the command line does.
Re:Re: Ways to start a Flink job
That works, but our requirements don't allow a fat jar.

At 2020-04-21 15:27:48, "Arnold Zai" wrote:
>Build a fat jar.
>
>chenxuying wrote on Tue, Apr 21, 2020 at 14:47:
>> What are the current ways to start a Flink job?
>> 1. From the command line:
>> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> 2. Upload and submit the jar through the built-in web UI.
>> 3. In code, via createRemoteEnvironment.
>>
>> What we mainly use is submitting HTTP requests against the REST API from code to upload the jar and run the job, but the problem is that the REST API offers no way to supply additional jars the way the command line does.
Ways to start a Flink job
What are the current ways to start a Flink job?
1. From the command line:
flink run -C file:///usr/local/soft/flink/function-0.1.jar -c cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
2. Upload and submit the jar through the built-in web UI.
3. In code, via createRemoteEnvironment.

What we mainly use is submitting HTTP requests against the REST API from code to upload the jar and run the job, but the problem is that the REST API offers no way to supply additional jars the way the command line does.
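For reference, the "upload a jar, then run it" flow mentioned above comes down to two REST calls. A minimal sketch of the endpoints involved (host and port are assumptions; only the request targets are built here, nothing is sent):

```python
# Sketch: the two REST calls behind "upload a jar, then run it".
# Endpoint paths follow the Flink REST API; host/port are illustrative.

def upload_request(host, port):
    """POST target for uploading a jar as multipart/form-data
    (the file field is named 'jarfile')."""
    return "http://%s:%d/jars/upload" % (host, port)

def run_request(host, port, jar_id, entry_class):
    """POST target for running an uploaded jar. Extra-classpath jars
    (the command line's -C option) cannot be expressed here."""
    return "http://%s:%d/jars/%s/run?entry-class=%s" % (host, port,
                                                        jar_id, entry_class)

print(upload_request("localhost", 8081))
print(run_request("localhost", 8081, "flink-1.0-SNAPSHOT.jar",
                  "cn.xxx.flink.table.sql.Job"))
```

With the `requests` library, the upload would be something like `requests.post(upload_request(...), files={"jarfile": open(path, "rb")})`, and the returned jar id would then be passed to the run call.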