Hi Manas,
you need to make sure to differentiate between what Flink calls the
"pre-flight phase" and the "cluster phase".
The pre-flight phase is where the pipeline is constructed and all
functions are instantiated. They are later serialized and sent to
the cluster.
If you read your properties file in the `main()` method and store
something in static variables, the content is available locally where
the pipeline is constructed (e.g. in the client), but not when the
function instances are sent to the cluster: those static variables are
fresh (thus empty) in the cluster JVMs. You need to either make sure
that the properties file is read again on each task manager, or,
easier: pass the parameters as constructor parameters into the
instances so that they are shipped together with the function itself.
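For illustration, here is a minimal, self-contained sketch (plain Java serialization with hypothetical class names, not the actual Flink shipping machinery) of why a value captured as a constructor parameter survives the trip to the cluster, while static state does not travel with the function:

```java
import java.io.*;

public class ShippingDemo {

    // Set in main() on the "client". Static state is NOT part of a
    // serialized object, so in a fresh task manager JVM it starts out null.
    static String STATIC_TOPIC;

    // A function that captures the value as a constructor parameter:
    // the final instance field is part of the serialized object and is
    // shipped together with the function.
    static class MyFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic;
        MyFunction(String topic) { this.topic = topic; }
        String getTopic() { return topic; }
    }

    // Simulates shipping the function: serialize on the client,
    // deserialize as if in another JVM.
    static MyFunction ship(MyFunction fn) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (MyFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        STATIC_TOPIC = "alarms"; // e.g. read from the properties file
        MyFunction shipped = ship(new MyFunction(STATIC_TOPIC));
        System.out.println(shipped.getTopic()); // prints: alarms
    }
}
```

The instance field arrives intact after deserialization; a reference to STATIC_TOPIC inside the function would instead be resolved against the (empty) static state of the receiving JVM.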
I hope this helps.
Regards,
Timo
On 22.10.20 09:24, Manas Kale wrote:
Hi,
I am trying to write some data to a kafka topic and I have the following
situation:
monitorStateStream
    .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
    ... // Stream that outputs elements of type IDAP2Alarm
    .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", servers);
    return new FlinkKafkaProducer<T>(topic,
            (element, timestamp) -> element.serializeForKafka(),
            properties,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
/*
 * This interface is used to indicate that a class may be output to Kafka.
 * Since Kafka treats all data as bytes, classes that implement this
 * interface have to provide an implementation for the serializeForKafka()
 * method.
 */
public interface IDAP2JSONOutput {
// Implement serialization logic in this method.
ProducerRecord<byte[],byte[]> serializeForKafka();
}
public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
    private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
    @Override
    public ProducerRecord<byte[], byte[]> serializeForKafka() {
        byte[] rawValue;
        byte[] rawKey;
        String k = getMonitorFeatureKey().getMonitorName();
        ...
        rawValue = val.getBytes();
        LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
        return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
    }
}
Config.ALARMS_TOPIC is a static string that is read from a properties
file. When I run this code on my IDE minicluster, it runs great with no
problems. But when I submit it as a jar to the cluster, I get the
following error:
Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
at
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71)
~[flink_POC-0.1.jar:?]
at
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133)
~[flink_POC-0.1.jar:?]
* at
flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95)
~[flink_POC-0.1.jar:?]*
at
flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62)
~[flink_POC-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854)
~[flink_POC-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
~[flink_POC-0.1.jar:?]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
*at
flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69)
~[flink_POC-0.1.jar:?]*
* at
flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25)
~[flink_POC-0.1.jar:?]*
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
LOGGER statement in IDAP2Alarm above is never printed when running on
the Flink cluster. To verify that the correct value of
Config.ALARMS_TOPIC is read from the configuration file, I printed it
from the Config class - and it prints correctly. So my questions are:
* Why does this work on a minicluster but not when submitted as a jar
to a normal cluster? I am using Flink v1.11.0 in both my POM file
and the cluster runtime.
* Why does the LOGGER line not get printed even though execution
definitely reached it (as seen from the stacktrace)?
Thank you,
Manas Kale