Hi Timo,

Thank you for the explanation. I am starting to see why I was getting an exception.
Are you saying that I cannot use static variables at all when deploying to a cluster? I would like the variables to remain static rather than instance-bound, because they are accessed from multiple classes.

Based on my understanding of what you said, I implemented the following pattern:

public static void main(String[] args) {
    // Load the properties file and copy its values into the static Config fields.
    Properties prop = new Properties();
    InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
    prop.load(is);
    strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
    new Config(strMap, longMap);
    ...
}

public class Config {
    public static String CONFIG_TOPIC;
    public static String CONFIG_KAFKA;

    // The constructor only assigns the static fields.
    public Config(HashMap<String, String> s) {
        CONFIG_TOPIC = s.get("CONFIG_TOPIC");
        CONFIG_KAFKA = s.get("CONFIG_KAFKA");
    }
}

This produces the same issue. With the easier solution that you listed, are you implying that I should use multiple instances or a singleton pattern of some sort?

On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twal...@apache.org> wrote:

> Hi Manas,
>
> you need to make sure to differentiate between what Flink calls
> "pre-flight phase" and "cluster phase".
>
> The pre-flight phase is where the pipeline is constructed and all
> functions are instantiated. They are then later serialized and sent to
> the cluster.
>
> If you are reading your properties file in the `main()` method and storing
> something in static variables, the content is available locally where
> the pipeline is constructed (e.g. in the client) but not when the function
> instances are sent to the cluster. Those static variables are fresh
> (thus empty) in the cluster JVMs. You need to either make sure that the
> properties file is read from each task manager again, or easier: pass
> the parameters as constructor parameters into the instances such that
> they are shipped together with the function itself.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 22.10.20 09:24, Manas Kale wrote:
> > Hi,
> > I am trying to write some data to a kafka topic and I have the following
> > situation:
> >
> > monitorStateStream
> >     .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> >     ... // Stream that outputs elements of type IDAP2Alarm
> >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> >
> > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
> >     Properties properties = new Properties();
> >     properties.setProperty("bootstrap.servers", servers);
> >     return new FlinkKafkaProducer<T>(topic,
> >             (element, timestamp) -> element.serializeForKafka(),
> >             properties,
> >             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> > }
> >
> > /*
> > This interface is used to indicate that a class may be output to Kafka.
> > Since Kafka treats all data as bytes, classes that implement this interface
> > have to provide an implementation for the serializeForKafka() method.
> > */
> > public interface IDAP2JSONOutput {
> >
> >     // Implement serialization logic in this method.
> >     ProducerRecord<byte[],byte[]> serializeForKafka();
> >
> > }
> >
> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> >
> >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> >
> >     @Override
> >     public ProducerRecord<byte[],byte[]> serializeForKafka() {
> >         byte[] rawValue;
> >         byte[] rawKey;
> >         String k = getMonitorFeatureKey().getMonitorName();
> >         ...
> >
> >         rawValue = val.getBytes();
> >
> >         LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
> >
> >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
> >     }
> >
> > }
> >
> >
> > Config.ALARMS_TOPIC is a static string that is read from a properties
> > file. When I run this code on my IDE minicluster, it runs with no
> > problems. But when I submit it as a jar to the cluster, I get the
> > following error:
> >
> > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
> >     *at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]*
> >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     *at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]*
> >     *at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]*
> >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
> >
> > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
> > LOGGER statement in IDAP2Alarm above is never printed when running on
> > the Flink cluster. To verify that the correct value of
> > Config.ALARMS_TOPIC is read from the configuration file, I printed it from
> > the Config class, and it prints correctly. So my questions are:
> >
> >   * Why does this work on a minicluster but not when submitted as a jar
> >     to a normal cluster? I am using Flink v1.11.0 in both my POM file
> >     and the cluster runtime.
> >   * Why does the LOGGER line not get printed even though execution
> >     definitely reached it (as seen from the stacktrace)?
> >
> > Thank you,
> > Manas Kale
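
For reference, here is a minimal sketch of the distinction Timo describes in the quoted reply, written with plain Java serialization instead of Flink so that it runs on its own. It assumes that shipping a function to the cluster behaves like a serialize/deserialize round trip, and it only simulates the fresh task manager JVM by resetting the static field to null inside the same process. The names ShippedConfigSketch, AlarmSerializer, staticTopic and roundTripIntoFreshJvm are hypothetical; they come neither from this thread nor from Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ShippedConfigSketch {

    // Hypothetical stand-in for Config.ALARMS_TOPIC. A static field is class state,
    // so Java serialization never writes it into the byte stream.
    static String staticTopic;

    // Hypothetical stand-in for a function object that is serialized and shipped.
    static final class AlarmSerializer implements Serializable {
        private static final long serialVersionUID = 1L;

        // Instance state is part of the serialized object.
        private final String topic;

        AlarmSerializer(String topic) {
            this.topic = topic;
        }

        String topicFromInstance() {
            return topic;
        }

        String topicFromStatic() {
            return staticTopic;
        }
    }

    // Serializes a function on the "client" and deserializes it on the "cluster".
    static AlarmSerializer roundTripIntoFreshJvm(String topic) {
        try {
            // Pre-flight phase: main() fills the static and builds the function.
            staticTopic = topic;
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(new AlarmSerializer(topic));
            }

            // Assumption: a new JVM starts with unset statics; simulated here by a reset.
            staticTopic = null;

            // Cluster phase: only the bytes of the instance arrive.
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (AlarmSerializer) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        AlarmSerializer onCluster = roundTripIntoFreshJvm("alarms");
        System.out.println("instance field: " + onCluster.topicFromInstance()); // alarms
        System.out.println("static field: " + onCluster.topicFromStatic());     // null
    }
}
```

The instance field survives the round trip while the static one comes back as null, which mirrors the "Topic cannot be null" error in the stack trace. In the job from this thread, the corresponding change would presumably be to hand the topic to whatever object is serialized together with the sink (for example through a constructor), rather than reading Config.ALARMS_TOPIC on the task manager.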