Re: Submitting a Flink job throws java.lang.LinkageError

2021-11-07 by Shuiqiang Chen
Hi,

Could you check whether the Kafka client version inside your job jar matches the one provided by the platform?
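If the platform already ships the Flink Kafka connector, a common way to avoid this kind of LinkageError is to stop bundling your own copy in the fat jar, so that only one ConsumerRecord class is visible to the child-first classloader. A minimal Maven sketch (artifact name and version property are illustrative and must match your build):

    <!-- Sketch only: let the platform provide the connector instead of packaging it. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

Alternatively, keep bundling it, but make sure the bundled Kafka client version is exactly the same as the platform's.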

casel.chen wrote on Fri, Nov 5, 2021 at 11:25 PM:

> I submitted a job written with the streaming API on our company's real-time computing platform and it threw the exception below. Our platform is mainly based on Flink
> SQL and already bundles the flink-kafka-connector. My job also needs to consume from Kafka, so I packaged the same version of the Flink Kafka
> connector into the job jar as well. What is causing this, and how can it be fixed? Thanks!
>
>
> 2021-11-05 16:38:58 -  [submit-session-executor-6] ERROR
> c.h.s.launcher.AbstractJobExecutor - -start job failed-
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> caused an error:
>
> Classpath:
> [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar]
>
> System.out: (none)
>
> System.err: (none)
>
> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
> at com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205)
> at com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31)
> at com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51)
> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15)
> at com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443)
> at com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662)
> at com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623)
> at com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke()
> at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
> at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
> at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
> at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
> at com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment()
> at com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
> initiated loading for a different type with name
> "org/apache/kafka/clients/consumer/ConsumerRecord"
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)


Re: flink-yarn per-job mode

2021-10-26 by Shuiqiang Chen
Hi,

The attached images cannot be loaded. This looks like YARN failing to bring up any TaskManagers; could you check whether the YARN cluster has enough resources?
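For reference, a couple of ways to check free YARN resources from the client, assuming the Hadoop yarn CLI is available (the queue name below is a placeholder):

    # List all NodeManagers; the ResourceManager web UI also shows used vs. total
    # memory and vcores per node.
    yarn node -list -all

    # Check capacity and current usage of the queue the job was submitted to.
    yarn queue -status default

Also note that if the requested TaskManager container size exceeds what a single NodeManager can offer, the request can never be fulfilled.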

王健 <13166339...@163.com> wrote on Tue, Oct 26, 2021 at 7:50 PM:

> Hello:
>   I deployed Flink on YARN in per-job mode and the job fails at runtime. Could you help me figure out the cause? Many thanks.
>
>  1. Launch command: /usr/local/flink-1.13.2/bin/flink run -t yarn-per-job -c
> com.worktrans.flink.wj.ods.FlinkCDC01 /usr/local/flink-1.13.2/flink_x.jar
>  The submission succeeds, as shown in the screenshot.
>
>  2. YARN screenshot
>
>
> 3. Flink screenshot:
>   Symptom: both the task slot and TaskManager counts stay at 0, and the job keeps requesting resources.
>
>
>  4. Final result: it fails with the error below
> 2021-10-25 16:17:49
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)
> at
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)
> at
> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> ... 26 more
> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
> 30 ms
> ... 27 more
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: PyFlink job submission to a local cluster fails

2021-03-09 by Shuiqiang Chen
Hi Huilin,

Which version of Flink are you using?
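In case the interpreter used by flink run differs from the one that works in the terminal, it can help to check which pyflink the client resolves and to pin the Python executable explicitly. A sketch, assuming a Flink CLI version that supports the -pyexec option (paths are placeholders):

    # Show where the working interpreter's pyflink comes from.
    python -c "import pyflink, os; print(os.path.dirname(pyflink.__file__))"

    # Submit with an explicit interpreter so the job uses that same environment.
    flink run -m localhost:8081 \
      -pyexec /path/to/venv/bin/python \
      -py demo_predict.py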

Huilin_WU <592900...@qq.com> wrote on Wed, Mar 10, 2021 at 9:39 AM:

> Running the file with python xx.py in the terminal works, but flink run -m localhost:8081 -py
> xx.py fails with the error above saying the pyflink module is missing.
> (base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m
> localhost:8081 -py demo_predict.py
> Traceback (most recent call last):
>   File "demo_predict.py", line 51, in 
> from pyflink.common.serialization import SimpleStringEncoder
> ModuleNotFoundError: No module named 'pyflink.common.serialization'
>
> I have tried many approaches, including creating a virtual environment and installing the corresponding packages inside it, but it still fails. Is there any way to solve this?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Help needed (PyFlink)

2021-01-30 by Shuiqiang Chen
Sorry, I missed the documentation link:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#enabling-kerberos-authentication

Shuiqiang Chen wrote on Sat, Jan 30, 2021 at 4:32 PM:

> Hi,
> Following that document, have you configured the two properties security.kerberos.login.keytab and
> security.kerberos.login.principal in flink-conf.yaml?
> Also, is the jaas.conf file accessible on every machine where a task manager runs?
>
> 瞿叶奇 <389243...@qq.com> wrote on Sat, Jan 30, 2021 at 4:15 PM:
>
>> Hello,
>> 1) The error is in the attached file flink-root-python-node-master1aSdM.log. It reads:
>> Caused by: java.lang.IllegalArgumentException: Could not find a
>> 'KafkaClient' entry in the JAAS configuration. System property
>> 'java.security.auth.login.config' is not set  at
>> org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
>> 2) This is the jaas.conf from the Flink configuration
>> 3) pyspark reading Kafka and writing to HDFS works fine; this is Spark's jaas.conf
>> 3.1) This is the jaas.conf from the Spark configuration
>>
>> 3.2) This is the jaas-zk.conf from the Spark configuration
>>
>>
>> Is this the file that needs to be changed? Everything Baidu turns up is a Java-side fix, so I do not know how to modify it here.
>>
>>
>> -- Original message --
>> *From:* "user-zh" ;
>> *Sent:* Saturday, Jan 30, 2021, 3:49 PM
>> *To:* "user-zh";
>> *Subject:* Re: Help needed (PyFlink)
>>
>> Hi,
>> You can check the logs of the task manager that runs the source task to see whether the consumer
>> fetched the Kafka partition metadata successfully and whether the authentication succeeded.
>>
>> 瞿叶奇 <389243...@qq.com> wrote on Sat, Jan 30, 2021 at 3:14 PM:
>>
>> > Hello, consuming from the command line works without any problem.
>> >
>> >
>> >
>> >
>> > -- Original message --
>> > *From:* "user-zh" ;
>> > *Sent:* Saturday, Jan 30, 2021, 3:08 PM
>> > *To:* "user-zh";
>> > *Subject:* Re: Help needed (PyFlink)
>> >
>> > First check whether the Kafka topic can be consumed from the command line.
>> >
>> > Make sure command-line consumption works before using Flink.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On 2021-01-30 14:25:57, "瞿叶奇" <389243...@qq.com> wrote:
>> >
>> >
>> Hello, I want to consume Kafka with Flink and write a local CSV file. The problem is that both the Flink and Kafka clusters are Kerberos-authenticated, and I am using pyflink. The program runs without errors, but it consumes no data: the CSV file stays empty although its modification time keeps updating. I suspect the Kafka connection
>> > is the problem and hope you can help clear this up.
>> > 1) Producing data into Kafka:
>> >
>> > 2) The pyflink program
>> >
>> >
>> > #!/usr/bin/python3.7
>> > # -*- coding: UTF-8 -*-
>> > from pyflink.datastream import StreamExecutionEnvironment,
>> > CheckpointingMode
>> > from pyflink.table import StreamTableEnvironment, TableConfig,
>> DataTypes,
>> > CsvTableSink, WriteMode, SqlDialect
>> > from pyflink.table.descriptors import
>> FileSystem,OldCsv,Schema,Kafka,Json
>> > s_env = StreamExecutionEnvironment.get_execution_environment()
>> > s_env.set_parallelism(1)
>> > s_env.enable_checkpointing(3000)
>> > st_env = StreamTableEnvironment.create(s_env, TableConfig())
>> > st_env.use_catalog("default_catalog")
>> > st_env.use_database("default_database")
>> >
>> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
>> > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002
>> ").property("security.protocol",
>> > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
>> > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com
>> ').property("bootstrap.servers",
>> > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007
>> ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
>> > DataTypes.BIGINT()),DataTypes.FIELD("name",
>> > DataTypes.STRING())]))).with_schema(Schema().field("id",
>> > DataTypes.BIGINT()).field("name",
>> > DataTypes.STRING())).register_table_source("sourceKafka")
>> > fieldNames = ["id", "name"]
>> > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
>> > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv",
>> ",",
>> > 1, WriteMode.OVERWRITE)
>> > st_env.register_table_sink("csvTableSink", csvSink)
>> > resultQuery = st_env.sql_query("select id,name from sourceKafka")
>> > resultQuery = resultQuery.insert_into("csvTableSink")
>> > st_env.execute("pyflink-kafka-v2")
>> > 3)pyflink-shell.sh local
>> >
>> > 4) Result
>> > While the program was running in pyflink-shell local, I started the producer and checked the result file:
>> >
>> >
>> >
>> As you can see, the file is indeed being touched (its modification time changes) but it stays empty. I hope pyflink can add a way to print to the console, and I also hope you can provide an example of connecting to a Kerberos-authenticated Kafka. I am an engineer working on the new architecture of the Shaanxi State Grid electricity consumption information collection system, and we plan to use Flink to test Kafka-to-HDFS data transfer. An example would help us complete the test.
>> >
>> >
>> >
>> >
>>
>


Re: Help needed (PyFlink)

2021-01-30 by Shuiqiang Chen
Hi,
Following this document, have you configured the two properties security.kerberos.login.keytab and
security.kerberos.login.principal in flink-conf.yaml?
Also, is the jaas.conf file accessible on every machine where a task manager runs?
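For reference, a minimal flink-conf.yaml sketch of the settings mentioned above (keytab path, principal and the contexts list are placeholders for your environment):

    # Sketch only: adjust to your cluster.
    security.kerberos.login.use-ticket-cache: false
    security.kerberos.login.keytab: /path/to/user.keytab
    security.kerberos.login.principal: user@EXAMPLE.COM
    # Have Flink generate the JAAS login contexts so the Kafka client can find
    # a 'KafkaClient' entry on every task manager.
    security.kerberos.login.contexts: Client,KafkaClient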

瞿叶奇 <389243...@qq.com> wrote on Sat, Jan 30, 2021 at 4:15 PM:

> Hello,
> 1) The error is in the attached file flink-root-python-node-master1aSdM.log. It reads:
> Caused by: java.lang.IllegalArgumentException: Could not find a
> 'KafkaClient' entry in the JAAS configuration. System property
> 'java.security.auth.login.config' is not set  at
> org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
> 2) This is the jaas.conf from the Flink configuration
> 3) pyspark reading Kafka and writing to HDFS works fine; this is Spark's jaas.conf
> 3.1) This is the jaas.conf from the Spark configuration
>
> 3.2) This is the jaas-zk.conf from the Spark configuration
>
>
> Is this the file that needs to be changed? Everything Baidu turns up is a Java-side fix, so I do not know how to modify it here.
>
>
> -- Original message --
> *From:* "user-zh" ;
> *Sent:* Saturday, Jan 30, 2021, 3:49 PM
> *To:* "user-zh";
> *Subject:* Re: Help needed (PyFlink)
>
> Hi,
> You can check the logs of the task manager that runs the source task to see whether the consumer
> fetched the Kafka partition metadata successfully and whether the authentication succeeded.
>
> 瞿叶奇 <389243...@qq.com> wrote on Sat, Jan 30, 2021 at 3:14 PM:
>
> > Hello, consuming from the command line works without any problem.
> >
> >
> >
> >
> > -- Original message --
> > *From:* "user-zh" ;
> > *Sent:* Saturday, Jan 30, 2021, 3:08 PM
> > *To:* "user-zh";
> > *Subject:* Re: Help needed (PyFlink)
> >
> > First check whether the Kafka topic can be consumed from the command line.
> >
> > Make sure command-line consumption works before using Flink.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 2021-01-30 14:25:57, "瞿叶奇" <389243...@qq.com> wrote:
> >
> >
> Hello, I want to consume Kafka with Flink and write a local CSV file. The problem is that both the Flink and Kafka clusters are Kerberos-authenticated, and I am using pyflink. The program runs without errors, but it consumes no data: the CSV file stays empty although its modification time keeps updating. I suspect the Kafka connection
> > is the problem and hope you can help clear this up.
> > 1) Producing data into Kafka:
> >
> > 2) The pyflink program
> >
> >
> > #!/usr/bin/python3.7
> > # -*- coding: UTF-8 -*-
> > from pyflink.datastream import StreamExecutionEnvironment,
> > CheckpointingMode
> > from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes,
> > CsvTableSink, WriteMode, SqlDialect
> > from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> > s_env = StreamExecutionEnvironment.get_execution_environment()
> > s_env.set_parallelism(1)
> > s_env.enable_checkpointing(3000)
> > st_env = StreamTableEnvironment.create(s_env, TableConfig())
> > st_env.use_catalog("default_catalog")
> > st_env.use_database("default_database")
> >
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
> > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002
> ").property("security.protocol",
> > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
> > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com
> ').property("bootstrap.servers",
> > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007
> ")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
> > DataTypes.BIGINT()),DataTypes.FIELD("name",
> > DataTypes.STRING())]))).with_schema(Schema().field("id",
> > DataTypes.BIGINT()).field("name",
> > DataTypes.STRING())).register_table_source("sourceKafka")
> > fieldNames = ["id", "name"]
> > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv",
> ",",
> > 1, WriteMode.OVERWRITE)
> > st_env.register_table_sink("csvTableSink", csvSink)
> > resultQuery = st_env.sql_query("select id,name from sourceKafka")
> > resultQuery = resultQuery.insert_into("csvTableSink")
> > st_env.execute("pyflink-kafka-v2")
> > 3)pyflink-shell.sh local
> >
> > 4) Result
> > While the program was running in pyflink-shell local, I started the producer and checked the result file:
> >
> >
>
> As you can see, the file is indeed being touched (its modification time changes) but it stays empty. I hope pyflink can add a way to print to the console, and I also hope you can provide an example of connecting to a Kerberos-authenticated Kafka. I am an engineer working on the new architecture of the Shaanxi State Grid electricity consumption information collection system, and we plan to use Flink to test Kafka-to-HDFS data transfer. An example would help us complete the test.
> >
> >
> >
> >
>


Re: Help needed (PyFlink)

2021-01-29 by Shuiqiang Chen
Hi,
You can check the logs of the task manager that runs the source task to see whether the consumer
fetched the Kafka partition metadata successfully and whether the authentication succeeded.

瞿叶奇 <389243...@qq.com> wrote on Sat, Jan 30, 2021 at 3:14 PM:

> Hello, consuming from the command line works without any problem.
>
>
>
>
> -- Original message --
> *From:* "user-zh" ;
> *Sent:* Saturday, Jan 30, 2021, 3:08 PM
> *To:* "user-zh";
> *Subject:* Re: Help needed (PyFlink)
>
> First check whether the Kafka topic can be consumed from the command line.
>
> Make sure command-line consumption works before using Flink.
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-30 14:25:57, "瞿叶奇" <389243...@qq.com> wrote:
>
> Hello, I want to consume Kafka with Flink and write a local CSV file. The problem is that both the Flink and Kafka clusters are Kerberos-authenticated, and I am using pyflink. The program runs without errors, but it consumes no data: the CSV file stays empty although its modification time keeps updating. I suspect the Kafka connection
> is the problem and hope you can help clear this up.
> 1) Producing data into Kafka:
>
> 2) The pyflink program
>
>
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment,
> CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes,
> CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(3000)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect",
> "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol",
> 'SASL_PLAINTEXT').property("sasl.kerberos.service.name",
> 'kafka').property("kerberos.domain.name", 
> 'hadoop.hadoop.com').property("bootstrap.servers",
> "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id",
> DataTypes.BIGINT()),DataTypes.FIELD("name",
> DataTypes.STRING())]))).with_schema(Schema().field("id",
> DataTypes.BIGINT()).field("name",
> DataTypes.STRING())).register_table_source("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",",
> 1, WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v2")
> 3)pyflink-shell.sh local
>
> 4) Result
> While the program was running in pyflink-shell local, I started the producer and checked the result file:
>
>
> As you can see, the file is indeed being touched (its modification time changes) but it stays empty. I hope pyflink can add a way to print to the console, and I also hope you can provide an example of connecting to a Kerberos-authenticated Kafka. I am an engineer working on the new architecture of the Shaanxi State Grid electricity consumption information collection system, and we plan to use Flink to test Kafka-to-HDFS data transfer. An example would help us complete the test.
>
>
>
>


Re: FlinkKafkaConsumer question

2020-09-03 by Shuiqiang Chen
Hi,
To guarantee exactly-once, each Kafka source operator must itself track the offsets of the partitions it consumes and
write them into its state on every checkpoint; when recovering from a checkpoint, it resumes from the last committed
position, which is what makes exactly-once possible. If Kafka's consumer-group management were used instead, the partitions assigned to
the parallel instances of the FlinkKafkaConsumer, and the offsets, would be managed and recorded by the Kafka group coordinator, and Flink could not maintain this information itself.

> On Sep 4, 2020, at 10:25 AM, op <520075...@qq.com> wrote:
> 
> Thanks, but I still don't quite get it. Doesn't FlinkKafkaConsumer use KafkaConsumer underneath? Why doesn't Flink rely on Kafka's consumer-group management?
> 
> 
> -- Original message --
> From: "user-zh"
> Sent: Thursday, Sep 3, 2020, 6:09 PM
> To: "user-zh"
> Subject: Re: FlinkKafkaConsumer question
> 
> 
> 
> Hi op,
> 
> When Flink consumes Kafka, the FlinkKafkaConsumer fetches all partition information of the topic from Kafka
> and distributes the partitions across its parallel instances; the group id is only used to commit the partition offsets
> back to Kafka and to label that consumer group. Consuming with a plain KafkaConsumer, by contrast, relies on Kafka's consumer-group management, which is a role on the Kafka server side.
> 
> Also, if you start two jobs consuming the same topic with the same group id, both jobs will commit offsets to Kafka under
> that same group id, and a job started in group-offsets mode will begin consuming from the last committed offset.
> 
>  On Sep 3, 2020, at 3:03 PM, op <520075...@qq.com> wrote:
>  
>  hi,
> I am a bit confused by the implementation of FlinkKafkaConsumer. Here are two programs with identical code:
>  //---
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  Env.setRestartStrategy(RestartStrategies.noRestart())
>  val consumerProps = new Properties()
>  consumerProps.put("bootstrap.servers", brokers)
>  consumerProps.put("group.id", "test1234")
>  
>  val consumer = new FlinkKafkaConsumer[String](topic,new 
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  Env.addSource(consumer).print()
>  Env.execute()
> //--- I started both programs at the same time; they connect to the same cluster and topic and use the same group.id. When I sent some data to the topic, both programs consumed the messages from all partitions. Within a Kafka
>  consumer group
> there should be consumption isolation, so why do both programs see all the data here? If I write two identical programs with a plain KafkaConsumer against this topic, the two never consume the same partition. I am using flink 1.11 with flink-connector-kafka_2.11.
>  Thanks



Re: FlinkKafkaConsumer question

2020-09-03 by Shuiqiang Chen
Hi op,

When Flink consumes Kafka, the FlinkKafkaConsumer fetches all partition information of the topic from Kafka and distributes
the partitions across its parallel instances; the group id is only used to commit the partition offsets back to
Kafka and to label that consumer group. Consuming with a plain KafkaConsumer, by contrast, relies on Kafka's consumer-group management, which is a role on the Kafka server side.

Also, if you start two jobs consuming the same topic with the same group id, both jobs will commit offsets to Kafka under that same group id,
and a job started in group-offsets mode will begin consuming from the last committed offset.

> On Sep 3, 2020, at 3:03 PM, op <520075...@qq.com> wrote:
> 
>   hi, I am a bit confused by the implementation of FlinkKafkaConsumer.
> Here are two programs with identical code:
> //---
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> Env.setRestartStrategy(RestartStrategies.noRestart())
> val consumerProps = new Properties()
> consumerProps.put("bootstrap.servers", brokers)
> consumerProps.put("group.id", "test1234")
> 
> val consumer = new FlinkKafkaConsumer[String](topic,new 
> KafkaStringSchema,consumerProps).setStartFromLatest()
> Env.addSource(consumer).print()
> Env.execute()
> //--- I started both programs at the same time; they connect to the same cluster and topic and use the same group.id. When I sent some data to the topic, both programs consumed the messages from all partitions. Within a Kafka
>  consumer group
> there should be consumption isolation, so why do both programs see all the data here? If I write two identical programs with a plain KafkaConsumer against this topic, the two never consume the same partition. I am using flink 1.11 with flink-connector-kafka_2.11.
>  Thanks



Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-02 文章 Shuiqiang Chen
Hi jincheng,

Thanks for the discussion. +1 for the FLIP.

A well-organized documentation will greatly improve the efficiency and
experience for developers.

Best,
Shuiqiang

Hequn Cheng wrote on Sat, Aug 1, 2020 at 8:42 AM:

> Hi Jincheng,
>
> Thanks a lot for raising the discussion. +1 for the FLIP.
>
> I think this will bring big benefits for the PyFlink users. Currently, the
> Python TableAPI document is hidden deeply under the TableAPI tab which
> makes it quite unreadable. Also, the PyFlink documentation is mixed with
> Java/Scala documentation. It is hard for users to have an overview of all
> the PyFlink documents. As more and more functionalities are added into
> PyFlink, I think it's time for us to refactor the document.
>
> Best,
> Hequn
>
>
> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
> wrote:
>
>> Hi, Jincheng!
>>
>> Thanks for creating this detailed FLIP, it will make a big difference in
>> the experience of Python developers using Flink. I'm interested in
>> contributing to this work, so I'll reach out to you offline!
>>
>> Also, thanks for sharing some information on the adoption of PyFlink, it's
>> great to see that there are already production users.
>>
>> Marta
>>
>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>>
>> > Hi Jincheng,
>> >
>> > Thanks a lot for bringing up this discussion and the proposal.
>> >
>> > Big +1 for improving the structure of PyFlink doc.
>> >
>> > It will be very friendly to give PyFlink users a unified entrance to
>> learn
>> > PyFlink documents.
>> >
>> > Best,
>> > Xingbo
>> >
>> >> Dian Fu wrote on Fri, Jul 31, 2020 at 11:00 AM:
>> >
>> >> Hi Jincheng,
>> >>
>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
>> >> improve the Python API doc.
>> >>
>> >> I have received many feedbacks from PyFlink beginners about
>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
>> mixed
>> >> with the Java doc and it's not easy to find the docs he wants to know.
>> >>
>> >> I think it would greatly improve the user experience if we can have one
>> >> place which includes most knowledges PyFlink users should know.
>> >>
>> >> Regards,
>> >> Dian
>> >>
>> >> On Jul 31, 2020, at 10:14 AM, jincheng sun wrote:
>> >>
>> >> Hi folks,
>> >>
>> >> Since the release of Flink 1.11, users of PyFlink have continued to
>> grow.
>> >> As far as I know there are many companies have used PyFlink for data
>> >> analysis, operation and maintenance monitoring business has been put
>> into
>> >> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).  According
>> to
>> >> the feedback we received, current documentation is not very friendly to
>> >> PyFlink users. There are two shortcomings:
>> >>
>> >> - Python related content is mixed in the Java/Scala documentation,
>> which
>> >> makes it difficult for users who only focus on PyFlink to read.
>> >> - There is already a "Python Table API" section in the Table API
>> document
>> >> to store PyFlink documents, but the number of articles is small and the
>> >> content is fragmented. It is difficult for beginners to learn from it.
>> >>
>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
>> >> documents will be added for those new APIs. In order to increase the
>> >> readability and maintainability of the PyFlink document, Wei Zhong and
>> me
>> >> have discussed offline and would like to rework it via this FLIP.
>> >>
>> >> We will rework the document around the following three objectives:
>> >>
>> >> - Add a separate section for Python API under the "Application
>> >> Development" section.
>> >> - Restructure current Python documentation to a brand new structure to
>> >> ensure complete content and friendly to beginners.
>> >> - Improve the documents shared by Python/Java/Scala to make it more
>> >> friendly to Python users and without affecting Java/Scala users.
>> >>
>> >> More detail can be found in the FLIP-133:
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
>> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
>> >>
>> >>
>> >>
>>
>


Re: Flink 1.11 startup problem

2020-07-21 by Shuiqiang Chen
Hi,

You can check in the JobManager log which resource request timed out, and compare the requested resource profile with what the cluster has available.

Best,
Shuiqiang
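For what it is worth, one way to pull the JobManager log from YARN and look for the timed-out request (the application id is a placeholder and the grep pattern is only illustrative):

    # Fetch the aggregated application logs and search for the resource requests.
    yarn logs -applicationId application_1595000000000_0001 | grep -i "request"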

酷酷的浑蛋 wrote on Tue, Jul 21, 2020 at 4:37 PM:

>
>
> I give up, why does flink1.11 have nothing but problems at startup?
>
>
> 1.7, 1.8 and 1.9 all work fine for me; only 1.11 does not:
> ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm
> 1024 -ynm sql_test ./examples/batch/WordCount.jar --input
> hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a
>
>
> The error:
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources. at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ... 45 more Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.TimeoutException at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> ... 25 more
>
>
>
> I do have enough resources; it is only flink1.11 that will not start. It just hangs for a long time and then reports this error. Could someone take a look? Yesterday's jar conflict problem has already been resolved (it only occurred with flink1.11).
>
>


Re: pyflink1.11.0window

2020-07-20 by Shuiqiang Chen
Judging from the exception, the insert mode of your sink is probably not configured correctly.
BTW, the text you pasted contains a lot of "" artifacts, which hurts readability.

Best,
Shuiqiang
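For context: a grouped aggregation without a time window produces an updating stream, and a Kafka table in append mode only accepts inserts. Two usual ways out, sketched with placeholder table and field names (not tested against your job):

    # Option 1: aggregate per time window, so the result is append-only and can
    # go to the existing append-mode Kafka sink.
    windowed = st_env.sql_query("""
        select id, type, tumble_start(rowtime, interval '5' second) as w_start
        from source_table
        group by tumble(rowtime, interval '5' second), id, type
    """)

    # Option 2: keep the continuous aggregation but declare a sink that accepts
    # updates, for example an Elasticsearch table with 'update-mode' = 'upsert',
    # and insert into that sink instead of the append-only Kafka table.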

奇怪的不朽琴师 <1129656...@qq.com> wrote on Mon, Jul 20, 2020 at 4:23 PM:

> Hi:
>   I have a new problem now. On top of the previous job I added a join, and writing to Kafka then fails with the error below.
> Traceback (most recent call last):
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 147, in deco
>   return f(*a, **kw)
>  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
> line 328, in get_return_value
>   format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
> : org.apache.flink.table.api.TableException: AppendStreamTableSink
> requires that Table has only insert changes.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at
> java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> During handling of the above exception, another exception occurred:
>
>
> Traceback (most recent call last):
>  File "tou.py", line 99, in <module>
>    from_kafka_to_kafka_demo()
>  File "tou.py", line 33, in from_kafka_to_kafka_demo
>   st_env.sql_update("insert into flink_result select
> id,type,rowtime from final_result2")
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py",
> line 547, in sql_update
>   self._j_tenv.sqlUpdate(stmt)
>  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
>   answer, self.gateway_client, self.target_id, self.name)
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 154, in deco
>   raise exception_mapping[exception](s.split(': ', 1)[1],
> stack_trace)
> pyflink.util.exceptions.TableException: 'AppendStreamTableSink requires
> that Table has only insert changes.'
>
>
>
>
>
> How should this be implemented? The requirement is roughly a stream table (which needs a grouped aggregation) joined with a dimension table.
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, DataTypes,
> EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
> from pyflink.table.window import Tumble
>
>
>
>
> def from_kafka_to_kafka_demo():
>
>
>   # use blink table planner
>   env = StreamExecutionEnvironment.get_execution_environment()
>  
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>   env_settings =
> 

Re: pyflink1.11.0window

2020-07-15 by Shuiqiang Chen
The example below reads JSON data from Kafka, performs a windowed aggregation and writes the result to Elasticsearch. You can use it as a reference for the code structure and adapt the data fields. Note that this code will probably not run on your machine as-is.

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def platform_code_to_name(code):
return "mobile" if code == 0 else "pc"


def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env,
environment_settings=env_settings)

source_ddl = """
CREATE TABLE payment_msg(
createTime VARCHAR,
rt as TO_TIMESTAMP(createTime),
orderId BIGINT,
payAmount DOUBLE,
payPlatform INT,
paySource INT,
WATERMARK FOR rt as rt - INTERVAL '2' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'payment_msg_2',
  'connector.properties.bootstrap.servers' = '0.0.0.0:9092',
  'connector.properties.group.id' = 'test_3',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
)
"""
t_env.sql_update(source_ddl)

es_sink_ddl = """
CREATE TABLE es_sink (
platform VARCHAR,
pay_amount DOUBLE,
rowtime TIMESTAMP(3)
) with (
'connector.type' = 'elasticsearch',
'connector.version' = '7',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'platform_pay_amount_1',
'connector.document-type' = 'payment',
'update-mode' = 'upsert',
'connector.flush-on-checkpoint' = 'true',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.max-size' = '42mb',
'connector.bulk-flush.max-actions' = '32',
'connector.bulk-flush.interval' = '1000',
'connector.bulk-flush.backoff.delay' = '1000',
'format.type' = 'json'
)
"""

t_env.sql_update(es_sink_ddl)

t_env.register_function('platformcodetoname', platform_code_to_name)

query = """
select platformcodetoname(payPlatform) as platform, sum(payAmount)
as pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT)
as rowtime
from payment_msg
group by tumble(rt, interval '5' seconds), payPlatform
"""

count_result = t_env.sql_query(query)

t_env.create_temporary_view('windowed_values', count_result)

query2 = """
select platform, last_value(pay_amount), rowtime from
windowed_values group by platform, rowtime
"""

final_result = t_env.sql_query(query2)

final_result.execute_insert(table_path='es_sink')


if __name__ == '__main__':
log_processing()


奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 4:40 PM:

> Hi Shuiqiang:
>
> Could I ask you to share a complete code example? I am a beginner; the official 2-from_kafka_to_kafka.py example has no window, and I want a demo that adds a window on top of it. I have tried for a long time without success and keep hitting all kinds of problems after adding the window. Any help would be greatly appreciated.
>
>
> A plea to any expert who sees this mail!
>
>
> Thank you!
>
>
>
>
> -- Original message --
> From: "user-zh"
> <
> acqua@gmail.com>;
> Sent: Wednesday, Jul 15, 2020, 11:25 AM
> To: "user-zh"
> Subject: Re: pyflink1.11.0window
>
>
>
> Here is a SQL example:
> select platformcodetoname(payPlatform) as platform, sum(payAmount) as
> pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as
> rowtime
> from payment_msg group by tumble(rt, interval '5' seconds), payPlatform
> This query aggregates over each 5s tumble window.
>
> 奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 11:10 AM:
>
>  Hi Shuiqiang:
>
>  My goal is to produce an aggregated summary at a fixed interval, for example every two seconds. How should I define the window for this requirement?
>
>
>  -- Original message --
>  From: "user-zh" <acqua@gmail.com>;
>  Sent: Wednesday, Jul 15, 2020, 10:51 AM
>  To: "user-zh"
>  Subject: Re: pyflink1.11.0window
>
>
>
>  Hi 琴师,
>  The exception stack says org.apache.flink.table.api.ValidationException: A tumble window
>  expects a size value literal.
>  It looks like the code that defines the tumble window is not quite right.
>
>  Best,
>  Shuiqiang
>
>  奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 10:27 AM:
>
>  > Hello:
>  >
>  > I changed the source as you suggested, but a new error is reported. What is causing it? I have been trying to get a window working for a long time without success. Please help, thank you.
>  > Traceback (most recent call last):
>  >   File "tou.py", line 71, in <module>
>  >     from_kafka_to_kafka_demo()
>  >   File "tou.py", line 21, in
> from_kafka_to_kafka_demo
>  >

Re: pyflink1.11.0window

2020-07-14 by Shuiqiang Chen
Here is a SQL example:
select platformcodetoname(payPlatform) as platform, sum(payAmount) as
pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as
rowtime
from payment_msg group by tumble(rt, interval '5' seconds), payPlatform
This query aggregates over each 5s tumble window.

奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 11:10 AM:

> Hi Shuiqiang:
>  My goal is to produce an aggregated summary at a fixed interval, for example every two seconds. How should I define the window for this requirement?
>
>
> -- Original message --
> From: "user-zh"
> <
> acqua@gmail.com>;
> Sent: Wednesday, Jul 15, 2020, 10:51 AM
> To: "user-zh"
> Subject: Re: pyflink1.11.0window
>
>
>
> Hi 琴师,
> The exception stack says org.apache.flink.table.api.ValidationException: A tumble window
> expects a size value literal.
> It looks like the code that defines the tumble window is not quite right.
>
> Best,
> Shuiqiang
>
> 奇怪的不朽琴师 <1129656...@qq.com 于2020年7月15日周三 上午10:27写道:
>
>  你好:
>  nbsp; nbsp;
> 
> nbsp;我按着你回复的建议改了source但是会报新的错误,请问这个是因为什么?我想调试一个window一直没有成功,请帮帮我,谢谢。
>  Traceback (most recent call last):
>  nbsp; File "tou.py", line 71, in   nbsp; nbsp; from_kafka_to_kafka_demo()
>  nbsp; File "tou.py", line 21, in from_kafka_to_kafka_demo
>  nbsp; nbsp; .select(" id,nbsp; time1 , time1 ")\
>  nbsp; File
>  "/usr/local/lib/python3.7/site-packages/pyflink/table/table.py", line
> 907,
>  in select
>  nbsp; nbsp; return Table(self._j_table.select(fields),
> self._t_env)
>  nbsp; File
> "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
>  line 1286, in __call__
>  nbsp; nbsp; answer, self.gateway_client, self.target_id,
> self.name)
>  nbsp; File
>  "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
> line
>  147, in deco
>  nbsp; nbsp; return f(*a, **kw)
>  nbsp; File
> "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
>  line 328, in get_return_value
>  nbsp; nbsp; format(target_id, ".", name), value)
>  py4j.protocol.Py4JJavaError: An error occurred while calling
> o26.select.
>  : org.apache.flink.table.api.ValidationException: A tumble window
> expects
>  a size value literal.
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.AggregateOperationFactory.getAsValueLiteral(AggregateOperationFactory.java:384)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateAndCreateTumbleWindow(AggregateOperationFactory.java:302)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:236)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
>  nbsp; nbsp; nbsp; nbsp; at
>  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  nbsp; nbsp; nbsp; nbsp; at
>  java.lang.reflect.Method.invoke(Method.java:498)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>  nbsp; nbsp; nbsp; nbsp; at
> java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>  def register_rides_source(st_env):
>  nbsp; nbsp; source_ddl = \
>  nbsp; nbsp; """
>  nbsp; nbsp; create table source1(
>  nbsp; nbsp; nbsp;id int,
>  nbsp; nbsp; nbsp;time1 timestamp,
>  nbsp; nbsp; nbsp;type string,
>  nbsp; nbsp; nbsp;WATERMARK FOR time1 as time1 -
> INTERVAL '2' SECOND
>  nbsp; nbsp; nbsp;) with (
>  nbsp; nbsp; 'connector.type' = 'kafka',
>  nbsp; nbsp; 'update-mode' = 'append',
>  nbsp; nbsp; 'connector.topic' = 'tp1',
>  nbsp; nbsp; 'connector.properties.bootstrap.servers' =
> 'localhost:9092',
>  nbsp; nbsp; 'connector.properties.zookeeper.connect' =
> 'localhost:2181',
>  nbsp; nbsp; 'format.type' = 'json',
>  nbsp; nbsp; 'format.derive-schema' = 'true',
>  nbsp; nbsp; 'connector.version' = 

Re: pyflink1.11.0window

2020-07-14 by Shuiqiang Chen
Hi 琴师,
The exception stack says org.apache.flink.table.api.ValidationException: A tumble window
expects a size value literal.
It looks like the code that defines the tumble window is not quite right.

Best,
Shuiqiang
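For reference, the literal "2.secends" in the code quoted below looks like a typo for "2.seconds", which would explain the "expects a size value literal" message. A minimal sketch of a valid tumble window in the 1.11 string-expression DSL (the selected columns are illustrative and must match the sink schema):

    from pyflink.table.window import Tumble

    # Assumes time1 has been declared as the event-time attribute of source1.
    st_env.from_path("source1") \
        .window(Tumble.over("2.seconds").on("time1").alias("w")) \
        .group_by("w, id") \
        .select("id, w.start, w.end, id.count") \
        .insert_into("sink1")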

奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 10:27 AM:

> Hello:
>
> I changed the source as you suggested, but a new error is reported. What is causing it? I have been trying to get a window working for a long time without success. Please help, thank you.
> Traceback (most recent call last):
>  File "tou.py", line 71, in <module>
>    from_kafka_to_kafka_demo()
>  File "tou.py", line 21, in from_kafka_to_kafka_demo
>   .select(" id, time1 , time1 ")\
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table.py", line 907,
> in select
>   return Table(self._j_table.select(fields), self._t_env)
>  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
>   answer, self.gateway_client, self.target_id, self.name)
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 147, in deco
>   return f(*a, **kw)
>  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
> line 328, in get_return_value
>   format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o26.select.
> : org.apache.flink.table.api.ValidationException: A tumble window expects
> a size value literal.
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.getAsValueLiteral(AggregateOperationFactory.java:384)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateAndCreateTumbleWindow(AggregateOperationFactory.java:302)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:236)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at
> java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> def register_rides_source(st_env):
>   source_ddl = \
>   """
>   create table source1(
>   id int,
>   time1 timestamp,
>   type string,
>   WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND
>   ) with (
>   'connector.type' = 'kafka',
>   'update-mode' = 'append',
>   'connector.topic' = 'tp1',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true',
>   'connector.version' = 'universal'
>   )
>   """
>   st_env.sql_update(source_ddl)
>
>  
>   s_env =
> StreamExecutionEnvironment.get_execution_environment()
>   s_env.set_parallelism(1)
>
>
>   st_env = StreamTableEnvironment.create(s_env)
>
>
>   register_rides_source(st_env)
>   register_rides_sink(st_env)
>
>
>   st_env.from_path("source1")\
>
> .window(Tumble.over("2.secends").on("time1").alias("w")) \
> .group_by("w") \
> .select(" id, time1 , time1 ")\
> .insert_into("sink1")
>  
>   st_env.execute("2-from_kafka_to_kafka")
>
>
> The code is as above.
>
>
>
>
>
>
>
>
> -- Original message --
> From: "user-zh"
> <
> acqua@gmail.com>;
> Sent: Friday, Jul 10, 2020, 9:17 AM
> To: "user-zh"
> Subject: Re: pyflink1.11.0window
>
>
>
> Hi 琴师,
>
> Did you declare time1 as a time attribute in your source ddl?
> create table source1(
>  id int,
>  time1 timestamp,
>  type string,
>  WATERMARK FOR time1 as time1 -
> INTERVAL '2' SECOND
> ) with (...)
>
> 奇怪的不朽琴师 <1129656...@qq.com> wrote on Fri, Jul 10, 2020 at 8:43 AM:
>
>  -- Original message --
>  From: "奇怪的不朽琴师" <1129656...@qq.com>;
>  Sent: Thursday, Jul 9, 2020, 5:08 PM
>  To: "godfrey he"
>  Subject: pyflink1.11.0window
>
>
>
>  Hello:
>   When I use pyflink 1.11, opening a window still fails with
>  : org.apache.flink.table.api.ValidationException: A group window
> expects a
>  time attribute for grouping in a stream

Re: Re: Upgrading from flink1.10 to flink1.11, submission to yarn fails

2020-07-09 by Shuiqiang Chen
Hi,
It looks like the Kafka table source was not created successfully. You may need to put the jar of

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

into the FLINK_HOME/lib directory.

Congxian Qiu wrote on Fri, Jul 10, 2020 at 10:57 AM:

> Hi
>
> From the exception, it looks like some jar was not included, similar to [1]; you may need to compare and figure out which required jar is missing.
>
> PS: the stack trace points at csv-related classes, so check the csv-related jars first.
>
> ```
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.filesystem.FileSystemTableFactory
> at
>
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at
>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> at
>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at
>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at
>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
> ... 37 more
> ```
>
> [1] http://apache-flink.147419.n8.nabble.com/flink-1-11-td4471.html
> Best,
> Congxian
>
>
> Zhou Zach wrote on Fri, Jul 10, 2020 at 10:39 AM:
>
> > The log is complete; this is the full log pasted from the yarn UI, and the yarn logs command shows the same thing. It is too brief to tell where the error is...
>
>
> > I also submitted another job that used to run on flink1.10. Running it with flink1.11 now throws an exception:
> >
> >
> > SLF4J: Class path contains multiple SLF4J bindings.
> > SLF4J: Found binding in
> >
> [jar:file:/opt/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: Found binding in
> >
> [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > explanation.
> > SLF4J: Actual binding is of type
> > [org.apache.logging.slf4j.Log4jLoggerFactory]
> >
> >
> > 
> >  The program finished with the following exception:
> >
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: findAndCreateTableSource failed.
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> > at
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: org.apache.flink.table.api.TableException:
> > findAndCreateTableSource failed.
> > at
> >
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
> > at
> >
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190)
> > at
> >
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:89)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> > at
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> > at
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> > at
> >
> 

Re: pyflink1.11.0window

2020-07-09 by Shuiqiang Chen
Hi 琴师,

Did you declare time1 as a time attribute in your source ddl?
create table source1(
id int,
time1 timestamp,
type string,
WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND
) with (...)
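If event time is not strictly needed, a processing-time attribute also satisfies the group window; a sketch based on the same table (note that for the watermark variant the event-time column usually needs to be of type TIMESTAMP(3)):

create table source1(
 id int,
 time1 timestamp(3),
 type string,
 proctime as PROCTIME()
) with (...)

and then define the window on proctime (or on time1 once the watermark is declared).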

奇怪的不朽琴师 <1129656...@qq.com> wrote on Fri, Jul 10, 2020 at 8:43 AM:

> -- Original message --
> From: "奇怪的不朽琴师"
> <
> 1129656...@qq.com>;
> Sent: Thursday, Jul 9, 2020, 5:08 PM
> To: "godfrey he"
> Subject: pyflink1.11.0window
>
>
>
> Hello:
>  When I use pyflink 1.11, opening a window still fails with
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
>
> Has this problem not been fixed, or am I using it the wrong way? If my usage is wrong, could you provide a correct example?
> The code is below.
> Thanks
>
>
> def from_kafka_to_kafka_demo():
>   s_env =
> StreamExecutionEnvironment.get_execution_environment()
>   s_env.set_parallelism(1)
>
>
>   # use blink table planner
>   st_env = StreamTableEnvironment.create(s_env)
>
>
>   # register source and sink
>   register_rides_source(st_env)
>   register_rides_sink(st_env)
>
>
>   st_env.from_path("source1")\
>
> .window(Tumble.over("1.secends").on("time1").alias("w")) \
> .group_by("w") \
> .select(" id, time1 , time1 ")\
> .insert_into("sink1")
>  
>   st_env.execute("2-from_kafka_to_kafka")
>
>
>
>
> def register_rides_source(st_env):
>   source_ddl = \
>   '''
>   create table source1(
> id int,
>   time1 timestamp,
>   type string
>   ) with (
>   'connector.type' = 'kafka',
>   'update-mode' = 'append',
>   'connector.topic' = 'tp1',
>   'connector.properties.bootstrap.servers' = 'localhost:9092'
>   )
>   '''
>   st_env.sql_update(source_ddl)
>
>
>
>
> def register_rides_sink(st_env):
>   sink_ddl = \
>   '''
>   create table sink1(
> id int,
>   time1 timestamp,
>   time2 timestamp
>   ) with (
>   'connector.type' = 'kafka',
>   'update-mode' = 'append',
>   'connector.topic' = 'tp3',
>   'connector.properties.bootstrap.servers' = 'localhost:9092'
>   )
>   '''
>   st_env.sql_update(sink_ddl)
>
>
>
>
> if __name__ == '__main__':
>   from_kafka_to_kafka_demo()
>
>
>