You can also make the KeySelector a static inner class. That should work as well.
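For illustration, a static nested KeySelector along the lines suggested above might look like the sketch below. The `KeySelector` interface is stubbed inline so the snippet compiles without Flink on the classpath, and the class and field names are hypothetical, not from the thread:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

public class StaticKeySelectorDemo {

    // Stand-in for org.apache.flink.api.java.functions.KeySelector,
    // included here only so the sketch is self-contained.
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // A static nested class keeps no implicit reference to an enclosing
    // instance, so Java serialization only has to write the selector itself.
    static class FieldKeySelector implements KeySelector<Map<String, Object>, String> {
        private final String fieldName;

        FieldKeySelector(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getKey(Map<String, Object> record) {
            Object value = record.get(fieldName);
            // Same null-handling idea as in the thread: substitute a random
            // key so records with a null field never collide on one group.
            return value != null ? value.toString() : UUID.randomUUID().toString();
        }
    }

    public static void main(String[] args) throws Exception {
        FieldKeySelector selector = new FieldKeySelector("myField");
        // The selector serializes cleanly because it drags in no outer object.
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(selector);
        }
        System.out.println("serializable: true");
    }
}
```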
On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <tarand...@gmail.com> wrote:
> Thank you Aljoscha and Fabian for your replies.
>
> @Aljoscha: when you said "the cleaner is not used in CoGroup.where(). I'm
> afraid this is a bug", I am assuming you are referring to the Flink engine
> itself.
>
> @Fabian: thanks for the optimization tip.
>
> This is how I got it working (with a hack): in my dataset, the join
> field/key can be null; otherwise .where(fieldName) works and I don't get
> the not-serializable exception. So I applied a MapFunction to the DataSet
> and put a dummy value in the join field/key where it was null. Then in the
> join function, I change it back to null.
>
> Best,
> Tarandeep
>
> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>> the problem is that the KeySelector is an anonymous inner class and as
>> such has a reference to the outer RecordsFilterer object. Normally, this
>> would be rectified by the closure cleaner, but the cleaner is not used in
>> CoGroup.where(). I'm afraid this is a bug.
>>
>> Best,
>> Aljoscha
>>
>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:
>>> Hi Tarandeep,
>>>
>>> the exception suggests that Flink tries to serialize RecordsFilterer as
>>> a user function (this happens via Java serialization).
>>> I say "suggests" because the code that uses RecordsFilterer is not
>>> included.
>>>
>>> To me it looks like RecordsFilterer should not be used as a user
>>> function. It is a helper class to construct a DataSet program, so it
>>> should not be shipped for execution.
>>> You would use such a class as follows:
>>>
>>> DataSet<T> records = ...
>>> DataSet<String> filterIDs = ...
>>>
>>> RecordsFilterer rf = new RecordsFilterer();
>>> DataSet<Tuple2<Boolean, T>> result =
>>>     rf.addFilterFlag(records, filterIDs, "myField");
>>>
>>> Regarding the join code, I would suggest an optimization.
>>> Instead of using CoGroup, I would use distinct and an outer join like
>>> this:
>>>
>>> DataSet<String> distIds = filteredIds.distinct();
>>> DataSet<Tuple2<Boolean, T>> result = records
>>>     .leftOuterJoin(distIds)
>>>     .where(KEYSELECTOR)
>>>     .equalTo("*") // use full string as key
>>>     .with(JOINFUNC); // set Bool to false if right == null, true otherwise
>>>
>>> Best, Fabian
>>>
>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am getting a NotSerializableException in this class:
>>>>
>>>> public class RecordsFilterer<T extends GenericRecord> {
>>>>
>>>>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>>>>             DataSet<String> filteredIds, String fieldName) {
>>>>         return dataset.coGroup(filteredIds)
>>>>                 .where(new KeySelector<T, String>() {
>>>>                     @Override
>>>>                     public String getKey(T t) throws Exception {
>>>>                         String s = (String) t.get(fieldName);
>>>>                         return s != null ? s : UUID.randomUUID().toString();
>>>>                     }
>>>>                 })
>>>>                 .equalTo((KeySelector<String, String>) s -> s)
>>>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>>>>                     @Override
>>>>                     public void coGroup(Iterable<T> records,
>>>>                             Iterable<String> ids,
>>>>                             Collector<Tuple2<Boolean, T>> collector) throws Exception {
>>>>                         boolean filterFlag = false;
>>>>                         for (String id : ids) {
>>>>                             filterFlag = true;
>>>>                         }
>>>>
>>>>                         for (T record : records) {
>>>>                             collector.collect(new Tuple2<>(filterFlag, record));
>>>>                         }
>>>>                     }
>>>>                 });
>>>>     }
>>>> }
>>>>
>>>> What I am trying to do is write generic code that will join Avro
>>>> records (of different types) with String records and, if there is a
>>>> match, add a filter flag. This way I can use the same code for different
>>>> Avro record types.
>>>> But I am getting this exception:
>>>>
>>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>>>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer
>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>>>>     ... 17 more
>>>> Caused by: java.io.NotSerializableException: RecordsFilterer
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>>>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>>     ... 19 more
>>>>
>>>> Please help me understand why I get this exception and how to fix it
>>>> (rewrite the code, maybe?).
>>>>
>>>> Thanks,
>>>> Tarandeep
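The root cause discussed in this thread (an anonymous inner class capturing its non-serializable enclosing object) can be reproduced with plain Java serialization, outside Flink. A minimal sketch, with hypothetical names standing in for RecordsFilterer and KeySelector:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    // Minimal serializable function interface standing in for Flink's
    // KeySelector (names are illustrative, not from the Flink API).
    interface SerFn extends Serializable {
        String apply(String s);
    }

    // Plays the role of RecordsFilterer: a helper class that is NOT Serializable.
    static class Helper {
        SerFn anonymous() {
            // An anonymous inner class implicitly captures the enclosing
            // Helper instance; serializing it tries to write Helper too.
            return new SerFn() {
                @Override public String apply(String s) { return s; }
            };
        }

        static SerFn staticStyle() {
            // Defined in a static context: nothing is captured, so Java
            // serialization writes only the function itself.
            return s -> s;
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous serializes: " + serializes(new Helper().anonymous())); // false
        System.out.println("static serializes:    " + serializes(Helper.staticStyle()));     // true
    }
}
```

This mirrors the stack trace above: Java serialization walks from the user function into its `this$0` field and fails on the outer class, which is why making the KeySelector a static nested class (or running the closure cleaner) fixes the job.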