Hi Timo,
Thank you for the explanation; I am starting to see why I was getting that
exception.
Are you saying that I cannot use static variables at all when deploying to a
cluster? I would like the variables to remain static rather than
instance-bound, since they are accessed from multiple classes.
Based on my understanding of your explanation, I implemented the
following pattern:

public static void main(String[] args) throws IOException {
    Properties prop = new Properties();

    InputStream is =
        Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
    prop.load(is);

    // Populate the map and hand it to Config, which copies the values
    // into its static fields (this runs only in the client JVM).
    HashMap<String, String> strMap = new HashMap<>();
    strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
    strMap.put("CONFIG_KAFKA", prop.getProperty("CONFIG_KAFKA"));

    new Config(strMap);

    ...

}

public class Config {

    public static String CONFIG_TOPIC;
    public static String CONFIG_KAFKA;

    public Config(HashMap<String, String> s) {
        CONFIG_TOPIC = s.get("CONFIG_TOPIC");
        CONFIG_KAFKA = s.get("CONFIG_KAFKA");
    }

}

This produces the same issue. Regarding the easier solution you mentioned:
are you suggesting that I use multiple instances, or a singleton pattern of
some sort?
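
For reference, here is a minimal sketch of how I understood the
constructor-parameter approach (the class and field names below are
placeholders, not my actual code):

import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction;

// The config becomes an ordinary serializable value object instead of
// static state.
public class PipelineConfig implements Serializable {
    public final String configTopic;

    public PipelineConfig(String configTopic) {
        this.configTopic = configTopic;
    }
}

// The function keeps the config in an instance field, so Flink
// serializes it together with the function and ships both to the
// task manager JVMs.
public class TopicTagger implements MapFunction<String, String> {
    private final PipelineConfig config;

    public TopicTagger(PipelineConfig config) {
        this.config = config;
    }

    @Override
    public String map(String value) {
        return config.configTopic + ": " + value;
    }
}

Is something like this what you have in mind, or did you mean a single
shared instance?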

On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twal...@apache.org> wrote:

> Hi Manas,
>
> you need to make sure to differentiate between what Flink calls the
> "pre-flight phase" and the "cluster phase".
>
> The pre-flight phase is where the pipeline is constructed and all
> functions are instantiated. The functions are later serialized and sent
> to the cluster.
>
> If you read your properties file in the `main()` method and store
> something in static variables, the content is available locally where
> the pipeline is constructed (e.g. in the client), but not when the
> function instances are sent to the cluster: static variables start out
> fresh (thus empty) in the cluster JVMs. You need to either make sure
> that the properties file is read again on each task manager, or,
> easier: pass the parameters as constructor parameters into the
> instances so that they are shipped together with the functions
> themselves.
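>
> As a minimal, illustrative sketch (the class name and the serialization
> logic below are made up, not taken from your code), the topic can
> travel with the serialization schema instance instead of living in a
> static field:
>
> import java.nio.charset.StandardCharsets;
> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class AlarmSchema implements KafkaSerializationSchema<IDAP2Alarm> {
>
>     // Assigned during the pre-flight phase, then serialized and shipped
>     // to every task manager together with this schema instance.
>     private final String topic;
>
>     public AlarmSchema(String topic) {
>         this.topic = topic;
>     }
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(IDAP2Alarm element,
>                                                     Long timestamp) {
>         // The topic is read from the instance field, so it is also
>         // available in the cluster JVMs.
>         return new ProducerRecord<>(topic,
>                 element.toString().getBytes(StandardCharsets.UTF_8));
>     }
> }
>
> Such a schema is then passed to the producer, e.g. new
> FlinkKafkaProducer<>(topic, new AlarmSchema(topic), properties,
> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE). The other option, reading
> the file again on the cluster, would go into the open() method of a
> rich function, which runs on the task managers.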
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 22.10.20 09:24, Manas Kale wrote:
> > Hi,
> > I am trying to write some data to a Kafka topic and I have the following
> > situation:
> >
> > monitorStateStream
> >     .process(new IDAP2AlarmEmitter())
> >         .name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> >     ... // Stream that outputs elements of type IDAP2Alarm
> >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC))
> >         .name(ALARM_SINK).uid(ALARM_SINK);
> >
> > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T>
> > getFlinkKafkaProducer(String servers, String topic) {
> >     Properties properties = new Properties();
> >     properties.setProperty("bootstrap.servers", servers);
> >     return new FlinkKafkaProducer<T>(topic,
> >         (element, timestamp) -> element.serializeForKafka(),
> >         properties,
> >         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> > }
> >
> > /*
> > This interface is used to indicate that a class may be output to Kafka.
> > Since Kafka treats all
> > data as bytes, classes that implement this interface have to provide an
> > implementation for the
> > serializeForKafka() method.
> > */
> > public interface IDAP2JSONOutput {
> >
> >     // Implement serialization logic in this method.
> >     ProducerRecord<byte[], byte[]> serializeForKafka();
> >
> > }
> >
> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> >
> >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> >
> >     @Override
> >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
> >         byte[] rawValue;
> >         byte[] rawKey;
> >         String k = getMonitorFeatureKey().getMonitorName();
> >         ...
> >
> >         rawValue = val.getBytes();
> >
> >         LOGGER.info("value of alarms topic from idap2 alarm : " +
> >             Config.ALARMS_TOPIC);
> >
> >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
> >     }
> >
> > }
> >
> >
> > Config.ALARMS_TOPIC is a static string that is read from a properties
> > file. When I run this code on the minicluster in my IDE, it works with
> > no problems. But when I submit it as a jar to the cluster, I get the
> > following error:
> >
> > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
> >     at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]
> >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]
> >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
> >
> > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
> > LOGGER statement in IDAP2Alarm above is never printed when running on
> > the Flink cluster. To verify that the correct value of
> > Config.ALARMS_TOPIC is read from the configuration file, I printed it
> > from the Config class - and it prints correctly. So my questions are:
> >
> >   * Why does this work on a minicluster but not when submitted as a jar
> >     to a normal cluster? I am using Flink v1.11.0 in both my POM file
> >     and the cluster runtime.
> >   * Why does the LOGGER line not get printed even though execution
> >     definitely reached it (as seen from the stack trace)?
> >
> > Thank you,
> > Manas Kale
>
>
