Re: Securing metrics reporters (Flink <-> Prometheus)

2018-03-02 Thread Chesnay Schepler

Hello,

this is currently not supported.

Regards,
Chesnay

On 02.03.2018 18:38, Fritz Budiyanto wrote:

Hi All,

How can I configure encryption for the metrics reporters, particularly
Prometheus? I do not see any mention of encrypting the metrics reporter
traffic in the Flink documentation. Is encryption supported?

If yes, could you shed some light on how to do this?

metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter


Thanks,
Fritz





Re: Watermark not working as expected in TimeCharacteristic.EventTime setting.

2018-03-02 Thread Hequn Cheng
Hi sundy,

1. Some partitions of your Kafka input may not have data. Since a window's
watermark is the minimum of all its inputs' watermarks, the window will never
be triggered if one of its inputs has no data. You can set the parallelism
of your job to 1 to avoid this problem, as sketched below (PS: maybe this bug
is fixed by now, but it is worth a try).
2. There is only one record in the input. In this case the window cannot be
triggered either; you can think of it as if time has stopped. To trigger the
window, you should read more data with watermarks bigger than the window end.
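
For point 1, a minimal sketch of the parallelism workaround, reusing the
environment from your original mail (it avoids the idle-partition problem
rather than fixing it):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// With parallelism 1 a single source subtask reads all Kafka partitions, so the
// watermark is not held back by a parallel subtask that never sees any data.
env.setParallelism(1);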

Hope it helps you.
Best, Hequn

2018-03-03 13:06 GMT+08:00 sundy <543950...@qq.com>:

> Hi, thanks for your reply.
>
> I have searched it in stackoverflow, and there is someone who has the some
> problem.
>
> https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-
> correctly-with-processing-time-but-will-not-produc
>
>
> From your advice, I tried the code.
>
>  env.getConfig().setAutoWatermarkInterval(3 * 1000);
>
> And it calls the getCurrentWaterMark function each 3 seconds,  but still
> no result come out.
> From the outputs   ('water mark1520049229163'), I could see that the add
> method is called. But the no result from the sink function.
>
>
>
>
> On 3 Mar 2018, at 12:47, Xingcan Cui  wrote:
>
> Hi,
>
> for periodically generated watermarks, you should use 
> `ExecutionConfig.setAutoWatermarkInterval()`
> to set an interval.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
>
>
>
> Hi, I got a problem in Flink  and need your help.
>
> I tried to use TimeCharacteristic.EvenTime, but the sink function never be
> executed.
>
> public class StreamingJob {
>   public static void main(String[] args) throws Exception {
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> ObjectMapper jsonMapper = new ObjectMapper();
>
> Properties properties = new Properties();
> //String brokers = "172.27.138.8:9092";
> String brokers = "localhost:9092";
> properties.setProperty("bootstrap.servers", brokers);
> properties.setProperty("group.id", "test_fink");
> String topic = "stream_test";
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> FlinkKafkaConsumer010<BitRate> myConsumer =
> new FlinkKafkaConsumer010<>(topic, new
> BitRate.BitRateDeserializtionSchema(), properties);
>
> DataStream<BitRate> stream = env.addSource(myConsumer)
> .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
> DataStream<BitRate> reduceItems =
> stream
> .keyBy(a -> a.gameId)
> .timeWindow(Time.seconds(10))
> .reduce((a, b) -> a.add(b));
>
> reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
> (tuple) -> {
>   try {
> tuple.end();
> System.out.println(tuple.rate + "\t" + tuple.user);
> return jsonMapper.writeValueAsBytes(tuple);
>   } catch (JsonProcessingException e) {
> e.printStackTrace();
> return "".getBytes();
>   }
> }));
>
> env.execute("Flink Streaming Java API Skeleton");
>   }
>
> }
>
>
>
> Here is the CustomWatermarkEmitter. I tried to increase the lag number,
>  but not worked.
>
> public class CustomWatermarkEmitter implements 
> AssignerWithPeriodicWatermarks<BitRate> {
>
>   private long currentMax = 0;
>   private long lag = 3600 * 1000; //not worked ,even though the lag is very 
> big
>
>   @Nullable
>   @Override
>   public Watermark getCurrentWatermark() {
> long atLeastTime = currentMax - lag;
> System.out.println("water mark" + atLeastTime);
> return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>   }
>
>   @Override
>   public long extractTimestamp(BitRate bitRate, long l) {
> currentMax = Long.max(bitRate.eventTime, currentMax);
> return bitRate.eventTime;
>   }
> }
>
>
>
> Here is the entity BitRate, the logs are generated in time , sample log
> `4281_783_1520047769115`
>
>
> public BitRate(long eventTime, long gameId, long rate, long user) {
>   this.eventTime = eventTime;
>
>   this.gameId = gameId;
>   this.rate = rate;
>   this.user = user;
>   this.startTs = System.currentTimeMillis();
>   this.type = 0;
> }
>
> public void end() {
>   this.endTs = System.currentTimeMillis();
> }
>
> public BitRate add(BitRate b) {
>   System.out.println("Add:" + b.rate);
>   this.rate += b.rate;
>   this.user += b.user;
>   return this;
> }
>
>
>
>
>


Re: Watermark not working as expected in TimeCharacteristic.EventTime setting.

2018-03-02 Thread sundy
Hi, thanks for your reply. 

I have searched on Stack Overflow, and there is someone who has the same
problem.

https://stackoverflow.com/questions/40993753/flink-streaming-program-runs-correctly-with-processing-time-but-will-not-produc
 



Following your advice, I tried the code below.

 env.getConfig().setAutoWatermarkInterval(3 * 1000);

And it calls the getCurrentWatermark function every 3 seconds, but still no
result comes out.
From the output ('water mark1520049229163') I can see that the add method
is called, but there is no result from the sink function.




> On 3 Mar 2018, at 12:47, Xingcan Cui  wrote:
> 
> Hi,
> 
> for periodically generated watermarks, you should use 
> `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
>> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com 
>> > wrote:
>> 
>> 
>> 
>> Hi, I got a problem in Flink  and need your help.
>> 
>> I tried to use TimeCharacteristic.EvenTime, but the sink function never be 
>> executed.  
>> 
>> public class StreamingJob {
>>   public static void main(String[] args) throws Exception {
>> // set up the streaming execution environment
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> ObjectMapper jsonMapper = new ObjectMapper();
>> 
>> Properties properties = new Properties();
>> //String brokers = "172.27.138.8:9092";
>> String brokers = "localhost:9092";
>> properties.setProperty("bootstrap.servers", brokers);
>> properties.setProperty("group.id ", "test_fink");
>> String topic = "stream_test";
>> 
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> FlinkKafkaConsumer010<BitRate> myConsumer =
>> new FlinkKafkaConsumer010<>(topic, new
>> BitRate.BitRateDeserializtionSchema(), properties);
>>
>> DataStream<BitRate> stream = env.addSource(myConsumer)
>> .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>> DataStream<BitRate> reduceItems =
>> stream
>> .keyBy(a -> a.gameId)
>> .timeWindow(Time.seconds(10))
>> .reduce((a, b) -> a.add(b));
>> 
>> reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
>> (tuple) -> {
>>   try {
>> tuple.end();
>> System.out.println(tuple.rate + "\t" + tuple.user);
>> return jsonMapper.writeValueAsBytes(tuple);
>>   } catch (JsonProcessingException e) {
>> e.printStackTrace();
>> return "".getBytes();
>>   }
>> }));
>> 
>> env.execute("Flink Streaming Java API Skeleton");
>>   }
>> 
>> }
>> 
>> 
>> Here is the CustomWatermarkEmitter. I tried to increase the lag number,  but 
>> not worked.
>> 
>> public class CustomWatermarkEmitter implements 
>> AssignerWithPeriodicWatermarks<BitRate> {
>> 
>>   private long currentMax = 0;
>>   private long lag = 3600 * 1000; //not worked ,even though the lag is very 
>> big
>> 
>>   @Nullable
>>   @Override
>>   public Watermark getCurrentWatermark() {
>> long atLeastTime = currentMax - lag;
>> System.out.println("water mark" + atLeastTime);
>> return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>>   }
>> 
>>   @Override
>>   public long extractTimestamp(BitRate bitRate, long l) {
>> currentMax = Long.max(bitRate.eventTime, currentMax);
>> return bitRate.eventTime;
>>   }
>> }
>> 
>> 
>> Here is the entity BitRate, the logs are generated in time , sample log   
>> `4281_783_1520047769115`
>> 
>> 
>> public BitRate(long eventTime, long gameId, long rate, long user) {
>>   this.eventTime = eventTime;
>> 
>>   this.gameId = gameId;
>>   this.rate = rate;
>>   this.user = user;
>>   this.startTs = System.currentTimeMillis();
>>   this.type = 0;
>> }
>> 
>> public void end() {
>>   this.endTs = System.currentTimeMillis();
>> }
>> 
>> public BitRate add(BitRate b) {
>>   System.out.println("Add:" + b.rate);
>>   this.rate += b.rate;
>>   this.user += b.user;
>>   return this;
>> }
>> 
> 



Re: Watermark not working as expected in TimeCharacteristic.EventTime setting.

2018-03-02 Thread Xingcan Cui
Hi,

for periodically generated watermarks, you should use 
`ExecutionConfig.setAutoWatermarkInterval()` to set an interval.
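
For example, a minimal fragment (reusing the `env` from the original mail; the
one-second interval is only an illustration):

// Call getCurrentWatermark() on the periodic assigner once per second.
env.getConfig().setAutoWatermarkInterval(1000);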

Hope that helps.

Best,
Xingcan

> On 3 Mar 2018, at 12:08 PM, sundy <543950...@qq.com> wrote:
> 
> 
> 
> Hi, I got a problem in Flink  and need your help.
> 
> I tried to use TimeCharacteristic.EvenTime, but the sink function never be 
> executed.  
> 
> public class StreamingJob {
>   public static void main(String[] args) throws Exception {
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> ObjectMapper jsonMapper = new ObjectMapper();
> 
> Properties properties = new Properties();
> //String brokers = "172.27.138.8:9092";
> String brokers = "localhost:9092";
> properties.setProperty("bootstrap.servers", brokers);
> properties.setProperty("group.id", "test_fink");
> String topic = "stream_test";
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> FlinkKafkaConsumer010<BitRate> myConsumer =
> new FlinkKafkaConsumer010<>(topic, new
> BitRate.BitRateDeserializtionSchema(), properties);
>
> DataStream<BitRate> stream = env.addSource(myConsumer)
> .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
> DataStream<BitRate> reduceItems =
> stream
> .keyBy(a -> a.gameId)
> .timeWindow(Time.seconds(10))
> .reduce((a, b) -> a.add(b));
> 
> reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
> (tuple) -> {
>   try {
> tuple.end();
> System.out.println(tuple.rate + "\t" + tuple.user);
> return jsonMapper.writeValueAsBytes(tuple);
>   } catch (JsonProcessingException e) {
> e.printStackTrace();
> return "".getBytes();
>   }
> }));
> 
> env.execute("Flink Streaming Java API Skeleton");
>   }
> 
> }
> 
> 
> Here is the CustomWatermarkEmitter. I tried to increase the lag number,  but 
> not worked.
> 
> public class CustomWatermarkEmitter implements 
> AssignerWithPeriodicWatermarks<BitRate> {
> 
>   private long currentMax = 0;
>   private long lag = 3600 * 1000; //not worked ,even though the lag is very 
> big
> 
>   @Nullable
>   @Override
>   public Watermark getCurrentWatermark() {
> long atLeastTime = currentMax - lag;
> System.out.println("water mark" + atLeastTime);
> return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
>   }
> 
>   @Override
>   public long extractTimestamp(BitRate bitRate, long l) {
> currentMax = Long.max(bitRate.eventTime, currentMax);
> return bitRate.eventTime;
>   }
> }
> 
> 
> Here is the entity BitRate, the logs are generated in time , sample log   
> `4281_783_1520047769115`
> 
> 
> public BitRate(long eventTime, long gameId, long rate, long user) {
>   this.eventTime = eventTime;
> 
>   this.gameId = gameId;
>   this.rate = rate;
>   this.user = user;
>   this.startTs = System.currentTimeMillis();
>   this.type = 0;
> }
> 
> public void end() {
>   this.endTs = System.currentTimeMillis();
> }
> 
> public BitRate add(BitRate b) {
>   System.out.println("Add:" + b.rate);
>   this.rate += b.rate;
>   this.user += b.user;
>   return this;
> }
> 



Watermark not working as expected in TimeCharacteristic.EventTime setting.

2018-03-02 Thread sundy


Hi, I have a problem in Flink and need your help.

I tried to use TimeCharacteristic.EventTime, but the sink function is never
executed.

public class StreamingJob {
  public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
ObjectMapper jsonMapper = new ObjectMapper();

Properties properties = new Properties();
//String brokers = "172.27.138.8:9092";
String brokers = "localhost:9092";
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("group.id", "test_fink");
String topic = "stream_test";

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010<BitRate> myConsumer =
new FlinkKafkaConsumer010<>(topic, new
BitRate.BitRateDeserializtionSchema(), properties);

DataStream<BitRate> stream = env.addSource(myConsumer)
.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<BitRate> reduceItems =
stream
.keyBy(a -> a.gameId)
.timeWindow(Time.seconds(10))
.reduce((a, b) -> a.add(b));

reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", 
(tuple) -> {
  try {
tuple.end();
System.out.println(tuple.rate + "\t" + tuple.user);
return jsonMapper.writeValueAsBytes(tuple);
  } catch (JsonProcessingException e) {
e.printStackTrace();
return "".getBytes();
  }
}));

env.execute("Flink Streaming Java API Skeleton");
  }

}


Here is the CustomWatermarkEmitter. I tried to increase the lag number, but
it did not work.

public class CustomWatermarkEmitter implements 
AssignerWithPeriodicWatermarks<BitRate> {

  private long currentMax = 0;
  private long lag = 3600 * 1000; // does not help, even though the lag is very big

  @Nullable
  @Override
  public Watermark getCurrentWatermark() {
long atLeastTime = currentMax - lag;
System.out.println("water mark" + atLeastTime);
return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
  }

  @Override
  public long extractTimestamp(BitRate bitRate, long l) {
currentMax = Long.max(bitRate.eventTime, currentMax);
return bitRate.eventTime;
  }
}


Here is the entity BitRate. The logs are generated in real time; a sample log
line is `4281_783_1520047769115`.


public BitRate(long eventTime, long gameId, long rate, long user) {
  this.eventTime = eventTime;

  this.gameId = gameId;
  this.rate = rate;
  this.user = user;
  this.startTs = System.currentTimeMillis();
  this.type = 0;
}

public void end() {
  this.endTs = System.currentTimeMillis();
}

public BitRate add(BitRate b) {
  System.out.println("Add:" + b.rate);
  this.rate += b.rate;
  this.user += b.user;
  return this;
}



Re: Which test cluster to use for checkpointing tests?

2018-03-02 Thread Ken Krugler
Hi Stephan,

Thanks for the update.

So is support for “running checkpoints with closed sources” part of FLIP-15,
or something separate?

Regards,

— Ken

> On Mar 1, 2018, at 9:07 AM, Stephan Ewen  wrote:
> 
> @Ken The issue you are running into is that Checkpointing works currently 
> only until the job reaches the point where the pipeline starts to drain out, 
> meaning when the sources are done. In your case, the source is done 
> immediately, sending out only one tuple.
> 
> Running checkpoints with closed sources is something that's on the feature 
> list and will come soon…
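
Until that lands, a common way to exercise checkpointing in such tests (a
sketch, not from this thread) is a source that emits its records and then
idles instead of returning, so the pipeline never drains and checkpoints keep
firing:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical test source: emits one tuple, then stays alive until cancelled.
public class KeepAliveSource implements SourceFunction<String> {

  private volatile boolean running = true;

  @Override
  public void run(SourceContext<String> ctx) throws Exception {
    synchronized (ctx.getCheckpointLock()) {
      ctx.collect("single-tuple");
    }
    while (running) {
      Thread.sleep(100);   // idle instead of finishing, so checkpoints can still be taken
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}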


http://about.me/kkrugler
+1 530-210-6378



Serialization and Deserialization of Avro messages stored in Kafka

2018-03-02 Thread Filipe Couto
Hello,

I have a few topics that I want to read from Kafka, which consist mainly of
a key-value pair: a timestamp (key) and a value (byte array).

The byte array doesn't really have a class to deserialize into, since the
Avro record we have comes from a "SELECT * FROM ..." that selects several
SQL tables, and each topic represents one of those tables.

We're using a GenericRecord, and since we know the structure of the table
from the name of the topic, we know the column names and read them like
this: genericRecord.get("COLUMN_NAME").toString()

Given this, we're now trying to read a Kafka topic using Flink, and we have
this:

The environment is the StreamExecutionEnvironment, and the properties hold
the Kafka serialization and deserialization settings and the Kafka and
ZooKeeper addresses.

class...

DataStream<GenericRecord> messageStream = environment
.addSource(new FlinkKafkaConsumer010<>(baseTopic, new
MyDeserializationSchema<GenericRecord>(schema), properties));

messageStream.print();

try {
environment.execute();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return false;
}
}

class MyDeserializationSchema<T> implements DeserializationSchema<T> {
private static final Logger log =
LoggerFactory.getLogger(MyDeserializationSchema.class);

private final Class<T> avrotype = (Class<T>)
org.apache.avro.generic.GenericRecord.class;
private final Schema schema;
public MyDeserializationSchema(Schema schema) {
this.schema = schema;
}

@Override
public T deserialize(byte[] arg0) throws IOException {
log.info("Starting deserialization");
GenericRecord genericRecord;
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs
.toBinary(schema);
log.info(recordInjection.toString());
genericRecord = recordInjection.invert(arg0).get();
log.info(genericRecord.toString());
return (T) genericRecord;
}

@Override
public boolean isEndOfStream(T nextElement) {
return false;
}

@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avrotype);
}

}

Executing this on our server generates the following:

[2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer
(key.deserializer)
(org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)

Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The implementation of
the FlinkKafkaConsumer09 is not serializable. The object probably contains
or references non serializable fields.
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
at
com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
at
com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
Caused by: java.io.NotSerializableException:
org.apache.avro.Schema$RecordSchema
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 6 more


I can't understand why the logs refer to a FlinkKafkaConsumer09 when we're
using the FlinkKafkaConsumer010 version.
And also, how can we deserialize to a GenericRecord so that we can access the
record fields the way we do when we read a Kafka topic without Flink?
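
For what it's worth, the "Caused by" above points at the Schema field of the
deserialization schema being dragged into Java serialization. A sketch of a
serializable variant (not from this thread; package names may differ slightly
between Flink versions) keeps the schema as a JSON string and rebuilds the
Schema and Injection lazily on the workers:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class GenericRecordDeserializationSchema implements DeserializationSchema<GenericRecord> {

  private final String schemaJson;                            // a String is serializable
  private transient Schema schema;                            // rebuilt on the task manager
  private transient Injection<GenericRecord, byte[]> recordInjection;

  public GenericRecordDeserializationSchema(Schema schema) {
    this.schemaJson = schema.toString();
  }

  @Override
  public GenericRecord deserialize(byte[] message) throws IOException {
    if (recordInjection == null) {
      schema = new Schema.Parser().parse(schemaJson);
      recordInjection = GenericAvroCodecs.toBinary(schema);
    }
    return recordInjection.invert(message).get();
  }

  @Override
  public boolean isEndOfStream(GenericRecord nextElement) {
    return false;
  }

  @Override
  public TypeInformation<GenericRecord> getProducedType() {
    return TypeExtractor.getForClass(GenericRecord.class);
  }
}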


Thanks in advance for any help that is given to us.


Securing metrics reporters (Flink <-> Prometheus)

2018-03-02 Thread Fritz Budiyanto
Hi All,

How can I configure encryption for the metrics reporters, particularly Prometheus?
I do not see any mention of encrypting the metrics reporter traffic in the Flink
documentation. Is encryption supported?
If yes, could you shed some light on how to do this?

metrics.reporters: prom
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter


Thanks,
Fritz

Restart hook and checkpoint

2018-03-02 Thread ashish pok
All,
It looks like Flink's default behavior is to restart all operators on a single
operator error - in my case it is a Kafka Producer timing out. When this
happens, I can see in the logs that all operators are restarted. This essentially
leads to data loss. In my case the volume of data is so high that it is becoming
very expensive to checkpoint. I was wondering if Flink has a lifecycle hook to
trigger a forced checkpoint before restarting the operators. That would solve a
dire production issue for us.
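
For reference, a minimal sketch of how the checkpoint interval and restart
strategy are configured today (standard 1.x APIs; the values are only
illustrative, and this does not provide the pre-restart hook asked about above):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint every 10 minutes; after a failure the job restarts from the
// latest completed checkpoint, replaying a replayable source (e.g. Kafka)
// from the offsets stored in that checkpoint.
env.enableCheckpointing(10 * 60 * 1000);

// Restart the whole job up to 3 times, waiting 10 seconds between attempts.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));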
Thanks,

-- Ashish

Re: Fwd: Flink throwing java.lang.ClassNotFoundException after upgrading to 1.4.1

2018-03-02 Thread Tzu-Li (Gordon) Tai
Hi Ankit,

This is a known issue in 1.4.1. Please see 
https://issues.apache.org/jira/browse/FLINK-8741.

The release for 1.4.2 will include a fix for this issue, and we already have a
release candidate being voted on at the moment.
Hopefully, it will be released soon, probably early next week.

Cheers,
Gordon
On 2 March 2018 at 4:01:22 PM, Ankit Chaudhary (my00...@gmail.com) wrote:

Hey Guys,

I just want to throw a question regarding the latest flink release 1.4.1. I 
have a flink topology which was first written using flink version 1.2.0. Since 
then, we are continuously try to keep our libraries upto date. So, we try to 
upgrade this same flink topology from version 1.4.0  to 1.4.1. 

To give a broader view about the toplogy, it is reading the events from Kafka 
and  after some calculations writing the output back into elasticsearch sink. 
We are using Kafka 1.0.0 and Elasticsearch 5.4.2. 

After changing the library version from 1.4.0 to 1.4.1, I found some 
compilation errors because of shaded elasticsearch dependencies (I was using 
JodaDate from the transitive dependency). After fixing the import problem, I 
build a new fatJar and deployed it on Flink cluster running with 1.4.1. When I 
deploy the newly built fatJar, I get following exception:

2018-03-01 17:24:45,873 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job 
roadrunner_matching (2c44f471297f94963df1c45785797d69) switched from state 
FAILING to FAILED.
java.lang.ClassNotFoundException: 
com.dilax.roadrunner.pcudata.matcher.topology.PCUDataMatcherTopology$2
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
        at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
        at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
        at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
        at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
        at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
        at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)

I checked the fat jar and I can see that the class exists there: 

com/dilax/roadrunner/pcudata/matcher/topology/
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology$1.class
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology$2.class
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology.class


Did someone else also reported similar issue? or am I missing something? Also, 
if I deploy my old jar with 1.4.0 on the same infrastructure it works. I 
literally used git diff to compare between the two commit and I can only see 
changes in the lib version and change in the imports for JodaDate. 

I will investigate it further today and will post if I find the solution. 
Meanwhile, if someone here also encountered similar issue post upgrade please 
help.   


Cheers, Ankit




Fwd: Flink throwing java.lang.ClassNotFoundException after upgrading to 1.4.1

2018-03-02 Thread Ankit Chaudhary
Hey Guys,

I just want to ask a question regarding the latest Flink release, 1.4.1. I
have a Flink topology that was first written using Flink version 1.2.0.
Since then, we have continuously tried to keep our libraries up to date, so we
tried to upgrade this same Flink topology from version 1.4.0 to 1.4.1.

To give a broader view of the topology: it reads events from
Kafka and, after some calculations, writes the output to an
Elasticsearch sink. We are using Kafka 1.0.0 and Elasticsearch 5.4.2.

After changing the library version from 1.4.0 to 1.4.1, I found some
compilation errors because of shaded Elasticsearch dependencies (I was
using JodaDate from the transitive dependency). After fixing the import
problem, I built a new fat JAR and deployed it on a Flink cluster running
1.4.1. When I deploy the newly built fat JAR, I get the following exception:

2018-03-01 17:24:45,873 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job roadrunner_matching (2c44f471297f94963df1c45785797d69) switched from state FAILING to FAILED.
java.lang.ClassNotFoundException: com.dilax.roadrunner.pcudata.matcher.topology.PCUDataMatcherTopology$2
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
        at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)

I checked the fat jar and I can see that the class exists there:

com/dilax/roadrunner/pcudata/matcher/topology/
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology$1.class
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology$2.class
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology.class


Has someone else also reported a similar issue, or am I missing something? Also,
if I deploy my old jar built against 1.4.0 on the same infrastructure, it works. I
literally used git diff to compare the two commits, and I can only
see changes in the library version and in the imports for JodaDate.

I will investigate further today and will post if I find a solution.
Meanwhile, if someone here has also encountered a similar issue after upgrading,
please help.


Cheers, Ankit