Hello,

So I am trying to use jedis (redis java client) with Flink streaming api,
but I get an exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)

at org.apache.flink.client.program.Client.run(Client.java:278)

at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)

at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)

at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)

at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)

Caused by: org.apache.flink.api.common.InvalidProgramException: Object
flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not
serializable

at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)

at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)

at
org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)

at
flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)

... 6 more

Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)

at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)

... 16 more





so my code I am using:


public static class RedisJoinBolt implements
FlatMapFunction<Tuple5<String, String,String,String,String>
    , Tuple6<String, String,String,String,String,String>> {
 private Jedis jedis;
 private HashMap<String, String> ad_to_campaign;

 public RedisJoinBolt(String jedisServer) {
  //initialize jedis
  this.jedis = new Jedis(jedisServer);
 }

 @Override
 public void flatMap(Tuple5<String,String,String,String,String> input,
           Collector<Tuple6<String,String,String,String,String,String>>
out) throws Exception {

.

.

.


Any one know a fix for this?

Reply via email to