Hi, yes, I was talking about a Flink bug. I forgot to mention the work-around that Stephan mentioned.
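[Editor's note: a minimal plain-Java sketch, independent of Flink, of why the static-inner-class work-around below helps. An anonymous inner class that uses a field of its enclosing object holds a hidden `this$0` reference to that object, so Java serialization also tries to write the outer instance (here, the non-serializable helper playing the role of RecordsFilterer). A static nested class has no such reference. The `Outer`/`SerializableFunction` names are made up for illustration.]

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // A serializable function type, playing the role of Flink's KeySelector.
    interface SerializableFunction extends Serializable {
        String apply(String s);
    }

    // Stands in for RecordsFilterer: a helper class that is NOT Serializable.
    static class Outer {
        private final String prefix = "id-";

        // Anonymous inner class: it reads 'prefix', so it keeps a hidden
        // this$0 reference to the Outer instance; serializing it fails.
        SerializableFunction anonymousSelector() {
            return new SerializableFunction() {
                @Override
                public String apply(String s) {
                    return prefix + s;
                }
            };
        }

        // Static nested class: no reference to Outer, so it serializes fine.
        static class StaticSelector implements SerializableFunction {
            @Override
            public String apply(String s) {
                return s;
            }
        }
    }

    // Returns true if Java serialization can write the object.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (Exception e) {
            return false; // e.g. java.io.NotSerializableException: ...Outer
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Outer().anonymousSelector())); // false
        System.out.println(serializes(new Outer.StaticSelector()));      // true
    }
}
```

The closure cleaner normally nulls out such unused outer references; since it is not applied in CoGroup.where(), avoiding the capture entirely (static nested class) sidesteps the problem.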
On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <se...@apache.org> wrote:

You can also make the KeySelector a static inner class. That should work as well.

On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <tarand...@gmail.com> wrote:

Thank you Aljoscha and Fabian for your replies.

@Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm afraid this is a bug", I am assuming you are referring to the Flink engine itself.

@Fabian: thanks for the optimization tip.

This is how I got it working (with a hack): in my dataset, the join field/key can be null; otherwise .where(fieldName) works and I don't get the not-serializable exception. So I applied a MapFunction to the DataSet and put a dummy value in the join field/key where it was null. Then, in the join function, I change it back to null.

Best,
Tarandeep

On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
the problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer RecordsFilterer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where(). I'm afraid this is a bug.

Best,
Aljoscha

On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:

Hi Tarandeep,

the exception suggests that Flink tries to serialize RecordsFilterer as a user function (this happens via Java serialization). I say "suggests" because the code that uses RecordsFilterer is not included.

To me it looks like RecordsFilterer should not be used as a user function. It is a helper class to construct a DataSet program, so it should not be shipped for execution. You would use such a class as follows:

    DataSet<T> records = ...
    DataSet<String> filterIDs = ...

    RecordsFilterer rf = new RecordsFilterer();
    DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs, "myField");

Regarding the join code, I would suggest an optimization. Instead of using CoGroup, I would use distinct and an OuterJoin like this:

    DataSet<String> distIds = filteredIds.distinct();
    DataSet<Tuple2<Boolean, T>> result = records
        .leftOuterJoin(distIds)
        .where(KEYSELECTOR)
        .equalTo("*")   // use full string as key
        .with(JOINFUNC); // set Bool to false if right == null, true otherwise

Best, Fabian

2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:

Hi,

I am getting NotSerializableException in this class-

    public class RecordsFilterer<T extends GenericRecord> {

        public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
                                                         DataSet<String> filteredIds,
                                                         String fieldName) {
            return dataset.coGroup(filteredIds)
                    .where(new KeySelector<T, String>() {
                        @Override
                        public String getKey(T t) throws Exception {
                            String s = (String) t.get(fieldName);
                            return s != null ? s : UUID.randomUUID().toString();
                        }
                    })
                    .equalTo((KeySelector<String, String>) s -> s)
                    .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
                        @Override
                        public void coGroup(Iterable<T> records,
                                            Iterable<String> ids,
                                            Collector<Tuple2<Boolean, T>> collector) throws Exception {
                            boolean filterFlag = false;
                            for (String id : ids) {
                                filterFlag = true;
                            }

                            for (T record : records) {
                                collector.collect(new Tuple2<>(filterFlag, record));
                            }
                        }
                    });
        }
    }

What I am trying to do is write generic code that will join Avro records (of different types) with String records and, where there is a match, add a filter flag.
This way I can use the same code for different Avro record types. But I am getting this exception-

    Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
        at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
        at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
        at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
        at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
        at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
        at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
        at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
        at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
        at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
        at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
        at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
        at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
    Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer
        at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
        at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
        at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
        ... 17 more
    Caused by: java.io.NotSerializableException: RecordsFilterer
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
        at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
        at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
        ... 19 more

Please help me understand why I get this exception and how to fix it (rewrite the code, maybe?).

Thanks,
Tarandeep
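[Editor's note: for reference, the semantics of Fabian's distinct + leftOuterJoin suggestion, that each record is flagged by whether its key appears among the (deduplicated) filter ids, can be sketched in plain Java without Flink. Class and method names here are made up for illustration.]

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class FilterFlagSketch {

    // Plain-Java model of:
    //   records.leftOuterJoin(filterIds.distinct()).where(key).equalTo("*").with(joinFunc)
    static List<Map.Entry<Boolean, String>> addFilterFlag(List<String> records,
                                                          List<String> filterIds) {
        Set<String> distIds = new HashSet<>(filterIds); // distinct()
        return records.stream()
                // JOINFUNC semantics: right side null -> false, otherwise true
                .<Map.Entry<Boolean, String>>map(r -> new SimpleEntry<>(distIds.contains(r), r))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> out =
                addFilterFlag(Arrays.asList("a", "b", "c"), Arrays.asList("b", "b"));
        System.out.println(out); // [false=a, true=b, false=c]
    }
}
```

Deduplicating the ids first means the outer join emits each left record exactly once, which is why this replaces the CoGroup cleanly.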