Re: FlatMap 报错Invalid timestamp: -1.

2021-04-16 文章 lp
查了些资料,好像说是因为FlinkKafkaProducer.setWriteTimestampToKafka(true);导致的,我使用的是flink1.12.1,
相关代码片段如下,请教是什么原因导致的呢?

//sink
Properties producerPro = new Properties();
producerPro.setProperty("bootstrap.servers",kafkaAddr);
producerPro.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
"60");
   
FlinkKafkaProducer flinkKafkaProducer = new
FlinkKafkaProducer(dwdOsqueryDetailTopic, new SimpleStringSchema(),
producerPro, null, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5);

flinkKafkaProducer.setWriteTimestampToKafka(true);
beanStr.addSink(flinkKafkaProducer);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


FlatMap 报错Invalid timestamp: -1.

2021-04-16 文章 lp
程序一直正常运行,后来突然偶尔报错如下,显示flatMap的Collect时出错:
我的flatMap transform操作代码片段如下,收集的数据是来自kafka的topic
--
SingleOutputStreamOperator text2Bean =
consumerRecordDataStreamSource.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String jsonStr, Collector out)
throws Exception {
OsqueryBean osqueryBean =
JSON.parseObject(jsonStr,OsqueryBean.class);
if (StringUtils.isNotEmpty((String)
Utils.mapGet(osqueryBean, "columns", "cmdline"))) {
out.collect(osqueryBean);
}
}
});
--
13:59:53,885 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  
[] - Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (5/8)
(171fc1965c9c20a35cb48588cd88b35f) switched from RUNNING to FAILED on
d0468f82-70e8-4b65-99f2-315466cd15cd @ 127.0.0.1 (dataPort=-1).
java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should
always be non-negative or null.
at
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:74)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:97)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper.serialize(KafkaSerializationSchemaWrapper.java:86)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:83)
~[classes/:?]
at
org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:78)
~[classes/:?]
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at