Hi Priyanka,

I see that you are generating dynamic output tags. AFAIK, the dynamic tagging is what causes this issue. I don't think we can add tags after the operators are running.
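To make the "fixed up front" idea concrete, here is a minimal plain-Java sketch (no Flink classes involved) that resolves the distinct tag names once from the configured list, so that one tag per name could be created a single time, for example in the constructor, instead of inside processElement. The class name TagNames, the method resolve, and the sample input are made up for illustration; the trimming and the skipping of empty names are my additions and are not part of your code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: resolves the distinct tag names once, up front. */
public class TagNames {

    /**
     * Splits each comma-separated entry and returns the distinct,
     * non-empty tag names in first-seen order.
     */
    public static Set<String> resolve(List<String> tagList) {
        Set<String> names = new LinkedHashSet<>();
        for (String entry : tagList) {
            for (String name : entry.split(",")) {
                String trimmed = name.trim(); // assumption: surrounding spaces are not significant
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Sample input is invented for illustration only.
        Set<String> names = resolve(Arrays.asList("a,b", "b, c"));
        System.out.println(names);
    }
}
```

I am not sure this alone removes the timestamp error, but it takes the tag handling out of the per-record path and leaves a fixed set of names known before the job runs.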
Can you try with a statically named tag that is declared final, and output the data that way?

Added Till.

On Tue, Jan 12, 2021, 12:09 PM Priyanka Kalra A <priyanka.a.ka...@ericsson.com> wrote:

> Below is the code:
>
> public class OutputTagProcessingFunction extends ProcessFunction<GenericRecord, GenericRecord> {
>     private static final long serialVersionUID = 1L;
>     private HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
>     private List<String> tagList;
>
>     public OutputTagProcessingFunction(List<String> tagList) {
>         super();
>         this.tagList = tagList;
>     }
>
>     @Override
>     public void processElement(final GenericRecord value, Context ctx, Collector<GenericRecord> out) throws Exception {
>         Set<String> tagSet = new HashSet<>();
>         for (String tag : tagList) {
>             List<String> tags = Arrays.asList(tag.split(","));
>             tagSet.addAll(tags);
>         }
>
>         for (String tag : tagSet) {
>             outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
>             ctx.output(outputMap.get(tag), value);
>         }
>     }
> }
>
> The exception occurs at the highlighted line.
>
> Regards,
> Priyanka
>
> *From:* Taher Koitawala <taher...@gmail.com>
> *Sent:* Monday, January 11, 2021 6:50 PM
> *To:* Priyanka Kalra A <priyanka.a.ka...@ericsson.com>
> *Cc:* user <user@flink.apache.org>; Sushil Kumar Singh B <sushil.kumar.b.si...@ericsson.com>; Anuj Kumar Jain A <anuj.kumar.a.j...@ericsson.com>; Chirag Dewan <chirag.de...@ericsson.com>; Pankaj Kumar Aggarwal <pankaj.kumar.aggar...@ericsson.com>
> *Subject:* Re: Timestamp Issue with OutputTags
>
> Can you please share your code?
>
> On Mon, Jan 11, 2021, 6:47 PM Priyanka Kalra A <priyanka.a.ka...@ericsson.com> wrote:
>
> Hi Team,
>
> We are generating multiple side-output tags and using the default processing time on a non-keyed stream. The class XXXX$YYY extends *ProcessFunction*<I, O> and provides an implementation of the *processElement* method.
> Upon sending valid data, it gives the error "*Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null*".
>
> - Why is it not able to read the timestamp?
> - Why is it not taking the system default time as the processing time?
>
> *Complete stack trace for reference:*
>
> java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70) ~[kafka-clients-0.11.0.2.jar:?]
>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93) ~[kafka-clients-0.11.0.2.jar:?]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     *at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>     *at com.eee.dd.ccc.aaa.processing.XXXX$YYY.processElement(XXXX.java:166)*
>     *at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]*
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151) ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> Your help with this would be deeply appreciated!
>
> Thanks & Regards,
> Priyanka Kalra