Hey Jerry, Jay is on the right track, this issue has to do with the Flink operator life cycle. When you hit execute all your user defined classes get serialized, so that they can be shipped to the workers on the cluster. To execute some code before your FlatMapFunction starts processing the data you can use the open() function of the RichFlatMapFunction, thus enabling you to make the Jedis attribute transient:
public static class RedisJoinBolt implements RichFlatMapFunction<Tuple5<String, String,String,String,String> , Tuple6<String, String,String,String,String,String>> { private transient Jedis jedis; private Jedis jedisServer; private HashMap<String, String> ad_to_campaign; public RedisJoinBolt(String jedisServer) { //initialize jedis this.jedisServer = jedisServer; } @Override public void open(Configuration parameters) { //initialize jedis this.jedis = new Jedis(jedisServer); } @Override public void flatMap(Tuple5<String,String,String,String,String> input, Collector<Tuple6<String,String,String,String,String,String>> out) throws Exception { On Fri, Sep 4, 2015 at 8:11 PM, Jay Vyas <jayunit100.apa...@gmail.com> wrote: > Maybe wrapping Jedis with a serializable class will do the trick? > > But in general is there a way to reference jar classes in flink apps > without serializable them? > > > On Sep 4, 2015, at 1:36 PM, Jerry Peng <jerry.boyang.p...@gmail.com> > wrote: > > Hello, > > So I am trying to use jedis (redis java client) with Flink streaming api, > but I get an exception: > > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error. > > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452) > > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) > > at org.apache.flink.client.program.Client.run(Client.java:278) > > at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631) > > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319) > > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954) > > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004) > > Caused by: org.apache.flink.api.common.InvalidProgramException: Object > flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not > serializable > > at > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97) > > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59) > > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320) > > at > org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144) > > at > org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624) > > at > flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:483) > > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) > > ... 6 more > > Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis > > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306) > > at > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95) > > ... 16 more > > > > > > so my code I am using: > > > public static class RedisJoinBolt implements FlatMapFunction<Tuple5<String, > String,String,String,String> > , Tuple6<String, String,String,String,String,String>> { > private Jedis jedis; > private HashMap<String, String> ad_to_campaign; > > public RedisJoinBolt(String jedisServer) { > //initialize jedis > this.jedis = new Jedis(jedisServer); > } > > @Override > public void flatMap(Tuple5<String,String,String,String,String> input, > Collector<Tuple6<String,String,String,String,String,String>> out) > throws Exception { > > . > > . > > . > > > Any one know a fix for this? > >