Thank you Aljoscha and Fabian for your replies.

@Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm afraid this is a bug", I assume you are referring to the Flink engine itself.
@Fabian: thanks for the optimization tip.

This is how I got it working (with a hack): in my dataset, the join field/key can be null; when it is not, .where(fieldName) works and I don't get the not-serializable exception. So I applied a MapFunction to the DataSet and put a dummy value in the join field/key wherever it was null. Then, in the join function, I change it back to null.

Best,
Tarandeep

On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> the problem is that the KeySelector is an anonymous inner class and as
> such has a reference to the outer RecordFilterer object. Normally, this
> would be rectified by the closure cleaner, but the cleaner is not used in
> CoGroup.where(). I'm afraid this is a bug.
>
> Best,
> Aljoscha
>
> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Tarandeep,
>>
>> the exception suggests that Flink tries to serialize RecordsFilterer as a
>> user function (this happens via Java serialization).
>> I said "suggests" because the code that uses RecordsFilterer is not
>> included.
>>
>> To me it looks like RecordsFilterer should not be used as a user
>> function. It is a helper class to construct a DataSet program, so it
>> should not be shipped for execution.
>> You would use such a class as follows:
>>
>> DataSet<T> records = ...
>> DataSet<String> filterIDs = ...
>>
>> RecordsFilterer rf = new RecordsFilterer();
>> DataSet<Tuple2<Boolean, T>> result =
>>     rf.addFilterFlag(records, filterIDs, "myField");
>>
>> Regarding the join code, I would suggest an optimization.
>> Instead of using CoGroup, I would use distinct and an OuterJoin like this:
>>
>> DataSet<String> distIds = filteredIds.distinct();
>> DataSet<Tuple2<Boolean, T>> result = records
>>     .leftOuterJoin(distIds)
>>     .where(KEYSELECTOR)
>>     .equalTo("*") // use full string as key
>>     .with(JOINFUNC); // set Bool to false if right == null, true otherwise
>>
>> Best, Fabian
>>
>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am getting a NotSerializableException in this class:
>>>
>>> public class RecordsFilterer<T extends GenericRecord> {
>>>
>>>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>>>             DataSet<String> filteredIds, String fieldName) {
>>>         return dataset.coGroup(filteredIds)
>>>                 .where(new KeySelector<T, String>() {
>>>                     @Override
>>>                     public String getKey(T t) throws Exception {
>>>                         String s = (String) t.get(fieldName);
>>>                         return s != null ? s : UUID.randomUUID().toString();
>>>                     }
>>>                 })
>>>                 .equalTo((KeySelector<String, String>) s -> s)
>>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>>>                     @Override
>>>                     public void coGroup(Iterable<T> records,
>>>                             Iterable<String> ids,
>>>                             Collector<Tuple2<Boolean, T>> collector)
>>>                             throws Exception {
>>>                         boolean filterFlag = false;
>>>                         for (String id : ids) {
>>>                             filterFlag = true;
>>>                         }
>>>
>>>                         for (T record : records) {
>>>                             collector.collect(new Tuple2<>(filterFlag, record));
>>>                         }
>>>                     }
>>>                 });
>>>     }
>>> }
>>>
>>> What I am trying to do is write generic code that will join Avro
>>> records (of different types) with String records and, if there is a
>>> match, add a filter flag. This way I can use the same code for
>>> different Avro record types.
>>> But I am getting this exception:
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>>> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
>>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>> java.io.NotSerializableException:
>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>>> Could not write the user code wrapper class
>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>> java.io.NotSerializableException: RecordsFilterer
>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>>>     ... 17 more
>>> Caused by: java.io.NotSerializableException: RecordsFilterer
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>     ... 19 more
>>>
>>> Please help me understand why I get this exception and how to fix it
>>> [rewrite the code, maybe?].
>>>
>>> Thanks,
>>> Tarandeep
>>
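[Editor's note] The mechanism Aljoscha describes can be reproduced with the JDK alone, without Flink. The sketch below (class names `Helper` and `StandaloneSelector` are made up for illustration) shows that an anonymous inner class that touches enclosing-instance state carries a hidden reference to that instance, so Java serialization tries to drag the whole outer object along and fails if that object is not `Serializable`; a static nested class has no such hidden reference. The corresponding fix for the thread's code would be to define the `KeySelector` as a static nested (or top-level) class that takes `fieldName` as a constructor argument, or to make `RecordsFilterer` itself implement `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {

    // Stand-in for RecordsFilterer: a helper class that is NOT Serializable.
    static class Helper {
        final String fieldName = "myField";

        // Anonymous inner class referencing enclosing state: the compiler adds
        // a hidden Helper.this field, so serializing the selector also tries
        // to serialize the (non-serializable) Helper -- like the thread's
        // anonymous KeySelector inside RecordsFilterer.
        Serializable anonymousSelector() {
            return new Serializable() {
                @Override
                public String toString() {
                    return fieldName; // forces capture of Helper.this
                }
            };
        }
    }

    // Static nested class: no hidden enclosing-instance reference,
    // state is passed in explicitly.
    static class StandaloneSelector implements Serializable {
        final String fieldName;
        StandaloneSelector(String fieldName) { this.fieldName = fieldName; }
    }

    // Attempts Java serialization and reports whether it succeeded.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the hidden outer reference is not serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Helper().anonymousSelector())); // prints false
        System.out.println(serializes(new StandaloneSelector("myField"))); // prints true
    }
}
```

Note this demonstrates default JVM behavior for anonymous classes; Flink's closure cleaner normally nulls out such unused outer references in user functions, which is exactly the step Aljoscha says is missing in `CoGroup.where()`.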