Re: Submitting a Flink job throws java.lang.LinkageError
Hi, 能检查下作业jar里 kafka client的版本和平台上的是否一致吗? casel.chen 于2021年11月5日周五 下午11:25写道: > 我在公司实时计算平台上提交了一个streaming api写的作业,结果抛如下异常。因为我们的实时计算平台是以flink > sql为主的,上面已经集成了flink-kafka-connector。而我提交的作业也是需要从kafka消费,所以将相同版本的flink kafka > connector也打进了作业jar包内。请问是什么原因造成的,需要如何修复?谢谢! > > > 2021-11-05 16:38:58 - [submit-session-executor-6] ERROR > c.h.s.launcher.AbstractJobExecutor - -start job failed- > > > org.apache.flink.client.program.ProgramInvocationException: The program > caused an error: > > > > > > > Classpath: > [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar] > > > > > > > System.out: (none) > > > > > > > System.err: (none) > > > at > org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264) > > > at > org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172) > > > at > com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205) > > > at > com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31) > > > at > com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51) > > > at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15) > > > at > com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443) > > > at > com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662) > > > at > com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623) > > > at > com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke() > > > at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) > > > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) > > > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) > > > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) > > > at > org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156) > > > at > org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) > > > at > org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) > > > at > org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691) > > > at > com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment() > > > at > com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63) > > > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > > at java.lang.Thread.run(Thread.java:748) > > > Caused by: java.lang.LinkageError: loader constraint violation: loader > (instance of org/apache/flink/util/ChildFirstClassLoader) previously > initiated loading for a different type with name > "org/apache/kafka/clients/consumer/ConsumerRecord" > > > at java.lang.ClassLoader.defineClass1(Native Method) > > > at java.lang.ClassLoader.defineClass(ClassLoader.java:756) > > > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > > 
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) > > > at java.net.URLClassLoader.access$100(URLClassLoader.java:74) > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:369) > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:363) > > > at java.security.AccessController.doPrivileged(Native Method)
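The LinkageError above means that org.apache.kafka.clients.consumer.ConsumerRecord is reachable from two places: the flink-kafka connector already installed on the platform and the copy packaged into the job jar, so the child-first classloader ends up with two conflicting definitions of the same class. Assuming the job is built with Maven, the usual fix is to stop bundling the connector (and its transitive kafka-clients) and rely on the platform-provided one; the artifact id and Scala suffix below are the common ones and should be adjusted to whatever the job actually depends on:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <!-- the platform already ships this connector, so do not package it into the fat jar -->
        <scope>provided</scope>
    </dependency>

Setting classloader.resolve-order: parent-first in flink-conf.yaml can also hide the conflict, but removing the duplicate jar from the job artifact is the cleaner solution.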
Re: Flink on YARN per-job mode
你好, 上传的图片无法加载。 这种情况是 yarn 无法提供拉起taskmanager,检查下yarn资源是否充足? 王健 <13166339...@163.com> 于2021年10月26日周二 下午7:50写道: > 您好: > 我部署flink yarn的pre-job模式运行报错,麻烦看看是啥原因,非常感谢。 > > 1.运行命令:/usr/local/flink-1.13.2/bin/flink run -t yarn-per-job -c > com.worktrans.flink.wj.ods.FlinkCDC01 /usr/local/flink-1.13.2/flink_x.jar > 提交正常,如图: > > 2.yarn 截图 > > > 3.flink截图: > 现象:taskslot和taskmanager数量都为0,一直在申请 > > > 4.最后结果:报错如下 > 2021-10-25 16:17:49 > java.util.concurrent.CompletionException: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Slot request bulk is not fulfillable! Could not allocate the required slot > within slot request timeout > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222) > at > org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164) > at > org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:517) > at akka.actor.Actor.aroundReceive$(Actor.scala:515) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Slot request bulk is not fulfillable! Could not allocate the required slot > within slot request timeout > at > org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) > ... 26 more > Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: > 30 ms > ... 27 more > > > > > > > > > > > > > > > > > > >
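The NoResourceAvailableException at the end means the JobManager kept a pending slot request until the slot request timeout expired, i.e. YARN never handed out containers for any TaskManager, which matches the observation that task slots and TaskManagers stay at 0. Besides checking the queue's remaining memory/vcores in the YARN web UI, it often helps to request smaller TaskManagers explicitly so a container can actually fit. A sketch for a Flink 1.13 per-job submission (the sizes below are examples, not recommendations):

    ./bin/flink run -t yarn-per-job \
      -Djobmanager.memory.process.size=1024m \
      -Dtaskmanager.memory.process.size=2048m \
      -Dtaskmanager.numberOfTaskSlots=2 \
      -Dparallelism.default=1 \
      -c com.worktrans.flink.wj.ods.FlinkCDC01 /usr/local/flink-1.13.2/flink_x.jar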
Re: Error when submitting a PyFlink job to a local cluster
Hi Huilin, which version of Flink are you using?

Huilin_WU <592900...@qq.com> wrote on Wed, 10 Mar 2021 at 09:39:
> Running the script directly with python xx.py in the terminal works, but flink run -m localhost:8081 -py xx.py reports the error above saying the pyflink module cannot be found.
> (base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m localhost:8081 -py demo_predict.py
> Traceback (most recent call last):
> File "demo_predict.py", line 51, in
> from pyflink.common.serialization import SimpleStringEncoder
> ModuleNotFoundError: No module named 'pyflink.common.serialization'
>
> I have already tried many things, including creating a virtual environment and installing the corresponding packages inside it, but it still does not work. Is there any way to solve this?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
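As far as I know, pyflink.common.serialization only exists in the newer apache-flink Python packages (1.12 and later), so this ModuleNotFoundError usually means that the Python interpreter the flink CLI invokes is not the virtual environment where the matching apache-flink version is installed. A quick way to check and to submit from the right environment (a sketch; exact option names vary between Flink versions):

    # make sure the interpreter the flink client picks up is the venv one
    source /path/to/venv/bin/activate
    pip show apache-flink        # version should match the cluster's Flink version
    flink run -m localhost:8081 -py demo_predict.py

Newer Flink versions also allow pinning the interpreters explicitly, e.g. with -pyexec for the Python UDF workers and, from 1.13 on, -pyclientexec for the client-side execution of the script.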
Re: Help requested (PyFlink)
抱歉,漏了文档链接 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#enabling-kerberos-authentication Shuiqiang Chen 于2021年1月30日周六 下午4:32写道: > Hi, > 按照这个文档, 在flink-conf.yaml里配置了security.kerberos.login.keytab 和 > security.kerberos.login.principal这两个属性了吗? > 还有就是jaas.conf文件在各个task manager所在的机器都能访问到吗? > > 瞿叶奇 <389243...@qq.com> 于2021年1月30日周六 下午4:15写道: > >> 老师,您好, >> 1)报错信息在附件内,flink-root-python-node-master1aSdM.log文件。报错信息如下: >> Caused by: java.lang.IllegalArgumentException: Could not find a >> 'KafkaClient' entry in the JAAS configuration. System property >> 'java.security.auth.login.config' is not set at >> org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) >> 2)这是Flink配置里的jaas.conf >> 3)pyspark对接kafka写hdfs没有问题,这是spark的jaas.conf >> 3.1)这是spark配置里的jaas.conf >> >> 3.2)这是spark配置里的jaas-zk.conf >> >> >> 是不是要修改这个文件呢?百度给出的都是java的修正,不知道这个需要如何修改。 >> >> >> -- 原始邮件 -- >> *发件人:* "user-zh" ; >> *发送时间:* 2021年1月30日(星期六) 下午3:49 >> *收件人:* "user-zh"; >> *主题:* Re: 问题求助(Pyflink) >> >> 你好, >> 可以看下source task所在task manager 的日志里看看 consumer 有没有成功获取到kafka >> partition相关meta信息和认证相关是否成功的信息。 >> >> 瞿叶奇 <389243...@qq.com> 于2021年1月30日周六 下午3:14写道: >> >> > 老师,你好,消费是没有任何问题,可以正常消费。 >> > >> > >> > >> > >> > -- 原始邮件 -- >> > *发件人:* "user-zh" ; >> > *发送时间:* 2021年1月30日(星期六) 下午3:08 >> > *收件人:* "user-zh"; >> > *主题:* Re:问题求助(Pyflink) >> > >> > 先看下kafka能否通过命令行消费数据. >> > >> > 命令行检查确保能消费,再使用Flink. >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > 在 2021-01-30 14:25:57,"瞿叶奇" <389243...@qq.com> 写道: >> > >> > >> 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka >> > 存在问题,希望老师能够给解决疑惑。 >> > 1)Kafka生产数据: >> > >> > 2)pyflink 程序 >> > >> > >> > #!/usr/bin/python3.7 >> > # -*- coding: UTF-8 -*- >> > from pyflink.datastream import StreamExecutionEnvironment, >> > CheckpointingMode >> > from pyflink.table import StreamTableEnvironment, TableConfig, >> DataTypes, >> > CsvTableSink, WriteMode, SqlDialect >> > from pyflink.table.descriptors import >> FileSystem,OldCsv,Schema,Kafka,Json >> > s_env = StreamExecutionEnvironment.get_execution_environment() >> > s_env.set_parallelism(1) >> > s_env.enable_checkpointing(3000) >> > st_env = StreamTableEnvironment.create(s_env, TableConfig()) >> > st_env.use_catalog("default_catalog") >> > st_env.use_database("default_database") >> > >> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", >> > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002 >> ").property("security.protocol", >> > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", >> > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com >> ').property("bootstrap.servers", >> > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007 >> ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", >> > DataTypes.BIGINT()),DataTypes.FIELD("name", >> > DataTypes.STRING())]))).with_schema(Schema().field("id", >> > DataTypes.BIGINT()).field("name", >> > DataTypes.STRING())).register_table_source("sourceKafka") >> > fieldNames = ["id", "name"] >> > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] >> > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", >> ",", >> > 1, WriteMode.OVERWRITE) >> > st_env.register_table_sink("csvTableSink", csvSink) >> > resultQuery = st_env.sql_query("select id,name from sourceKafka") >> > resultQuery = 
resultQuery.insert_into("csvTableSink") >> > st_env.execute("pyflink-kafka-v2") >> > 3)pyflink-shell.sh local >> > >> > 4)运行结果 >> > 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下: >> > >> > >> > >> 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。 >> > >> > >> > >> > >> >
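For reference, the "Could not find a 'KafkaClient' entry in the JAAS configuration" error means the Kafka client inside the Flink JVM has no SASL login context at all. As the linked documentation describes, Flink can generate that context itself when Kerberos is configured in flink-conf.yaml, so a hand-maintained jaas.conf is normally not required. A sketch of the relevant settings (keytab path and principal are placeholders; the keytab must be readable on every node that runs a task manager):

    security.kerberos.login.use-ticket-cache: false
    security.kerberos.login.keytab: /path/to/user.keytab
    security.kerberos.login.principal: user@YOUR.REALM
    # expose the credentials to ZooKeeper and to the Kafka SASL login module
    security.kerberos.login.contexts: Client,KafkaClient

With this in place, the security.protocol and sasl.kerberos.service.name properties already set on the Kafka connector should be enough for the consumer to authenticate.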
Re: Help requested (PyFlink)
Hi, 按照这个文档, 在flink-conf.yaml里配置了security.kerberos.login.keytab 和 security.kerberos.login.principal这两个属性了吗? 还有就是jaas.conf文件在各个task manager所在的机器都能访问到吗? 瞿叶奇 <389243...@qq.com> 于2021年1月30日周六 下午4:15写道: > 老师,您好, > 1)报错信息在附件内,flink-root-python-node-master1aSdM.log文件。报错信息如下: > Caused by: java.lang.IllegalArgumentException: Could not find a > 'KafkaClient' entry in the JAAS configuration. System property > 'java.security.auth.login.config' is not set at > org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) > 2)这是Flink配置里的jaas.conf > 3)pyspark对接kafka写hdfs没有问题,这是spark的jaas.conf > 3.1)这是spark配置里的jaas.conf > > 3.2)这是spark配置里的jaas-zk.conf > > > 是不是要修改这个文件呢?百度给出的都是java的修正,不知道这个需要如何修改。 > > > -- 原始邮件 -- > *发件人:* "user-zh" ; > *发送时间:* 2021年1月30日(星期六) 下午3:49 > *收件人:* "user-zh"; > *主题:* Re: 问题求助(Pyflink) > > 你好, > 可以看下source task所在task manager 的日志里看看 consumer 有没有成功获取到kafka > partition相关meta信息和认证相关是否成功的信息。 > > 瞿叶奇 <389243...@qq.com> 于2021年1月30日周六 下午3:14写道: > > > 老师,你好,消费是没有任何问题,可以正常消费。 > > > > > > > > > > -- 原始邮件 -- > > *发件人:* "user-zh" ; > > *发送时间:* 2021年1月30日(星期六) 下午3:08 > > *收件人:* "user-zh"; > > *主题:* Re:问题求助(Pyflink) > > > > 先看下kafka能否通过命令行消费数据. > > > > 命令行检查确保能消费,再使用Flink. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2021-01-30 14:25:57,"瞿叶奇" <389243...@qq.com> 写道: > > > > > 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka > > 存在问题,希望老师能够给解决疑惑。 > > 1)Kafka生产数据: > > > > 2)pyflink 程序 > > > > > > #!/usr/bin/python3.7 > > # -*- coding: UTF-8 -*- > > from pyflink.datastream import StreamExecutionEnvironment, > > CheckpointingMode > > from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, > > CsvTableSink, WriteMode, SqlDialect > > from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json > > s_env = StreamExecutionEnvironment.get_execution_environment() > > s_env.set_parallelism(1) > > s_env.enable_checkpointing(3000) > > st_env = StreamTableEnvironment.create(s_env, TableConfig()) > > st_env.use_catalog("default_catalog") > > st_env.use_database("default_database") > > > st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", > > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002 > ").property("security.protocol", > > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", > > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com > ').property("bootstrap.servers", > > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007 > ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", > > DataTypes.BIGINT()),DataTypes.FIELD("name", > > DataTypes.STRING())]))).with_schema(Schema().field("id", > > DataTypes.BIGINT()).field("name", > > DataTypes.STRING())).register_table_source("sourceKafka") > > fieldNames = ["id", "name"] > > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] > > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", > ",", > > 1, WriteMode.OVERWRITE) > > st_env.register_table_sink("csvTableSink", csvSink) > > resultQuery = st_env.sql_query("select id,name from sourceKafka") > > resultQuery = resultQuery.insert_into("csvTableSink") > > st_env.execute("pyflink-kafka-v2") > > 3)pyflink-shell.sh local > > > > 4)运行结果 > > 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下: > > > > > > > 
可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。 > > > > > > > > >
Re: Help requested (PyFlink)
你好, 可以看下source task所在task manager 的日志里看看 consumer 有没有成功获取到kafka partition相关meta信息和认证相关是否成功的信息。 瞿叶奇 <389243...@qq.com> 于2021年1月30日周六 下午3:14写道: > 老师,你好,消费是没有任何问题,可以正常消费。 > > > > > -- 原始邮件 -- > *发件人:* "user-zh" ; > *发送时间:* 2021年1月30日(星期六) 下午3:08 > *收件人:* "user-zh"; > *主题:* Re:问题求助(Pyflink) > > 先看下kafka能否通过命令行消费数据. > > 命令行检查确保能消费,再使用Flink. > > > > > > > > > > > > > > 在 2021-01-30 14:25:57,"瞿叶奇" <389243...@qq.com> 写道: > > 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka > 存在问题,希望老师能够给解决疑惑。 > 1)Kafka生产数据: > > 2)pyflink 程序 > > > #!/usr/bin/python3.7 > # -*- coding: UTF-8 -*- > from pyflink.datastream import StreamExecutionEnvironment, > CheckpointingMode > from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, > CsvTableSink, WriteMode, SqlDialect > from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json > s_env = StreamExecutionEnvironment.get_execution_environment() > s_env.set_parallelism(1) > s_env.enable_checkpointing(3000) > st_env = StreamTableEnvironment.create(s_env, TableConfig()) > st_env.use_catalog("default_catalog") > st_env.use_database("default_database") > st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", > 'kafka').property("kerberos.domain.name", > 'hadoop.hadoop.com').property("bootstrap.servers", > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", > DataTypes.BIGINT()),DataTypes.FIELD("name", > DataTypes.STRING())]))).with_schema(Schema().field("id", > DataTypes.BIGINT()).field("name", > DataTypes.STRING())).register_table_source("sourceKafka") > fieldNames = ["id", "name"] > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", > 1, WriteMode.OVERWRITE) > st_env.register_table_sink("csvTableSink", csvSink) > resultQuery = st_env.sql_query("select id,name from sourceKafka") > resultQuery = resultQuery.insert_into("csvTableSink") > st_env.execute("pyflink-kafka-v2") > 3)pyflink-shell.sh local > > 4)运行结果 > 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下: > > > 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。 > > > >
Re: FlinkKafkaConsumer question
Hi, 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。 > 在 2020年9月4日,上午10:25,op <520075...@qq.com> 写道: > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢? > > > --原始邮件-- > 发件人: > "user-zh" > > 发送时间:2020年9月3日(星期四) 晚上6:09 > 收件人:"user-zh" > 主题:Re: FlinkKafkaConsumer问题 > > > > Hi op, > > 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 > partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 > Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 > > 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit > offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 > > 在 2020年9月3日,下午3:03,op <520075...@qq.com 写道: > > nbsp; nbsp; hi,nbsp; nbsp; > 我对FlinkKafkaConsumer的实现有点迷惑,nbsp; nbsp; 这有两个相同代码的程序: > //--- > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > Env.setRestartStrategy(RestartStrategies.noRestart()) > val consumerProps = new Properties() > consumerProps.put("bootstrap.servers", brokers) > consumerProps.put("group.id", "test1234") > > val consumer = new FlinkKafkaConsumer[String](topic,new > KafkaStringSchema,consumerProps).setStartFromLatest() > Env.addSource(consumer).print() > > Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka > 的consumer > group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 > 谢谢
Re: FlinkKafkaConsumer question
Hi op, 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 > 在 2020年9月3日,下午3:03,op <520075...@qq.com> 写道: > > hi, 我对FlinkKafkaConsumer的实现有点迷惑, > 这有两个相同代码的程序: > //--- > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > Env.setRestartStrategy(RestartStrategies.noRestart()) > val consumerProps = new Properties() > consumerProps.put("bootstrap.servers", brokers) > consumerProps.put("group.id", "test1234") > > val consumer = new FlinkKafkaConsumer[String](topic,new > KafkaStringSchema,consumerProps).setStartFromLatest() > Env.addSource(consumer).print() > Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka > 的consumer > group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 > 谢谢
Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation
Hi jincheng, Thanks for the discussion. +1 for the FLIP. A well-organized documentation will greatly improve the efficiency and experience for developers. Best, Shuiqiang Hequn Cheng 于2020年8月1日周六 上午8:42写道: > Hi Jincheng, > > Thanks a lot for raising the discussion. +1 for the FLIP. > > I think this will bring big benefits for the PyFlink users. Currently, the > Python TableAPI document is hidden deeply under the TableAPI tab which > makes it quite unreadable. Also, the PyFlink documentation is mixed with > Java/Scala documentation. It is hard for users to have an overview of all > the PyFlink documents. As more and more functionalities are added into > PyFlink, I think it's time for us to refactor the document. > > Best, > Hequn > > > On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira > wrote: > >> Hi, Jincheng! >> >> Thanks for creating this detailed FLIP, it will make a big difference in >> the experience of Python developers using Flink. I'm interested in >> contributing to this work, so I'll reach out to you offline! >> >> Also, thanks for sharing some information on the adoption of PyFlink, it's >> great to see that there are already production users. >> >> Marta >> >> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang wrote: >> >> > Hi Jincheng, >> > >> > Thanks a lot for bringing up this discussion and the proposal. >> > >> > Big +1 for improving the structure of PyFlink doc. >> > >> > It will be very friendly to give PyFlink users a unified entrance to >> learn >> > PyFlink documents. >> > >> > Best, >> > Xingbo >> > >> > Dian Fu 于2020年7月31日周五 上午11:00写道: >> > >> >> Hi Jincheng, >> >> >> >> Thanks a lot for bringing up this discussion and the proposal. +1 to >> >> improve the Python API doc. >> >> >> >> I have received many feedbacks from PyFlink beginners about >> >> the PyFlink doc, e.g. the materials are too few, the Python doc is >> mixed >> >> with the Java doc and it's not easy to find the docs he wants to know. >> >> >> >> I think it would greatly improve the user experience if we can have one >> >> place which includes most knowledges PyFlink users should know. >> >> >> >> Regards, >> >> Dian >> >> >> >> 在 2020年7月31日,上午10:14,jincheng sun 写道: >> >> >> >> Hi folks, >> >> >> >> Since the release of Flink 1.11, users of PyFlink have continued to >> grow. >> >> As far as I know there are many companies have used PyFlink for data >> >> analysis, operation and maintenance monitoring business has been put >> into >> >> production(Such as 聚美优品[1](Jumei), 浙江墨芷[2] (Mozhi) etc.). According >> to >> >> the feedback we received, current documentation is not very friendly to >> >> PyFlink users. There are two shortcomings: >> >> >> >> - Python related content is mixed in the Java/Scala documentation, >> which >> >> makes it difficult for users who only focus on PyFlink to read. >> >> - There is already a "Python Table API" section in the Table API >> document >> >> to store PyFlink documents, but the number of articles is small and the >> >> content is fragmented. It is difficult for beginners to learn from it. >> >> >> >> In addition, FLIP-130 introduced the Python DataStream API. Many >> >> documents will be added for those new APIs. In order to increase the >> >> readability and maintainability of the PyFlink document, Wei Zhong and >> me >> >> have discussed offline and would like to rework it via this FLIP. >> >> >> >> We will rework the document around the following three objectives: >> >> >> >> - Add a separate section for Python API under the "Application >> >> Development" section. 
>> >> - Restructure current Python documentation to a brand new structure to >> >> ensure complete content and friendly to beginners. >> >> - Improve the documents shared by Python/Java/Scala to make it more >> >> friendly to Python users and without affecting Java/Scala users. >> >> >> >> More detail can be found in the FLIP-133: >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation >> >> >> >> Best, >> >> Jincheng >> >> >> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg >> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g >> >> >> >> >> >> >> >
Re: Flink 1.11 startup problem
Hi, 可以尝试在jm的log里看看是在申请哪个资源的时候超时了, 对比下所申请的资源规格和集群可用资源 Best, Shuiqiang 酷酷的浑蛋 于2020年7月21日周二 下午4:37写道: > > > 服了啊,这个flink1.11启动怎么净是问题啊 > > > 我1.7,1.8,1.9 都没有问题,到11就不行 > ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm > 1024 -ynm sql_test ./examples/batch/WordCount.jar --input > hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a > > > 报错: > Caused by: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Could not allocate the required slot within slot request timeout. Please > make sure that the cluster has enough resources. at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441) > ... 45 more Caused by: java.util.concurrent.CompletionException: > java.util.concurrent.TimeoutException at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > ... 25 more > > > > 我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题), > >
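One more detail worth fixing in the quoted command: -yjm is passed twice and -ytm is never set, so the TaskManager size falls back to the defaults in flink-conf.yaml. A cleaned-up version of the same submission (memory sizes are just examples) would be:

    ./bin/flink run -m yarn-cluster -yqu root.rt_constant \
      -ys 2 -yjm 1024 -ytm 2048 -ynm sql_test \
      ./examples/batch/WordCount.jar \
      --input hdfs://xxx/data/wangty/LICENSE-2.0.txt \
      --output hdfs://xxx/data/wangty/a

If the request still times out, the JobManager log (as suggested above) will show which resource profile is pending, and the YARN UI will show whether the root.rt_constant queue actually has that much headroom.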
Re: pyflink1.11.0window
看看异常信息, 是不是你的insert mode没配置对。 BTW, 你粘贴的文本带有很多"", 有点影响可读性。 Best, Shuiqiang 奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月20日周一 下午4:23写道: > HI : > 我现在有一个新的问题,我在此基础上加了一个关联,再写入kafka时报错,如下 > Traceback (most recent call last): > File > "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line > 147, in deco > return f(*a, **kw) > File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", > line 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate. > : org.apache.flink.table.api.TableException: AppendStreamTableSink > requires that Table has only insert changes. > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.Iterator$class.foreach(Iterator.scala:891) > at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at > scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at > java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > > > > > During handling of the above exception, another exception occurred: > > > Traceback (most recent call last): > File "tou.py", line 99, infrom_kafka_to_kafka_demo() > File "tou.py", line 33, in 
from_kafka_to_kafka_demo > st_env.sql_update("insert into flink_result select > id,type,rowtime from final_result2") > File > "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", > line 547, in sql_update > self._j_tenv.sqlUpdate(stmt) > File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line > 154, in deco > raise exception_mapping[exception](s.split(': ', 1)[1], > stack_trace) > pyflink.util.exceptions.TableException: 'AppendStreamTableSink requires > that Table has only insert changes.' > > > > > > 这种应该如何实现,需求大概是一个流表(需要分组汇总)关联一个维表。 > > > from pyflink.datastream import StreamExecutionEnvironment, > TimeCharacteristic > from pyflink.table import StreamTableEnvironment, DataTypes, > EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink > from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime > from pyflink.table.window import Tumble > > > > > def from_kafka_to_kafka_demo(): > > > # use blink table planner > env = StreamExecutionEnvironment.get_execution_environment() > > env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > env_settings = >
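The root cause of the "AppendStreamTableSink requires that Table has only insert changes" exception is that the query result is an updating stream: a non-windowed group-by (and the dimension-table join on top of it) keeps revising previously emitted rows, while a Kafka sink declared with 'update-mode' = 'append' can only accept inserts. Two common ways out are to write to an upsert-capable sink (the Elasticsearch example later in this digest does exactly that), or to make the result append-only by aggregating per time window so each key/window combination is emitted once. A generic sketch of the append-only pattern (table and column names below are illustrative, not the original schema):

    insert into flink_result
    select id, type, tumble_start(rowtime, interval '5' seconds) as rowtime
    from source_with_rowtime
    group by tumble(rowtime, interval '5' seconds), id, type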
Re: pyflink1.11.0window
下面这个例子从kafka读取json格式的数据, 然后做窗口聚合后写入es, 可以参考下代码结构, 修改相应数据字段。 这份代码你本地应该是不能运行的 from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings from pyflink.table.udf import udf @udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING()) def platform_code_to_name(code): return "mobile" if code == 0 else "pc" def log_processing(): env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env_settings = EnvironmentSettings.Builder().use_blink_planner().build() t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings) source_ddl = """ CREATE TABLE payment_msg( createTime VARCHAR, rt as TO_TIMESTAMP(createTime), orderId BIGINT, payAmount DOUBLE, payPlatform INT, paySource INT, WATERMARK FOR rt as rt - INTERVAL '2' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'payment_msg_2', 'connector.properties.bootstrap.servers' = '0.0.0.0:9092', 'connector.properties.group.id' = 'test_3', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ) """ t_env.sql_update(source_ddl) es_sink_ddl = """ CREATE TABLE es_sink ( platform VARCHAR, pay_amount DOUBLE, rowtime TIMESTAMP(3) ) with ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'platform_pay_amount_1', 'connector.document-type' = 'payment', 'update-mode' = 'upsert', 'connector.flush-on-checkpoint' = 'true', 'connector.key-delimiter' = '$', 'connector.key-null-literal' = 'n/a', 'connector.bulk-flush.max-size' = '42mb', 'connector.bulk-flush.max-actions' = '32', 'connector.bulk-flush.interval' = '1000', 'connector.bulk-flush.backoff.delay' = '1000', 'format.type' = 'json' ) """ t_env.sql_update(es_sink_ddl) t_env.register_function('platformcodetoname', platform_code_to_name) query = """ select platformcodetoname(payPlatform) as platform, sum(payAmount) as pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as rowtime from payment_msg group by tumble(rt, interval '5' seconds), payPlatform """ count_result = t_env.sql_query(query) t_env.create_temporary_view('windowed_values', count_result) query2 = """ select platform, last_value(pay_amount), rowtime from windowed_values group by platform, rowtime """ final_result = t_env.sql_query(query2) final_result.execute_insert(table_path='es_sink') if __name__ == '__main__': log_processing() 奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月15日周三 下午4:40写道: > Shuiqiang,你好: > > hi,能否请求您贡献一下完整的代码的案例,我是初学者,官网的2-from_kafka_to_kafka.py这个没有窗口,我现在想要一个在此基础上有窗口的demo,尝试编了很久也未能解决。我在给这个demo加上窗口功能后总是有各种各样的问题,十分痛苦,如能帮助,感激不尽。 > > > 恳请所有看到此封邮件的大佬! > > > 谢谢! > > > > > --原始邮件-- > 发件人: > "user-zh" > < > acqua@gmail.com; > 发送时间:2020年7月15日(星期三) 中午11:25 > 收件人:"user-zh" > 主题:Re: pyflink1.11.0window > > > > 举个sql例子 > select platformcodetoname(payPlatform) as platform, sum(payAmount) as > pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as > rowtime > from payment_msg group by tumble(rt, interval '5' seconds), payPlatform > 这个query 对每5s的tumble窗口做统计。 > > 奇怪的不朽琴师 <1129656...@qq.com 于2020年7月15日周三 上午11:10写道: > > Shuiqiang,你好: > nbsp; > nbsp;我的目的是每间隔一段时间做一次汇总统计,比如每两秒做一下汇总,请问这个需求我改如何定义window? 
> > > --nbsp;原始邮件nbsp;-- > 发件人: > > "user-zh" > > < > acqua@gmail.comgt;; > 发送时间:nbsp;2020年7月15日(星期三) 上午10:51 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: pyflink1.11.0window > > > > 琴师你好, > 异常栈信息org.apache.flink.table.api.ValidationException: A tumble window > expects a size value literal. > 看起来是接下tumble window定义的代码不太正确吧 > > Best, > Shuiqiang > > 奇怪的不朽琴师 <1129656...@qq.comgt; 于2020年7月15日周三 上午10:27写道: > > gt; 你好: > gt; amp;nbsp; amp;nbsp; > gt; > > amp;nbsp;我按着你回复的建议改了source但是会报新的错误,请问这个是因为什么?我想调试一个window一直没有成功,请帮帮我,谢谢。 > gt; Traceback (most recent call last): > gt; amp;nbsp; File "tou.py", line 71, in gt; amp;nbsp; amp;nbsp; from_kafka_to_kafka_demo() > gt; amp;nbsp; File "tou.py", line 21, in > from_kafka_to_kafka_demo > gt;
Re: pyflink1.11.0window
举个sql例子 select platformcodetoname(payPlatform) as platform, sum(payAmount) as pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as rowtime from payment_msg group by tumble(rt, interval '5' seconds), payPlatform 这个query 对每5s的tumble窗口做统计。 奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月15日周三 上午11:10写道: > Shuiqiang,你好: > 我的目的是每间隔一段时间做一次汇总统计,比如每两秒做一下汇总,请问这个需求我改如何定义window? > > > --原始邮件-- > 发件人: > "user-zh" > < > acqua@gmail.com; > 发送时间:2020年7月15日(星期三) 上午10:51 > 收件人:"user-zh" > 主题:Re: pyflink1.11.0window > > > > 琴师你好, > 异常栈信息org.apache.flink.table.api.ValidationException: A tumble window > expects a size value literal. > 看起来是接下tumble window定义的代码不太正确吧 > > Best, > Shuiqiang > > 奇怪的不朽琴师 <1129656...@qq.com 于2020年7月15日周三 上午10:27写道: > > 你好: > nbsp; nbsp; > > nbsp;我按着你回复的建议改了source但是会报新的错误,请问这个是因为什么?我想调试一个window一直没有成功,请帮帮我,谢谢。 > Traceback (most recent call last): > nbsp; File "tou.py", line 71, in nbsp; nbsp; from_kafka_to_kafka_demo() > nbsp; File "tou.py", line 21, in from_kafka_to_kafka_demo > nbsp; nbsp; .select(" id,nbsp; time1 , time1 ")\ > nbsp; File > "/usr/local/lib/python3.7/site-packages/pyflink/table/table.py", line > 907, > in select > nbsp; nbsp; return Table(self._j_table.select(fields), > self._t_env) > nbsp; File > "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", > line 1286, in __call__ > nbsp; nbsp; answer, self.gateway_client, self.target_id, > self.name) > nbsp; File > "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", > line > 147, in deco > nbsp; nbsp; return f(*a, **kw) > nbsp; File > "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", > line 328, in get_return_value > nbsp; nbsp; format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > o26.select. > : org.apache.flink.table.api.ValidationException: A tumble window > expects > a size value literal. 
> nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.table.operations.utils.AggregateOperationFactory.getAsValueLiteral(AggregateOperationFactory.java:384) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.table.operations.utils.AggregateOperationFactory.validateAndCreateTumbleWindow(AggregateOperationFactory.java:302) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:236) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781) > nbsp; nbsp; nbsp; nbsp; at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > nbsp; nbsp; nbsp; nbsp; at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > nbsp; nbsp; nbsp; nbsp; at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > nbsp; nbsp; nbsp; nbsp; at > java.lang.reflect.Method.invoke(Method.java:498) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > nbsp; nbsp; nbsp; nbsp; at > > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > nbsp; nbsp; nbsp; nbsp; at > java.lang.Thread.run(Thread.java:748) > > > > > > > > > > > > > > > > > > > > > > > > def register_rides_source(st_env): > nbsp; nbsp; source_ddl = \ > nbsp; nbsp; """ > nbsp; nbsp; create table source1( > nbsp; nbsp; nbsp;id int, > nbsp; nbsp; nbsp;time1 timestamp, > nbsp; nbsp; nbsp;type string, > nbsp; nbsp; nbsp;WATERMARK FOR time1 as time1 - > INTERVAL '2' SECOND > nbsp; nbsp; nbsp;) with ( > nbsp; nbsp; 'connector.type' = 'kafka', > nbsp; nbsp; 'update-mode' = 'append', > nbsp; nbsp; 'connector.topic' = 'tp1', > nbsp; nbsp; 'connector.properties.bootstrap.servers' = > 'localhost:9092', > nbsp; nbsp; 'connector.properties.zookeeper.connect' = > 'localhost:2181', > nbsp; nbsp; 'format.type' = 'json', > nbsp; nbsp; 'format.derive-schema' = 'true', > nbsp; nbsp; 'connector.version' =
Re: pyflink1.11.0window
琴师你好, 异常栈信息org.apache.flink.table.api.ValidationException: A tumble window expects a size value literal. 看起来是接下tumble window定义的代码不太正确吧 Best, Shuiqiang 奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月15日周三 上午10:27写道: > 你好: > > 我按着你回复的建议改了source但是会报新的错误,请问这个是因为什么?我想调试一个window一直没有成功,请帮帮我,谢谢。 > Traceback (most recent call last): > File "tou.py", line 71, infrom_kafka_to_kafka_demo() > File "tou.py", line 21, in from_kafka_to_kafka_demo > .select(" id, time1 , time1 ")\ > File > "/usr/local/lib/python3.7/site-packages/pyflink/table/table.py", line 907, > in select > return Table(self._j_table.select(fields), self._t_env) > File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line > 147, in deco > return f(*a, **kw) > File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", > line 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling o26.select. > : org.apache.flink.table.api.ValidationException: A tumble window expects > a size value literal. > at > org.apache.flink.table.operations.utils.AggregateOperationFactory.getAsValueLiteral(AggregateOperationFactory.java:384) > at > org.apache.flink.table.operations.utils.AggregateOperationFactory.validateAndCreateTumbleWindow(AggregateOperationFactory.java:302) > at > org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:236) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250) > at > org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794) > at > org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781) > at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at > java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > > > > > > > > > > > > > > > > > > > > > > > > def register_rides_source(st_env): > source_ddl = \ > """ > create table source1( > id int, > time1 timestamp, > type string, > WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND > ) with ( > 'connector.type' = 'kafka', > 'update-mode' = 'append', > 'connector.topic' = 'tp1', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'format.type' = 'json', > 'format.derive-schema' = 'true', > 'connector.version' = 'universal' > ) > """ > st_env.sql_update(source_ddl) > > > s_env = > StreamExecutionEnvironment.get_execution_environment() > s_env.set_parallelism(1) > > > st_env 
= StreamTableEnvironment.create(s_env) > > > register_rides_source(st_env) > register_rides_sink(st_env) > > > st_env.from_path("source1")\ > > .window(Tumble.over("2.secends").on("time1").alias("w")) \ > .group_by("w") \ > .select(" id, time1 , time1 ")\ > .insert_into("sink1") > > st_env.execute("2-from_kafka_to_kafka") > > > 代码如上 > > > > > > > > > --原始邮件-- > 发件人: > "user-zh" > < > acqua@gmail.com; > 发送时间:2020年7月10日(星期五) 上午9:17 > 收件人:"user-zh" > 主题:Re: pyflink1.11.0window > > > > 琴师你好, > > 你的source ddl里有指定time1为 time attribute吗? > create table source1( > id int, > time1 timestamp, > type string, > WATERMARK FOR time1 as time1 - > INTERVAL '2' SECOND > ) with (...) > > 奇怪的不朽琴师 <1129656...@qq.com 于2020年7月10日周五 上午8:43写道: > > --nbsp;原始邮件nbsp;-- > 发件人: > > "奇怪的不朽琴师" > > < > 1129656...@qq.comgt;; > 发送时间:nbsp;2020年7月9日(星期四) 下午5:08 > 收件人:nbsp;"godfrey he" > 主题:nbsp;pyflink1.11.0window > > > > 你好: > nbsp; nbsp;我在使用pyflink1.11版本时,window开窗仍会报错 > : org.apache.flink.table.api.ValidationException: A group window > expects a > time attribute for grouping in a stream
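The ValidationException ("A tumble window expects a size value literal") points at the window size expression: "2.secends" in the quoted code is not a parsable time-interval literal, so the planner cannot treat it as a window size; in the string-expression API used here it should read "2.seconds". Note also that any field selected outside of an aggregate has to appear in group_by, so grouping only by the window alias while selecting id will fail next. A corrected sketch of the windowed part, assuming the same source1 table with time1 as the event-time attribute and a sink1 that takes an int followed by two timestamps:

    from pyflink.table.window import Tumble

    st_env.from_path("source1") \
        .window(Tumble.over("2.seconds").on("time1").alias("w")) \
        .group_by("w, id") \
        .select("id, w.start, w.end") \
        .insert_into("sink1")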
Re: Re: Upgrading from flink 1.10 to flink 1.11: submission to YARN fails
Hi, 看样子是kafka table source没有成功创建,也许你需要将 org.apache.flink flink-sql-connector-kafka_2.11 ${flink.version} 这个jar 放到 FLINK_HOME/lib 目录下 Congxian Qiu 于2020年7月10日周五 上午10:57写道: > Hi > > 从异常看,可能是某个包没有引入导致的,和这个[1]比较像,可能你需要对比一下需要的是哪个包没有引入。 > > PS 从栈那里看到是 csv 相关的,可以优先考虑下 cvs 相关的包 > > ``` > The following factories have been considered: > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > org.apache.flink.table.filesystem.FileSystemTableFactory > at > > org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) > at > > org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) > at > > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) > at > > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) > at > > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46) > ... 37 more > ``` > > [1] http://apache-flink.147419.n8.nabble.com/flink-1-11-td4471.html > Best, > Congxian > > > Zhou Zach 于2020年7月10日周五 上午10:39写道: > > > 日志贴全了的,这是从yarn ui贴的full log,用yarn logs命令也是这些log,太简短,看不出错误在哪。。。 > > > > > > 我又提交了另外之前用flink1.10跑过的任务,现在用flink1.11跑,报了异常: > > > > > > SLF4J: Class path contains multiple SLF4J bindings. > > SLF4J: Found binding in > > > [jar:file:/opt/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: Found binding in > > > [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > > explanation. > > SLF4J: Actual binding is of type > > [org.apache.logging.slf4j.Log4jLoggerFactory] > > > > > > > > The program finished with the following exception: > > > > > > org.apache.flink.client.program.ProgramInvocationException: The main > > method caused an error: findAndCreateTableSource failed. > > at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > > at > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > > at > > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > > at > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > > at > > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > > at java.security.AccessController.doPrivileged(Native Method) > > at javax.security.auth.Subject.doAs(Subject.java:422) > > at > > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) > > at > > > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > > Caused by: org.apache.flink.table.api.TableException: > > findAndCreateTableSource failed. 
> > at > > > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49) > > at > > > org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190) > > at > > > org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:89) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > > at > > > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > > > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > > at > > > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > > at > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) > > at > > >
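As suggested above, the usual cause of this after the 1.10 → 1.11 upgrade is that FLINK_HOME/lib no longer contains the Kafka SQL connector jar (and, if the DDL uses 'format.type' = 'json', the JSON format jar), so the factory lookup only finds the CSV/filesystem factories listed in the error. A sketch of the fix (the version must match the cluster's Flink version; the jar names follow the usual naming convention):

    cp flink-sql-connector-kafka_2.11-1.11.0.jar  $FLINK_HOME/lib/
    cp flink-json-1.11.0.jar                      $FLINK_HOME/lib/
    # restart the cluster / resubmit so the new jars are picked up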
Re: pyflink1.11.0window
琴师你好, 你的source ddl里有指定time1为 time attribute吗? create table source1( id int, time1 timestamp, type string, WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND ) with (...) 奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月10日周五 上午8:43写道: > --原始邮件-- > 发件人: > "奇怪的不朽琴师" > < > 1129656...@qq.com; > 发送时间:2020年7月9日(星期四) 下午5:08 > 收件人:"godfrey he" > 主题:pyflink1.11.0window > > > > 你好: > 我在使用pyflink1.11版本时,window开窗仍会报错 > : org.apache.flink.table.api.ValidationException: A group window expects a > time attribute for grouping in a stream environment. > > 请问这个问题没有修复么?或者是我使用的方式不对,如果是使用不对,能提供一个正确的案例么? > 代码如下 > 谢谢 > > > def from_kafka_to_kafka_demo(): > s_env = > StreamExecutionEnvironment.get_execution_environment() > s_env.set_parallelism(1) > > > # use blink table planner > st_env = StreamTableEnvironment.create(s_env) > > > # register source and sink > register_rides_source(st_env) > register_rides_sink(st_env) > > > st_env.from_path("source1")\ > > .window(Tumble.over("1.secends").on("time1").alias("w")) \ > .group_by("w") \ > .select(" id, time1 , time1 ")\ > .insert_into("sink1") > > st_env.execute("2-from_kafka_to_kafka") > > > > > def register_rides_source(st_env): > source_ddl = \ > ''' > create table source1( > id int, > time1 timestamp, > type string > ) with ( > 'connector.type' = 'kafka', > 'update-mode' = 'append', > 'connector.topic' = 'tp1', > 'connector.properties.bootstrap.servers' = 'localhost:9092' > ) > ''' > st_env.sql_update(source_ddl) > > > > > def register_rides_sink(st_env): > sink_ddl = \ > ''' > create table sink1( > id int, > time1 timestamp, > time2 timestamp > ) with ( > 'connector.type' = 'kafka', > 'update-mode' = 'append', > 'connector.topic' = 'tp3', > 'connector.properties.bootstrap.servers' = 'localhost:9092' > ) > ''' > st_env.sql_update(sink_ddl) > > > > > if __name__ == '__main__': > from_kafka_to_kafka_demo() > > >