Hi,
the problem is that the KeySelector is an anonymous inner class and as such holds a reference to the outer RecordsFilterer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where(). I'm afraid this is a bug.
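To make the mechanism concrete outside of Flink: an anonymous inner class defined in an instance method gets a hidden `this$0` field pointing at its enclosing instance, so Java serialization tries to drag the whole enclosing object along. A minimal, Flink-free sketch (all class names here are invented stand-ins, not the actual Flink or user classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OuterRefDemo {

    // Stand-in for Flink's KeySelector: a Serializable single-method interface.
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value);
    }

    // Deliberately NOT Serializable, like the RecordsFilterer helper class.
    static class Helper {
        // Anonymous inner class: javac adds a hidden this$0 field that
        // references the enclosing Helper instance.
        KeySelector<String, String> anonymousSelector() {
            return new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value;
                }
            };
        }

        // Static nested class: no reference to Helper, serializes cleanly.
        static class IdSelector implements KeySelector<String, String> {
            @Override
            public String getKey(String value) {
                return value;
            }
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the hidden outer reference made serialization fail
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Helper().anonymousSelector())); // false
        System.out.println(serializes(new Helper.IdSelector()));          // true
    }
}
```

Declaring the selector as a `static` nested (or top-level) class removes the hidden outer reference, which is one way to work around the missing closure cleaning.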
Best,
Aljoscha

On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Tarandeep,
>
> the exception suggests that Flink tries to serialize RecordsFilterer as a
> user function (this happens via Java serialization).
> I said "suggests" because the code that uses RecordsFilterer is not included.
>
> To me it looks like RecordsFilterer should not be used as a user function.
> It is a helper class to construct a DataSet program, so it should not be
> shipped for execution.
> You would use such a class as follows:
>
> DataSet<T> records = ...
> DataSet<String> filterIDs = ...
>
> RecordsFilterer rf = new RecordsFilterer();
> DataSet<Tuple2<Boolean, T>> result =
>     rf.addFilterFlag(records, filterIDs, "myField");
>
> Regarding the join code, I would suggest an optimization.
> Instead of using CoGroup, I would use distinct and an OuterJoin like this:
>
> DataSet<String> distIds = filteredIds.distinct();
> DataSet<Tuple2<Boolean, T>> result = records
>     .leftOuterJoin(distIds)
>     .where(KEYSELECTOR)
>     .equalTo("*")   // use full string as key
>     .with(JOINFUNC) // set Bool to false if right == null, true otherwise
>
> Best, Fabian
>
> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
>
>> Hi,
>>
>> I am getting a NotSerializableException in this class:
>>
>> public class RecordsFilterer<T extends GenericRecord> {
>>
>>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>>             DataSet<String> filteredIds, String fieldName) {
>>         return dataset.coGroup(filteredIds)
>>                 .where(new KeySelector<T, String>() {
>>                     @Override
>>                     public String getKey(T t) throws Exception {
>>                         String s = (String) t.get(fieldName);
>>                         return s != null ? s : UUID.randomUUID().toString();
>>                     }
>>                 })
>>                 .equalTo((KeySelector<String, String>) s -> s)
>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>>                     @Override
>>                     public void coGroup(Iterable<T> records,
>>                                         Iterable<String> ids,
>>                                         Collector<Tuple2<Boolean, T>> collector)
>>                             throws Exception {
>>                         boolean filterFlag = false;
>>                         for (String id : ids) {
>>                             filterFlag = true;
>>                         }
>>
>>                         for (T record : records) {
>>                             collector.collect(new Tuple2<>(filterFlag, record));
>>                         }
>>                     }
>>                 });
>>     }
>> }
>>
>> What I am trying to do is write generic code that will join Avro records
>> (of different types) with String records and, if there is a match, add a
>> filter flag. This way I can use the same code for different Avro record
>> types. But I am getting this exception:
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException:
>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>> Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException: RecordsFilterer
>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>>     ... 17 more
>> Caused by: java.io.NotSerializableException: RecordsFilterer
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>     ... 19 more
>>
>> Please help me understand why I get this exception and how to fix it
>> (rewrite the code, maybe?).
>>
>> Thanks,
>> Tarandeep
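For reference, the semantics of the suggested distinct-plus-left-outer-join rewrite can be sketched with plain Java collections (no Flink API; the `addFilterFlag` helper and key extractor below are hypothetical stand-ins, not the poster's actual code): every record is emitted exactly once, flagged true iff its key appears among the deduplicated ids.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class FilterFlagSketch {

    // Plain-collections equivalent of distinct() followed by a left outer
    // join: every record survives, flagged true iff its key is in filteredIds.
    static <T> List<Map.Entry<Boolean, T>> addFilterFlag(
            List<T> records, List<String> filteredIds, Function<T, String> keyOf) {
        Set<String> distinctIds = new HashSet<>(filteredIds); // distinct()
        List<Map.Entry<Boolean, T>> out = new ArrayList<>();
        for (T record : records) {
            // left outer join semantics: the record is kept with or without a match
            out.add(new SimpleEntry<>(distinctIds.contains(keyOf.apply(record)), record));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> result = addFilterFlag(
                Arrays.asList("a", "b", "c"), Arrays.asList("b", "b", "c"), s -> s);
        System.out.println(result); // [false=a, true=b, true=c]
    }
}
```

This also shows why the outer join beats the CoGroup here: the match test is pure set membership per record, so duplicate ids never have to be iterated inside the join function.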