A question about converting between Table and DataStream

2021-05-22 Thread lp
The official docs on converting between Table and DataStream:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

① In the sample code:
// interpret the insert-only Table as a DataStream again
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

However, StreamTableEnvironment does not seem to have this toDataStream(Table resultTable) method.
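
(For reference, a minimal sketch of the two conversion APIs; note that toDataStream(Table) only exists on StreamTableEnvironment from Flink 1.13 onward, so on earlier versions only the second variant is available:)

// Flink 1.13+: interpret an insert-only Table as a DataStream of Row
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// Pre-1.13 equivalent (still present in 1.13):
DataStream<Row> appendStream = tableEnv.toAppendStream(resultTable, Row.class);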


② In the sample code at https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/common/#emit-a-table:

tableEnv.connect(new FileSystem().path("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");


My dependencies do not provide new Csv(). If I want to use it like this, which extra dependency do I need to add?
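
(For context, the Csv descriptor above normally comes from the flink-csv format module; a sketch of the Maven dependency, with the version matched to your Flink release. Treat the exact coordinates as an assumption to verify:)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.13.0</version>
</dependency>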

Thanks!!!





FlinkCEP

2021-05-14 Thread lp
A question: does FlinkCEP only work in event-time mode? I wrote a CEP program that declares processing time, and no data is ever emitted.
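
(For reference, a sketch under the assumption that this is Flink 1.12+, where a PatternStream runs in event time by default regardless of the environment setting and has to be switched explicitly; the type and variable names are placeholders:)

// input is a DataStream<Event>, pattern a Pattern<Event, ?>
PatternStream<Event> patternStream = CEP.pattern(input, pattern)
        .inProcessingTime(); // default is inEventTime(); without this, a processing-time job can stay silent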




FlinkCEP: matching as many events as possible

2021-05-13 Thread lp
I have a FlinkCEP program using event time that monitors data of the following form:
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:50 +0800]



The key settings in the program are:
a watermark delay of 2s
the skip strategy AfterMatchSkipStrategy.skipPastLastEvent()

.times(3)
.within(Time.seconds(3));
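
(Assembled as a sketch, the setup described above looks roughly like this; the event type is a placeholder:)

Pattern<AccessEvent, ?> pattern = Pattern
        .<AccessEvent>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .times(3)
        .within(Time.seconds(3));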



The result I get is:
detected 3 access in 60s from same ip...[/45:36, /45:36, /45:37]
late-output data...[/45:37], the timeout timestamp is: 2021-05-13 08:45:40


What I actually want is this: when the record [13/May/2021:20:45:50 +0800] arrives, I would like to get
detected 3 access in 60s from same ip...[/45:36, /45:36, /45:37, /45:37]
because all of those events satisfy my .times(3).within(Time.seconds(3)) setting.

So what should I do?







FlinkCEP Pattern matching question

2021-05-10 Thread lp
I have a FlinkCEP program that monitors nginx logs: if the same IP accesses more than 3 times within 60s, raise an alert.
I made 7 accesses, numbered 1~7.
Four groups of alerts were detected:
[/1, /2, /3]
[/2, /3, /4]
[/3, /4, /5]
[/4, /5, /6]

How can I make events that have already participated in a match stop participating in further matches? What I actually want is two groups of alerts:
[/1, /2, /3]
[/4, /5, /6]


The key detection code is:
Pattern pattern =
Pattern.begin("start").times(3).within(Time.seconds(60));
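
(One way to express this with the CEP API, sketched under the assumption that an after-match skip strategy fits the use case; skipPastLastEvent() drops partial matches that overlap a completed match:)

Pattern pattern = Pattern
        .begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .times(3)
        .within(Time.seconds(60));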






A question about Flink CDC

2021-05-07 Thread lp
I have recently been studying Flink connectors; docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/. I also came
across the concept of Flink CDC: https://github.com/ververica/flink-cdc-connectors. Could someone explain the
relationship between Flink connectors and Flink CDC?





Re: flink side-output stream type conversion question

2021-04-30 Thread lp
val outputTagDate = OutputTag[String]("Date-side-output")
Your OutputTag is declared with the generic type String, but in
ctx.output(outputTagDate,Date(first_retain,noob_endtime,noob_first_endtime))
what is this Date(...) function, and does it return a String?





A question about flinkKafkaConsumer offset committing

2021-04-25 Thread lp
A question: given that I have already enabled checkpointing, what is the difference at checkpoint time between
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) and Kafka's own "enable.auto.commit"=true (which
defaults to true, with interval=5s)?

The javadoc of flinkKafkaConsumer.setCommitOffsetsOnCheckpoints() reads:

/**
 * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
 *
 * This setting will only have effect if checkpointing is enabled for the job. If
 * checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit"
 * (for 0.9+) property settings will be used.
 *
 * @return The consumer object, to allow function chaining.
 */

My understanding: if checkpointing is enabled and flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) is
set (which seems to be the default), offsets are committed back to Kafka at the checkpoint interval, and the
auto.commit.enable setting is not used? Is that understanding correct?
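
(For reference, a sketch of the setup being described: checkpointing on, offsets committed on checkpoints, and Kafka's own auto-commit switched off explicitly; the broker address and interval are placeholders:)

env.enableCheckpointing(60000L); // offsets are then committed at this cadence
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("enable.auto.commit", "false"); // per the javadoc, not used once checkpointing is enabled
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props);
consumer.setCommitOffsetsOnCheckpoints(true); // the default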





Flink-kafka-connector consumer configuration warning

2021-04-18 Thread lp
In an otherwise normally running Flink 1.12 program, I see the following warning:

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.

I have a line of configuration like this (value quoted here, since Properties.setProperty takes a String):
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");


According to the docs at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery:
By default, partition discovery is disabled. To enable it, set a
non-negative value for flink.partition-discovery.interval-millis in the
provided properties config, representing the discovery interval in
milliseconds.


The configuration above should be legal, so why is this warning logged?





flink-kafka-connector: a problem caused by Producer.setWriteTimestampToKafka(true)

2021-04-16 Thread lp
I am on Flink 1.12, using the flink-kafka-connector to read from Kafka topicA and sink back to topicB. With
the following setting on the FlinkProducer that sinks to topicB, the job intermittently fails; removing the
setting makes the exception disappear. What is the cause?


flinkKafkaProducer.setWriteTimestampToKafka(true);






Re: FlatMap error: Invalid timestamp: -1.

2021-04-16 Thread lp
From what I found, this seems to be caused by FlinkKafkaProducer.setWriteTimestampToKafka(true). I am using
Flink 1.12.1; the relevant code fragment is below. What is the root cause?

//sink
Properties producerPro = new Properties();
producerPro.setProperty("bootstrap.servers", kafkaAddr);
producerPro.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60");

FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(
        dwdOsqueryDetailTopic, new SimpleStringSchema(), producerPro,
        null, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5);

flinkKafkaProducer.setWriteTimestampToKafka(true);
beanStr.addSink(flinkKafkaProducer);
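
(A guess for context: with setWriteTimestampToKafka(true) the sink copies each StreamRecord's timestamp into the ProducerRecord, and records that never received a timestamp carry -1, which Kafka rejects. A sketch of one mitigation, assigning timestamps explicitly before the sink; the strategy chosen here is an assumption:)

beanStr.assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<String>) (element, recordTs) -> System.currentTimeMillis()))
        .addSink(flinkKafkaProducer);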





FlatMap error: Invalid timestamp: -1.

2021-04-16 Thread lp
The program ran fine for a long time, then started to fail intermittently with the error below, thrown while
the flatMap collects. My flatMap transform code fragment follows; the collected data comes from a Kafka topic.
--
SingleOutputStreamOperator<OsqueryBean> text2Bean =
        consumerRecordDataStreamSource.flatMap(new FlatMapFunction<String, OsqueryBean>() {
            @Override
            public void flatMap(String jsonStr, Collector<OsqueryBean> out) throws Exception {
                OsqueryBean osqueryBean = JSON.parseObject(jsonStr, OsqueryBean.class);
                if (StringUtils.isNotEmpty((String) Utils.mapGet(osqueryBean, "columns", "cmdline"))) {
                    out.collect(osqueryBean);
                }
            }
        });
--
13:59:53,885 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  
[] - Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (5/8)
(171fc1965c9c20a35cb48588cd88b35f) switched from RUNNING to FAILED on
d0468f82-70e8-4b65-99f2-315466cd15cd @ 127.0.0.1 (dataPort=-1).
java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should
always be non-negative or null.
at
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:74)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:97)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper.serialize(KafkaSerializationSchemaWrapper.java:86)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:83)
~[classes/:?]
at
org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:78)
~[classes/:?]
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at

flink kafka connector intermittent error: Permission denied: connect

2021-04-07 Thread lp
I wrote a flink kafka connector job that consumes from Kafka topicA, processes the data, and sinks it back to
topicB. While the program is running normally, the following error appears occasionally:


java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:321)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
10:23:15,951 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-5, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-5] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:321)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
10:23:15,953 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-2, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-2] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at

Re: flink sink to kafka error

2021-04-07 Thread lp
There are also errors like this mixed in:


20:14:48,707 WARN  org.apache.kafka.common.utils.AppInfoParser 
[] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.producer:type=app-info,id="producer-Source: Custom Source -> Flat Map
-> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-8"
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
~[?:1.8.0_231]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
~[?:1.8.0_231]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
~[?:1.8.0_231]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
~[?:1.8.0_231]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
~[?:1.8.0_231]
at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
~[?:1.8.0_231]
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1282)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
~[?:1.8.0_231]
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
~[?:1.8.0_231]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
~[?:1.8.0_231]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
~[?:1.8.0_231]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
~[?:1.8.0_231]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
~[?:1.8.0_231]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
~[?:1.8.0_231]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
~[?:1.8.0_231]
at
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
~[?:1.8.0_231]
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
~[?:1.8.0_231]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
~[?:1.8.0_231]
at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
~[?:1.8.0_231]
at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
~[?:1.8.0_231]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.cleanUpUserContext(FlinkKafkaProducer.java:1249)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.finishRecoveringContext(FlinkKafkaProducer.java:1224)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:380)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at

flink sink to kafka error

2021-04-07 Thread lp
I wrote a streaming program that reads from Kafka topicA, does some simple preprocessing, and sinks back to
Kafka topicB. It runs fine locally, but logs some errors like the following along the way:
-
20:11:47,078 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,079 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,079 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,066 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:529)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:447)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
20:11:47,081 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Bootstrap broker
10.66.0.129:9092 (id: -1 rack: null) disconnected
20:11:47,081 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,081 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,084 INFO 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl []
- Could not complete snapshot 28 for operator Source: Custom Source -> Flat
Map -> Map -> Sink: Unnamed (5/8)#0. Failure reason: Checkpoint was
declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 28 for operator Source: Custom Source -> Flat Map -> Map -> Sink:

Re: A question about Flink checkpoint state storage

2021-04-01 Thread lp
OK, thanks!





Re: A question about Flink checkpoint state storage

2021-04-01 Thread lp
As in the subject: besides changing this in the global configuration file, can it be changed in code inside the program?




A question about Flink checkpoint state storage

2021-04-01 Thread lp
I wrote a stateful function and used the following checkpoint configuration:
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
env.getCheckpointConfig().setCheckpointTimeout(6L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///cp/eventCounter"));

A few questions:
① According to the docs, the checkpoint directory should look like this:
/user-defined-checkpoint-dir
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-1/
        + --chk-2/
        + --chk-3/
        ...

But in my test, while the job is running normally there is only ever one chk-* directory, whose number is
incremented by 1 on every checkpoint.


② As I understand it, the state data should live under chk-*, but in my test there is only a single _metadata
file and no per-checkpoint data; this is on Flink 1.12. When I switch to Flink 1.8, the chk-* directory does
contain the per-checkpoint data next to _metadata.
So on the newer Flink 1.12, where does the data of each checkpoint go?


③ The docs say that by default only the latest checkpoint is retained. To keep several recent ones, besides
setting state.checkpoints.num-retained: 5 in the global flink-conf.yaml, is there a per-job way to configure
this on the env in code?
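
(A sketch of one per-job attempt, assuming the Flink 1.12 overload StreamExecutionEnvironment.getExecutionEnvironment(Configuration) forwards this key; whether num-retained is actually honored per job rather than cluster-wide is exactly the open question here:)

Configuration conf = new Configuration();
conf.setInteger("state.checkpoints.num-retained", 5); // assumed to apply to this job only
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);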







Does sinking a flink dataStream multiple times drive the upstream DAG repeatedly?

2021-03-05 Thread lp


I have a question about the following code fragment:

--
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddr);
properties.setProperty("group.id", kafkaOdsGroup);
properties.setProperty("auto.offset.reset", kafkaOdsAutoOffsetReset);
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, kafkaOdsPartitionDiscoverInterval);
properties.setProperty("transaction.timeout.ms", KafkaOdsTransactionTimeout); // kafka transaction timeout

FlinkKafkaConsumer<String> flinkKafkaConsumer =
        new FlinkKafkaConsumer<>(kafkaOdsTopic, new SimpleStringSchema(), properties);
DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);
dataStreamSource.printToErr("1");
dataStreamSource.printToErr("2");
dataStreamSource.printToErr("3");



I apply the same sink operation to one datastream several times. Will that cause the whole upstream DAG to be
driven repeatedly? With my Spark-trained intuition I would assume the upstream DAG is re-executed once per sink.





Re: flink on yarn: the relationship between the JobManager and the ApplicationMaster

2021-02-02 Thread lp
Thanks!
I was quoting the final Background / Internals part of the Flink 1.11.2 docs, which describes how Flink runs
on YARN:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html
That version seems recent enough and is the one we run in production; I could not find this content in the
1.12 docs.
After rereading the docs, I think my understanding of flink on yarn is still fuzzy, so a few more questions:
① In flink on yarn mode, are the JobManager and the ApplicationMaster two separate threads running inside the
same container JVM?
② After submitting a job in flink on yarn per-job mode, jps on the nodes shows two processes,
YarnJobClusterEntrypoint and YarnTaskExecutorRunner. What are these two processes?
③ Is there an up-to-date description of how flink on yarn works for 1.12? I could not find that part on the
site.






Re: flink on yarn: the relationship between the JobManager and the ApplicationMaster

2021-02-02 Thread lp
Put differently: I know that for MapReduce jobs the ApplicationMaster implementation is MRAppMaster. For flink
on yarn, what is the corresponding ApplicationMaster implementation?





Re: Help with using flink-parcel

2021-02-02 Thread lp
What submission mode does flink-parcel use? Could you share the details? With per-job mode or application
mode, each job's Flink cluster on YARN is independent, so killing one job does not affect another.




flink on yarn: the relationship between the JobManager and the ApplicationMaster

2021-02-02 Thread lp
In flink on yarn, what is the relationship between YARN's ApplicationMaster and the Flink JobManager? I am not
very familiar with YARN; my previous understanding was that the JobManager plays the ApplicationMaster role in
YARN. But the docs say: "...Once that has finished, the ApplicationMaster (AM) is started. The JobManager and
AM are running in the same container. Once they successfully started, the AM knows the address of the
JobManager (its own host)." That suggests the AM and the JM are two processes, yet when I submit a Flink job
to the YARN cluster I only ever see a JobManager process (YarnJobClusterEntrypoint) and never an
ApplicationMaster process. Please explain the connection and the difference between them.





Re: Job submission commands: ./bin/flink run-application -t yarn-application ... and ./bin/flink run -m yarn-cluster ...

2021-01-28 Thread lp
To put it more precisely: are these two submission modes in 1.11 and 1.12 the same thing, with only the command having changed?

Excerpts from the docs:

flink-1.11:
Run a single Flink job on YARN

Example:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

--
flink-1.12:
Per-Job Cluster Mode

Example:
./bin/flink run -t yarn-per-job --detached
./examples/streaming/TopSpeedWindowing.jar





Job submission commands: ./bin/flink run-application -t yarn-application ... and ./bin/flink run -m yarn-cluster ...

2021-01-28 Thread lp
As in the subject: ./flink --help shows these two similar job submission commands. Both appear to ship the
JobManager to a YARN node for execution, but I do not understand how they differ, and I could not find the
difference in the docs. Could someone explain the difference between them?




Re: When deploying with application mode, must classloader.resolve-order be changed to parent-first?

2021-01-27 Thread lp
Thanks for the answer. Checking my pom.xml and the packaged archive, they did indeed contain the kafka
(org.apache.kafka.common) dependencies. So I set the relevant pom dependencies to provided, repackaged,
confirmed that my jar no longer contains any kafka dependencies, and redeployed. This time the jobmanager
fails directly with: Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
---
I suspect my cluster deployment itself may be the problem. This is what I did: I set up a 3-node hadoop (and
yarn) cluster [master (NameNode, SecondaryNameNode, ResourceManager), slave01 (DataNode, NodeManager), slave02
(DataNode, NodeManager)], unpacked flink-1.12.1.tar.gz on the master node, put the hadoop dependency jar
flink-shaded-hadoop-2-uber-2.8.3-8.0.jar into its lib directory, uploaded my jar to /opt on that node, and
submitted it from the flink directory to the yarn cluster in application mode with: bin/flink run-application
-t yarn-application /opt/quickstart-0.1.jar. The jobmanager container was allocated on slave02, and its
jobmanager.log shows the error above.
---
I came over from Spark, where spark on yarn does not require an installation on every node. Isn't flink on
yarn the same? If I am doing it wrong, what is the right way?




When deploying with application mode, must classloader.resolve-order be changed to parent-first?

2021-01-26 Thread lp
I wrote a process function demo in which a custom source generates data that is sinked to kafka, and deployed
it to a yarn cluster with Flink 1.11.2 in application mode. The jobmanager log then reports: Failed to
construct kafka producer; Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of
org.apache.kafka.common.serialization.Serializer.
Switching to Flink 1.12.1 gives the same error, while deploying in per-job mode works fine. From what I found,
this is related to Flink's classloading, i.e. the classloader.resolve-order parameter in flink-conf.yml:
changing the default child-first to parent-first does make it work. But why do I have to change this
parameter? The docs generally advise against changing it, since child-first bypasses Flink's own copies of
classes in favor of the ones bundled with the application.
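
(For reference, the change being discussed, as it would appear in flink-conf.yaml:)

# default is child-first; the job only started after switching to parent-first
classloader.resolve-order: parent-first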




flink-Kafka error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2021-01-22 Thread lp
The test code is as follows:
--
public class Sink_KafkaSink_1 {
    public static void main(String[] args) throws Exception {
        final ParameterTool params =
                ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
        String host = params.get("host");
        int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
        produceTestdata2kafka(new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
    }

    private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddr);

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "flinktest",              // topic
                new SimpleStringSchema(), // message serialization
                properties
        );
        // attach each record's event timestamp when writing to Kafka
        producer.setWriteTimestampToKafka(true);
        text.addSink(producer);
        env.execute("[kafkaSink with custom source]");
    }
}

class CustomsourceFuncation implements SourceFunction<String> {
    //private long count = 1L;
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // a "book ranking" of test messages
            List<String> books = new ArrayList<>();
            books.add("msg1");
            books.add("msg2");
            books.add("msg3");
            books.add("msg4");
            books.add("msg5");
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));
            // emit one record every 2 seconds
            Thread.sleep(2000);
        }
    }

    // called when the source is canceled
    @Override
    public void cancel() {
        isRunning = false;
    }
}
--

Local testing shows no problems. After packaging with maven and submitting to the yarn cluster in application
mode, the jobmanager keeps looping with the following errors:
--
2021-01-22 07:54:31,929 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
checkpoint found during restore.
2021-01-22 07:54:32,931 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id
ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_02 @
slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from RUNNING to FAILED on container_1611044725922_0027_01_02 @
slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
~[quickstart-0.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)

flink sink to kafka fails with: Failed to construct kafka producer

2021-01-21 Thread lp
Flink 1.11.2.
A custom source produces data in a loop and sinks it to kafka.
The job is deployed to yarn in application mode.
jobmanager.log reports the errors below (the jobmanager and taskmanager containers were both allocated, and
both report the same):

2021-01-21 10:52:17,742 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 1 tasks should be restarted to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0. 
2021-01-21 10:52:17,742 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76)
switched from state RUNNING to RESTARTING.
2021-01-21 10:52:18,743 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76)
switched from state RESTARTING to RUNNING.
2021-01-21 10:52:18,743 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from CREATED to SCHEDULED.
2021-01-21 10:52:18,743 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from SCHEDULED to DEPLOYING.
2021-01-21 10:52:18,744 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #229) to
container_1611044725922_0017_01_02 @ slave02 (dataPort=39278)
2021-01-21 10:52:18,748 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from DEPLOYING to RUNNING.
2021-01-21 10:52:18,753 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@75c6d62a.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
~[quickstart-0.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
~[quickstart-0.1.jar:?]
at

Re: Deploying a job to yarn in application mode: .properties file not found

2021-01-21 Thread lp
The full jobmanager error log:
2021-01-21 07:53:23,023 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2021-01-21 07:53:23,027 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting
YarnApplicationClusterEntryPoint (Version: 1.11.2, Scala: 2.11, Rev:fe36135,
Date:2020-09-09T16:19:03+02:00)
2021-01-21 07:53:23,027 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  OS
current user: root
2021-01-21 07:53:23,374 WARN  org.apache.hadoop.util.NativeCodeLoader   
  
[] - Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable
2021-01-21 07:53:23,484 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Current
Hadoop/Kerberos user: root
2021-01-21 07:53:23,484 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM: Java
HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.231-b11
2021-01-21 07:53:23,484 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Maximum
heap size: 981 MiBytes
2021-01-21 07:53:23,484 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
JAVA_HOME: /usr/java/jdk1.8.0_231-amd64
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Hadoop
version: 2.8.3
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM
Options:
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Xmx1073741824
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Xms1073741824
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-XX:MaxMetaspaceSize=268435456
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Dlog.file=/usr/local/hadoop/logs/userlogs/application_1611044725922_0013/container_1611044725922_0013_01_01/jobmanager.log
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Dlog4j.configuration=file:log4j.properties
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Dlog4j.configurationFile=file:log4j.properties
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Program
Arguments: (none)
2021-01-21 07:53:23,485 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
Classpath:

Deploying a job to yarn in application mode: .properties file not found

2021-01-21 Thread lp
I have a Flink processFunction program whose main() uses ParameterTool to read the pro.properties config file
(kafka address, etc.) from the resources folder. It runs perfectly in IDEA, but after packaging the jar with
maven and deploying it with yarn application mode, bin/flink run-application -t yarn-application
/opt/quickstart-0.1.jar, the job fails. The yarn container log shows the following error:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Properties file
/root/hadoop/tmp/nm-local-dir/usercache/root/appcache/application_1611044725922_0013/container_1611044725922_0013_01_01/file:/root/hadoop/tmp/nm-local-dir/usercache/root/appcache/application_1611044725922_0013/container_1611044725922_0013_01_01/quickstart-0.1.jar!/pro.properties
does not exist


Opening the quickstart-0.1.jar archive, the pro.properties file is definitely in there; the pom.xml is the
template downloaded from the official quickstart project.
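
(For context, the path in the error suggests the file is being resolved on the local filesystem rather than
inside the jar. A sketch of loading it from the classpath instead; ParameterTool also accepts an InputStream,
and MyJob is a placeholder class name:)

ParameterTool params = ParameterTool.fromPropertiesFile(
        MyJob.class.getResourceAsStream("/pro.properties"));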





Re: flink NullPointerException warning

2021-01-05 Thread lp
OK, thanks!





Re: flink NullPointerException warning

2021-01-05 Thread lp
The operator, a ProcessWindowFunction, is implemented as follows:

class MyProcessWindowFuncation extends
        ProcessWindowFunction<ShareRealTimeData, TreeMap<Double, Tuple2<String, String>>, String, TimeWindow> {
    private transient MapState<String, Tuple2<String, Double>> eveShareNoMaxPrice;
    private transient ValueState<TreeMap<Double, Tuple2<String, String>>> shareAndMaxPrice;


    @Override
    public void process(String s, Context context, Iterable<ShareRealTimeData> elements,
                        Collector<TreeMap<Double, Tuple2<String, String>>> out) throws Exception {
        Iterator<ShareRealTimeData> iterator = elements.iterator();

        // find the max price per shareNo within each trigger period
        while (iterator.hasNext()) {
            ShareRealTimeData next = iterator.next();
            Tuple2<String, Double> t2 = eveShareNoMaxPrice.get(next.getShareNo());
            if (t2 == null || t2.f1 < next.getCurrentPrice()) {
                eveShareNoMaxPrice.put(next.getShareNo(),
                        Tuple2.of(next.getShareName(), next.getCurrentPrice()));
            }
        }


        TreeMap<Double, Tuple2<String, String>> shareAndMaxPriceV = shareAndMaxPrice.value();
        if (shareAndMaxPriceV == null) {
            shareAndMaxPriceV = new TreeMap<>(new Comparator<Double>() {
                @Override
                public int compare(Double o1, Double o2) {
                    return Double.compare(o2, o1);
                }
            });
        }
        Iterator<Map.Entry<String, Tuple2<String, Double>>> keysAndMaxPrice =
                eveShareNoMaxPrice.entries().iterator();
        while (keysAndMaxPrice.hasNext()) {
            Map.Entry<String, Tuple2<String, Double>> next = keysAndMaxPrice.next();

            shareAndMaxPriceV.put(next.getValue().f1,
                    Tuple2.of(next.getKey(), next.getValue().f0));
            if (shareAndMaxPriceV.size() > 20) {
                shareAndMaxPriceV.pollLastEntry();
            }
        }

        eveShareNoMaxPrice.clear();
        shareAndMaxPrice.clear();

        out.collect(shareAndMaxPriceV);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        eveShareNoMaxPrice = getRuntimeContext().getMapState(new MapStateDescriptor<>("eveShareNoMaxPrice",
                TypeInformation.of(new TypeHint<String>() {
                }), TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {
                })));
        shareAndMaxPrice = getRuntimeContext().getState(new ValueStateDescriptor<>("shareAndMaxPrice",
                TypeInformation.of(new TypeHint<TreeMap<Double, Tuple2<String, String>>>() {
                })));
    }
}





flink NullPointerException warning

2021-01-05 Thread lp
I have the following code. It consumes data from kafka and then keys it by the second it falls in (server clock); the code that derives the minute a record falls in:

public static String timeStampToDate(Long timestamp) {
    ThreadLocal<SimpleDateFormat> threadLocal =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    String format = threadLocal.get().format(new Date(timestamp));
    return format.substring(0, 19);
}



After keying by the minute the data falls in, I use a 1-minute tumbling window that triggers every 500 ms, as follows:

.
.
.
// keyBy the minute the data falls in (processingTime)
KeyedStream<ShareRealTimeData, String> keyByStream =
        signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
            @Override
            public String getKey(ShareRealTimeData value) throws Exception {
                return DateUtilMinutes.timeStampToDate(new Date().getTime());
            }
        });


SingleOutputStreamOperator<TreeMap<Double, Tuple2<String, String>>> topNforEveWindow = keyByStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.milliseconds(500)))
        //.evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new MyProcessWindowFuncation());


//sink
topNforEveWindow.printToErr("topNforEveWindow");

.
.
.

While running, the program randomly throws the following NullPointerException at certain whole minutes:
19:49:22,001 WARN  org.apache.flink.runtime.taskmanager.Task   
[] - Window(TumblingProcessingTimeWindows(1000),
ContinuousProcessingTimeTrigger, TimeEvictor, ProcessWindowFunction$4) ->
Sink: Print to Std. Err (3/8) (222821e43f98390a2f5e3baeb5b542a8) switched
from RUNNING to FAILED.
java.lang.NullPointerException: null
at 
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99)
~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:203)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
[flink-runtime_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]


Could you help me figure out the cause?


