Hello, So I am trying to use jedis (redis java client) with Flink streaming api, but I get an exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) at org.apache.flink.client.program.Client.run(Client.java:278) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004) Caused by: org.apache.flink.api.common.InvalidProgramException: Object flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not serializable at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320) at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144) at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624) at flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) ... 6 more Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306) at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95) ... 16 more so my code I am using: public static class RedisJoinBolt implements FlatMapFunction<Tuple5<String, String,String,String,String> , Tuple6<String, String,String,String,String,String>> { private Jedis jedis; private HashMap<String, String> ad_to_campaign; public RedisJoinBolt(String jedisServer) { //initialize jedis this.jedis = new Jedis(jedisServer); } @Override public void flatMap(Tuple5<String,String,String,String,String> input, Collector<Tuple6<String,String,String,String,String,String>> out) throws Exception { . . . Any one know a fix for this?