Question about converting between Table and DataStream

2021-05-22 Thread lp
The official docs on converting between Table and DataStream: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#converting-between-datastream-and-table ① In the sample code: // interpret the insert-only Table as a DataStream again DataStream resultStream = tableEnv.toDataStream(resultTable);
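
For reference, a self-contained version of that round trip, close to the example on the linked page; toDataStream returning a DataStream<Row> is the Flink 1.13+ API, and the class, table, and field names here are only for illustration:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class TableDataStreamRoundTrip {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // interpret an insert-only DataStream as a Table
            DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
            Table inputTable = tableEnv.fromDataStream(dataStream);

            // register the Table and run a query on it
            tableEnv.createTemporaryView("InputTable", inputTable);
            Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

            // interpret the insert-only result Table as a DataStream again (Flink 1.13+)
            DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

            resultStream.print();
            env.execute("table-datastream-roundtrip");
        }
    }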

FlinkCEP

2021-05-14 Thread lp
A quick question: can FlinkCEP only be used in eventTime mode? I wrote a CEP program that declares processingTime, and no data is emitted.
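
FlinkCEP is not limited to event time. One common cause of "no output" on Flink 1.12+ is that the PatternStream stays on its event-time default and never sees watermarks; a minimal sketch assuming Flink 1.12+, with a made-up socket source and pattern:

    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class CepProcessingTimeSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> input = env.socketTextStream("localhost", 9999); // placeholder source

            // three events within 3 seconds, purely for illustration
            Pattern<String, String> pattern = Pattern.<String>begin("start")
                    .times(3)
                    .within(Time.seconds(3));

            // Since Flink 1.12 a PatternStream works on event time by default and
            // emits nothing without watermarks; switch to processing time explicitly.
            PatternStream<String> patternStream = CEP.pattern(input, pattern).inProcessingTime();

            patternStream.select(new PatternSelectFunction<String, String>() {
                @Override
                public String select(Map<String, List<String>> match) {
                    return "matched: " + match.get("start");
                }
            }).print();

            env.execute("cep-processing-time-sketch");
        }
    }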

FlinkCEP: question about matching as many events as possible

2021-05-13 Thread lp
I have a FlinkCEP program using eventTime that monitors data of the following form: [13/May/2021:20:45:36 +0800] [13/May/2021:20:45:36 +0800] [13/May/2021:20:45:37 +0800] [13/May/2021:20:45:37 +0800] [13/May/2021:20:45:50 +0800] The key settings in the program are: a watermark delay of 2s, the skip strategy AfterMatchSkipStrategy.skipPastLastEvent(), .times(3) .within(Time.seconds(3));

Question about FlinkCEP Pattern matching

2021-05-10 Thread lp
I have a FlinkCEP program that inspects nginx logs; if the same IP makes more than 3 requests within 60s, an alert is raised. I made 7 requests, numbered 1~7, and 4 alert groups were detected: [/1, /2, /3] [/2, /3, /4] [/3, /4, /5] [/4, /5, /6] How can I make data that already took part in a match not participate in matching again? What I actually want is two alert groups: [/1, /2, /3] [/4, /5, /6] The key detection code is: Pattern pattern = Pattern.begin("start").times(3).within(Time.seconds(60));
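
A minimal sketch of the non-overlapping behaviour asked about above, using AfterMatchSkipStrategy.skipPastLastEvent() so events that already took part in a match are not reused; the AccessEvent type, the keying by IP, and the source are made-up placeholders, and processing time is used only to keep the sketch self-contained (with event time you would also assign timestamps/watermarks):

    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class CepSkipPastLastEventSketch {
        // made-up record type: one nginx access-log line per event
        public static class AccessEvent {
            public String ip;
            public String uri;
            public AccessEvent() {}
            public AccessEvent(String ip, String uri) { this.ip = ip; this.uri = uri; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<AccessEvent> accesses = env.fromElements(
                    new AccessEvent("1.2.3.4", "/1"),
                    new AccessEvent("1.2.3.4", "/2"),
                    new AccessEvent("1.2.3.4", "/3")); // placeholder source

            KeyedStream<AccessEvent, String> byIp = accesses.keyBy(e -> e.ip);

            // skipPastLastEvent() discards all partial matches that started before
            // the end of the current match, so /1 /2 /3 and /4 /5 /6 are reported
            // but the overlapping /2 /3 /4 style matches are not.
            Pattern<AccessEvent, AccessEvent> pattern = Pattern
                    .<AccessEvent>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                    .times(3)
                    .within(Time.seconds(60));

            CEP.pattern(byIp, pattern)
                    .inProcessingTime()
                    .select(new PatternSelectFunction<AccessEvent, String>() {
                        @Override
                        public String select(Map<String, List<AccessEvent>> match) {
                            return "alert: " + match.get("start").size()
                                    + " hits from " + match.get("start").get(0).ip;
                        }
                    })
                    .print();

            env.execute("cep-skip-past-last-event-sketch");
        }
    }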

Flink CDC question

2021-05-07 Thread lp
I've recently been looking into Flink connectors, official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/; I also came across the concept of Flink CDC: https://github.com/ververica/flink-cdc-connectors; what exactly is the relationship between Flink connectors and Flink CDC?

Re: Type conversion problem with flink side-output streams

2021-04-30 Thread lp
val outputTagDate = OutputTag[String]("Date-side-output") — your OutputTag is declared with the generic type String. ctx.output(outputTagDate,Date(first_retain,noob_endtime,noob_first_endtime)) — what is this Date function, and does it return a String?
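
A minimal Java sketch of the same point (the quoted Scala code is analogous): the value passed to ctx.output must have the OutputTag's declared element type; all names below are made-up placeholders, not the original job:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SideOutputTypeSketch {
        // the tag's type parameter fixes the element type of the side output;
        // the anonymous subclass {} keeps the generic type information
        private static final OutputTag<String> DATE_TAG = new OutputTag<String>("Date-side-output") {};

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Long> input = env.fromElements(1L, 2L, 3L);

            SingleOutputStreamOperator<Long> main = input.process(new ProcessFunction<Long, Long>() {
                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) {
                    out.collect(value);
                    // whatever goes to this side output must be a String,
                    // matching the OutputTag's type parameter
                    ctx.output(DATE_TAG, "ts=" + value);
                }
            });

            DataStream<String> side = main.getSideOutput(DATE_TAG);
            side.print();
            env.execute("side-output-type-sketch");
        }
    }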

Question about offset committing in flinkKafkaConsumer

2021-04-25 Thread lp
A question: what is the difference between flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true); and Kafka's own "enable.auto.commit"=true [the default, interval=5s] at checkpoint time, given that I have already enabled checkpointing? The Javadoc of flinkKafkaConsumer.setCommitOffsetsOnCheckpoints() reads: /** * Specifies
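
For context, a sketch of the setup being compared, relying on the documented behaviour that with checkpointing enabled the consumer commits offsets back to Kafka only when a checkpoint completes and the Kafka auto-commit settings in the Properties are ignored; broker, topic, and group names are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class OffsetCommitSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000L);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            props.setProperty("group.id", "demo-group");           // placeholder
            // With checkpointing enabled this property is ignored; offsets are
            // committed as part of the checkpoint instead.
            props.setProperty("enable.auto.commit", "true");

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props);
            // default is already true: commit the checkpointed offsets back to Kafka
            // when a checkpoint completes (useful for lag monitoring, not needed for
            // Flink's own fault tolerance, which restores from the checkpointed state)
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer).print();
            env.execute("offset-commit-sketch");
        }
    }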

Flink-kafka-connector Consumer configuration warning

2021-04-18 Thread lp
In an otherwise normal flink 1.12 program I get the following warning: 19:38:37,557 WARN org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config. I have a configuration line like this:
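
A guess at what is happening, with a small sketch under that assumption: flink.partition-discovery.interval-millis is a Flink-specific key (the constant FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS) that enables partition discovery in the Flink consumer, but Flink also hands the whole Properties object to the underlying Kafka client, whose ConsumerConfig then warns about the key it does not recognise, so the WARN itself should be harmless; broker/topic/group names are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

    public class PartitionDiscoverySketch {
        public static FlinkKafkaConsumer<String> buildConsumer() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            props.setProperty("group.id", "demo-group");           // placeholder

            // Flink-specific key ("flink.partition-discovery.interval-millis"):
            // read by the Flink consumer, unknown to the Kafka client, hence the WARN.
            props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");

            return new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props);
        }
    }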

Problem caused by flink-kafka-connector Producer.setWriteTimestampToKafka(true)

2021-04-16 Thread lp
I'm on flink 1.12, using the flink-kafka-connector to read from Kafka topicA and sink back to topicB. With the following setting on the FlinkProducer that sinks to topicB, the program occasionally throws an error; after removing it the exception disappears. What could be the reason? flinkKafkaProducer.setWriteTimestampToKafka(true);
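
A hedged guess at the mechanism, with a sketch: setWriteTimestampToKafka(true) makes the producer write each record's internal Flink timestamp into the Kafka record, and a record whose timestamp was never set (or was read as -1 from the source topic) can then be rejected by the Kafka producer with "Invalid timestamp: -1". One way to rule that out is to give every record a valid timestamp before the sink; all names below are placeholders, not the original job:

    import java.util.Properties;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class WriteTimestampSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder

            DataStream<String> fromA = env
                    .addSource(new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props))
                    // make sure every record carries a non-negative timestamp before
                    // it reaches the producer; here we simply stamp the current time
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<String>noWatermarks()
                                    .withTimestampAssigner((element, previous) -> System.currentTimeMillis()));

            FlinkKafkaProducer<String> toB =
                    new FlinkKafkaProducer<>("topicB", new SimpleStringSchema(), props);
            // only safe once every record is guaranteed to have a valid timestamp
            toB.setWriteTimestampToKafka(true);

            fromA.addSink(toB);
            env.execute("write-timestamp-sketch");
        }
    }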

Re: FlatMap error: Invalid timestamp: -1.

2021-04-16 Thread lp
I looked into this a bit; it seems to be caused by FlinkKafkaProducer.setWriteTimestampToKafka(true). I'm using flink 1.12.1; the relevant code snippet is below. What could be the cause? //sink Properties producerPro = new Properties(); producerPro.setProperty("bootstrap.servers",kafkaAddr);

FlatMap error: Invalid timestamp: -1.

2021-04-16 Thread lp
The program had been running normally, then occasionally started failing with the error below, which shows the failure happens in the flatMap's collect. My flatMap transform code snippet is as follows; the collected data comes from a Kafka topic -- SingleOutputStreamOperator text2Bean = consumerRecordDataStreamSource.flatMap(new FlatMapFunction() { @Override

flink kafka connector intermittent error: Permission denied: connect

2021-04-07 Thread lp
I wrote a flink kafka connector job that consumes from kafka topicA, processes the data, and sinks it back to topicB. While the program is running normally, the following error occasionally shows up: java.net.SocketException: Permission denied: connect at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231] at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231] at

Re: flink sink to kafka error

2021-04-07 Thread lp
There are also errors like this in between: 20:14:48,707 WARN org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id="producer-Source: Custom Source -> Flat Map -> Map -> Sink:

flink sink to kafka error

2021-04-07 Thread lp
I wrote a streaming program that reads from kafka topicA, does some simple preprocessing, and sinks the data back to kafka topicB. The program runs fine locally, but logs some errors like the following along the way: - 20:11:47,078 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink:

Re: Question about how flink checkpoint state data is stored

2021-04-01 Thread lp
OK, thanks.

Re: Question about how flink checkpoint state data is stored

2021-04-01 Thread lp
As in the subject: besides changing it via the global configuration file, can this also be changed in code within the program?
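
If it helps, most of this can be set in code instead of flink-conf.yaml; a minimal sketch assuming the Flink 1.11/1.12-era APIs, with a placeholder HDFS path:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // where checkpoint/state data is written, instead of state.backend /
            // state.checkpoints.dir in flink-conf.yaml (placeholder path)
            env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));

            env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
            env.getCheckpointConfig().setCheckpointTimeout(60000L);
            // keep the latest checkpoint around when the job is cancelled
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // ... build and execute the job here
        }
    }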

Question about how flink checkpoint state data is stored

2021-04-01 Thread lp
I wrote a stateful function with the following checkpoint configuration: env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L); env.getCheckpointConfig().setCheckpointTimeout(6L);

flink dataStream with multiple sinks: is the DAG driven and executed repeatedly?

2021-03-05 Thread lp
I have a question about the following program snippet: -- Properties properties = new Properties(); properties.setProperty("bootstrap.servers",kafkaAddr); properties.setProperty("group.id",kafkaOdsGroup); properties.setProperty("auto.offset.reset",kafkaOdsAutoOffsetReset);
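
On the question in the subject: within a single env.execute() the whole pipeline is one job graph, so attaching a second sink to the same stream duplicates records to both sinks but does not drive the source or the upstream DAG a second time. A minimal sketch with placeholder source and sinks (in the snippet above the source would be the Kafka consumer):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MultiSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // placeholder source
            DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L);

            DataStream<Long> cleaned = source.filter(v -> v % 2 == 0);

            // two sinks attached to the same intermediate stream: each record of
            // `cleaned` is sent to both sinks, the source itself is consumed once
            cleaned.print("sink-1");
            cleaned.print("sink-2");

            // one job graph, one execution
            env.execute("multi-sink-sketch");
        }
    }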

Re: flink on yarn: the relationship between JobManager and ApplicationMaster

2021-02-02 Thread lp
Thanks! What I quoted is the last part of the flink 1.11.2 documentation, Background / Internals, which describes how flink runs on YARN: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html . That version feels recent enough that it should still be accurate, and it's also the version we run in production; I couldn't find the corresponding content in the 1.12 docs. After reading the docs more carefully, my understanding of flink on yarn is probably still fuzzy, so a few more questions: ① In flink on yarn mode, the jobmanager and

Re: flink on yarn: the relationship between JobManager and ApplicationMaster

2021-02-02 Thread lp
Or put another way: I know that for MapReduce jobs the ApplicationMaster implementation is MRAppMaster; for flink on yarn, what is the corresponding ApplicationMaster implementation?

Re: Help with using flink-parcel

2021-02-02 Thread lp
Which submission mode does flink-parcel use? Could you share the details? If you use per-job mode or application mode, each job's flink cluster on yarn is independent, and killing one job does not affect another.

flink on yarn: the relationship between JobManager and ApplicationMaster

2021-02-02 Thread lp
In flink on yarn, what is the relationship between yarn's ApplicationMaster and the flink JobManager? I'm not very familiar with yarn; my previous understanding was that the JobManager plays the ApplicationMaster role in yarn. But I saw the following excerpt on the official site: ...Once that has finished, the ApplicationMaster (AM) is started.The JobManager and AM are running in the same container. Once they successfully started, the AM knows

Re: Job submission commands: ./bin/flink run-application -t yarn-application ... vs ./bin/flink run -m yarn-cluster ...

2021-01-28 Thread lp
Maybe the question should be: are these two submission methods in 1.11 and 1.12 actually the same, with only the command changed? Excerpts from the official docs: flink-1.11: Run a single Flink job on YARN Example: ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar -- flink-1.12: Per-Job Cluster Mode Example: ./bin/flink run -t

Job submission commands: ./bin/flink run-application -t yarn-application ... vs ./bin/flink run -m yarn-cluster ...

2021-01-28 Thread lp
As in the subject: in ./flink --help I see two similar job-submission commands; both seem to ship the JobManager to a yarn node for execution, but I don't understand the difference between them, and I couldn't find an explanation on the official site either. Could someone explain how they differ?

Re: When deploying an application in application mode, must classloader.resolve-order be changed to parent-first?

2021-01-27 Thread lp
Thanks for the answer. Checking my pom.xml and the packaged archive, they indeed contained the kafka (org.apache.kafka.common) dependencies; so I set the related dependencies in the pom to provided, repackaged, and confirmed that the resulting jar no longer contains any kafka dependencies. After deploying and running it, this time the jobmanager failed outright with: Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema

When deploying an application in application mode, must classloader.resolve-order be changed to parent-first?

2021-01-26 Thread lp
I wrote a process function demo with a custom source that produces data and sinks it to kafka, then published it to a yarn cluster. The flink version is 1.11.2, deployed in application mode, and the jobmanager log reports: Failed to construct kafka producer;Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of

flink-Kafka error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2021-01-22 Thread lp
The test code is as follows: -- public class Sink_KafkaSink_1{ public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties")); String host =

flink sink to kafka, error: Failed to construct kafka producer

2021-01-21 Thread lp
flink 1.11.2, a custom source produces data in a loop and sinks it to kafka; the job is deployed to yarn in application mode. jobmanager.log reports the following error (the jobmanager and taskmanager containers were both allocated, and both report the same error): 2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to

Re: Deploying a job to yarn in Application Mode: .properties file not found

2021-01-21 Thread lp
The full jobManager error log is as follows: 2021-01-21 07:53:23,023 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - 2021-01-21 07:53:23,027 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -

Deploying a job to yarn in Application Mode: .properties file not found

2021-01-21 Thread lp
A flink processFunction program whose main() uses ParameterTool to read the pro.properties configuration file (kafka address etc.) under the resources folder; running it locally in IDEA works perfectly. After building a jar with maven and deploying the job in yarn application mode, bin/flink run-application -t yarn-application /opt/quickstart-0.1.jar , the job fails, and the yarn container log shows the following error: Caused by:
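
One hedged suggestion with a sketch: in application mode main() runs on the JobManager inside the YARN container, so the file has to be readable from the submitted jar's classpath (or shipped explicitly) rather than from a local path, and a missing resource is easier to diagnose if you check for it; the class and file names below are placeholders:

    import java.io.FileNotFoundException;
    import java.io.InputStream;

    import org.apache.flink.api.java.utils.ParameterTool;

    public class ConfigLoadingSketch {
        public static ParameterTool loadConfig() throws Exception {
            // "/pro.properties" must be packaged into the fat jar (src/main/resources)
            try (InputStream in = ConfigLoadingSketch.class.getResourceAsStream("/pro.properties")) {
                if (in == null) {
                    // without this check a missing resource surfaces as a
                    // hard-to-read NullPointerException inside fromPropertiesFile
                    throw new FileNotFoundException("pro.properties not found on the classpath");
                }
                return ParameterTool.fromPropertiesFile(in);
            }
        }

        public static void main(String[] args) throws Exception {
            ParameterTool params = loadConfig();
            System.out.println(params.toMap());
        }
    }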

Re: flink null pointer warning

2021-01-05 Thread lp
OK, thanks.

Re: flink null pointer warning

2021-01-05 Thread lp
The operator, a processWindowFunction, is as follows: class MyProcessWindowFuncation extends ProcessWindowFunction>, String, TimeWindow>{ private transient MapState> eveShareNoMaxPrice; private transient ValueState>> shareAndMaxPrice; @Override public void process(String s, Context
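
The generic type parameters above were eaten by the archive, so here is only a hedged reconstruction of the general shape (all types and names are illustrative, not the original ones), mainly showing that the transient state fields must be created in open() via the RuntimeContext before process() uses them, which is a typical cause of NullPointerExceptions in such functions:

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // IN = Double (a price), OUT = String, KEY = String: illustrative choices only
    public class MyProcessWindowFunctionSketch
            extends ProcessWindowFunction<Double, String, String, TimeWindow> {

        private transient MapState<String, Double> eveShareNoMaxPrice;
        private transient ValueState<Double> shareAndMaxPrice;

        @Override
        public void open(Configuration parameters) throws Exception {
            // state handles must be created here from the runtime context;
            // using a field that was never initialized leads to an NPE in process()
            eveShareNoMaxPrice = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("eveShareNoMaxPrice", Types.STRING, Types.DOUBLE));
            shareAndMaxPrice = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("shareAndMaxPrice", Types.DOUBLE));
        }

        @Override
        public void process(String key, Context context, Iterable<Double> prices,
                            Collector<String> out) throws Exception {
            Double max = shareAndMaxPrice.value();
            for (Double p : prices) {
                if (max == null || p > max) {
                    max = p;
                }
            }
            shareAndMaxPrice.update(max);
            eveShareNoMaxPrice.put(key, max);
            out.collect(key + " -> max price " + max);
        }
    }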

flink null pointer warning

2021-01-05 Thread lp
I have the following code: it consumes data from kafka and then keys the data by the second it falls in (server clock). The code that obtains the minute a record belongs to: public static String timeStampToDate(Long timestamp){ ThreadLocal threadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("-MM-dd HH:mm:ss")); String format = threadLocal.get().format(new Date(timestamp));