Hey Jerry,

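Jay is on the right track: this issue has to do with the Flink operator
life cycle. When you hit execute, all your user-defined classes get
serialized so that they can be shipped to the workers on the cluster.
Jedis itself is not serializable (that is the NotSerializableException at
the bottom of your trace), so it cannot be created in the constructor and
held in a regular field. To execute some code on the worker before your
FlatMapFunction starts processing the data, you can use the open() method
of RichFlatMapFunction. That lets you make the Jedis attribute transient
and keep only the server name as a serialized field: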

public static class RedisJoinBolt extends
    RichFlatMapFunction<Tuple5<String,String,String,String,String>,
                        Tuple6<String,String,String,String,String,String>> {

  // Not serializable, so keep it out of the serialized function object.
  private transient Jedis jedis;

  // Redis server name; a plain String is shipped to the workers.
  private String jedisServer;

  private HashMap<String, String> ad_to_campaign;

  public RedisJoinBolt(String jedisServer) {
    // only remember the server here, do not connect yet
    this.jedisServer = jedisServer;
  }

  @Override
  public void open(Configuration parameters) {
    // runs on the worker after deserialization: initialize jedis
    this.jedis = new Jedis(jedisServer);
  }

  @Override
  public void flatMap(Tuple5<String,String,String,String,String> input,
      Collector<Tuple6<String,String,String,String,String,String>> out)
      throws Exception {
    // ... your existing flatMap logic, unchanged ...
  }
}
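
In case it helps to see why the transient field plus open() works, here is
a minimal sketch that uses only the JDK, so it runs without Flink or Jedis
on the classpath. FakeClient, EagerFunction and LazyFunction are made-up
names for illustration: FakeClient stands in for Jedis, and the explicit
open() call stands in for what Flink does on the worker. It only imitates
the serialize, ship, open sequence and is not how Flink is implemented
internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientLifecycleDemo {

    /** Hypothetical stand-in for a client such as Jedis: NOT Serializable. */
    static class FakeClient {
        final String host;
        FakeClient(String host) { this.host = host; }
        String get(String key) { return host + ":" + key; }
    }

    /** Mirrors the failing version: the client is created in the constructor. */
    static class EagerFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeClient client;
        EagerFunction(String host) { this.client = new FakeClient(host); }
    }

    /** Mirrors the fixed version: only the host is serialized. */
    static class LazyFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String host;
        private transient FakeClient client; // skipped by Java serialization

        LazyFunction(String host) { this.host = host; }

        /** Plays the role of open(): called after the object was shipped. */
        void open() { this.client = new FakeClient(host); }

        boolean isOpen() { return client != null; }

        String lookup(String key) { return client.get(key); }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The eager version fails the same way as the original RedisJoinBolt.
        try {
            serialize(new EagerFunction("localhost"));
            System.out.println("eager: serialized (unexpected)");
        } catch (NotSerializableException e) {
            System.out.println("eager: not serializable: " + e.getMessage());
        }

        // The lazy version survives the round trip; the transient field
        // comes back as null and is only filled in by open().
        LazyFunction shipped = (LazyFunction)
            deserialize(serialize(new LazyFunction("localhost")));
        System.out.println("client set before open(): " + shipped.isOpen());
        shipped.open();
        System.out.println("lookup after open(): " + shipped.lookup("ad-1"));
    }
}
```

In the real RedisJoinBolt you would probably also want to override close()
to release the Jedis connection when the operator shuts down.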


On Fri, Sep 4, 2015 at 8:11 PM, Jay Vyas <jayunit100.apa...@gmail.com>
wrote:

> Maybe wrapping Jedis with a serializable class will do the trick?
>
> But in general, is there a way to reference classes from a jar in Flink
> apps without serializing them?
>
>
> On Sep 4, 2015, at 1:36 PM, Jerry Peng <jerry.boyang.p...@gmail.com>
> wrote:
>
> Hello,
>
> I am trying to use Jedis (the Redis Java client) with the Flink streaming
> API, but I get an exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>   at org.apache.flink.client.program.Client.run(Client.java:278)
>   at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not serializable
>   at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
>   at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
>   at flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>   ... 6 more
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
>   at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>   ... 16 more
>
>
>
>
>
> This is the code I am using:
>
>
> public static class RedisJoinBolt implements
>     FlatMapFunction<Tuple5<String,String,String,String,String>,
>                     Tuple6<String,String,String,String,String,String>> {
>  private Jedis jedis;
>  private HashMap<String, String> ad_to_campaign;
>
>  public RedisJoinBolt(String jedisServer) {
>   //initialize jedis
>   this.jedis = new Jedis(jedisServer);
>  }
>
>  @Override
>  public void flatMap(Tuple5<String,String,String,String,String> input,
>      Collector<Tuple6<String,String,String,String,String,String>> out)
>      throws Exception {
>
> .
>
> .
>
> .
>
>
> Does anyone know a fix for this?
>
>
