Hi,
The problem is that the KeySelector is an anonymous inner class and as such
has a reference to the outer RecordsFilterer object, which is not serializable.
Normally, this would be rectified by the closure cleaner, but the cleaner is
not used in CoGroup.where(). I'm afraid this is a bug.
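
For illustration, here is a minimal plain-Java sketch of the mechanism, with no Flink involved. All names are hypothetical: `KeyExtractor` stands in for Flink's `KeySelector` (which also extends `Serializable`), and `Helper` plays the role of RecordsFilterer. The sketch Java-serializes three selectors created inside a non-serializable helper, roughly the way Flink ships user functions. The anonymous class here reads a field of its enclosing object so that the hidden outer reference is present on any javac version. In RecordsFilterer the reference exists simply because the anonymous class is declared in an instance method, which the Java 8 compiler always translates into a hidden `this$0` field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's KeySelector, which extends Serializable.
    interface KeyExtractor<T, K> extends Serializable {
        K getKey(T value);
    }

    // Hypothetical helper playing the role of RecordsFilterer: NOT Serializable.
    static class Helper {
        private final String prefix;

        Helper(String prefix) {
            this.prefix = prefix;
        }

        // Anonymous inner class: it reads Helper.this.prefix, so it keeps a hidden
        // reference (this$0) to the enclosing Helper, and serializing it fails.
        KeyExtractor<String, String> anonymousSelector() {
            return new KeyExtractor<String, String>() {
                @Override
                public String getKey(String value) {
                    return prefix + value;
                }
            };
        }

        // Static nested class: carries only the String it needs, no outer reference.
        KeyExtractor<String, String> staticSelector() {
            return new PrefixSelector(prefix);
        }

        // Lambda over a local copy of the field: captures the String, not `this`.
        KeyExtractor<String, String> lambdaSelector() {
            String p = prefix;
            return value -> p + value;
        }
    }

    static class PrefixSelector implements KeyExtractor<String, String> {
        private final String prefix;

        PrefixSelector(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String getKey(String value) {
            return prefix + value;
        }
    }

    // Java-serializes an object into memory; returns false on NotSerializableException.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Helper helper = new Helper("id-");
        System.out.println("anonymous: " + isSerializable(helper.anonymousSelector())); // false
        System.out.println("static:    " + isSerializable(helper.staticSelector()));    // true
        System.out.println("lambda:    " + isSerializable(helper.lambdaSelector()));    // true
    }
}
```

Applied to the Flink job, the same idea would mean turning the KeySelector (and, to be safe, the CoGroupFunction) into static nested classes that receive `fieldName` through their constructor, so nothing reachable from them refers to RecordsFilterer. A static nested class cannot use the outer `T`, so it would declare its own type parameter.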

Best,
Aljoscha


On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Tarandeep,
>
> the exception suggests that Flink tries to serialize RecordsFilterer as a
> user function (this happens via Java Serialization).
> I say "suggests" because the code that uses RecordsFilterer is not included.
>
> To me it looks like RecordsFilterer should not be used as a user function.
> It is a helper class to construct a DataSet program, so it should not be
> shipped for execution.
> You would use such a class as follows:
>
> DataSet<T> records = ...
> DataSet<String> filterIDs = ...
>
> RecordsFilterer<T> rf = new RecordsFilterer<>();
> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs,
>     "myField");
>
> Regarding the join code, I would suggest an optimization.
> Instead of using CoGroup, I would use distinct and an OuterJoin like this:
>
> DataSet<String> distIds = filteredIds.distinct();
> DataSet<Tuple2<Boolean, T>> result = records
>   .leftOuterJoin(distIds)
>   .where(KEYSELECTOR)
>   .equalTo("*") // use full string as key
>   .with(JOINFUNC); // set the flag to false if right == null, true otherwise
>
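The semantics of that suggestion can be modeled in plain Java. This is a minimal sketch with hypothetical names, not Flink code: `Flagged` stands in for `Tuple2<Boolean, T>`, a `HashSet` for `distinct()`, and a `java.util.function.Function` for the KEYSELECTOR. The `distinct()` step matters because a join emits one result per matching right-side element, so an ID that occurs twice in `filteredIds` would otherwise duplicate the record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class FilterFlagSketch {

    // Hypothetical stand-in for Flink's Tuple2<Boolean, T>.
    static final class Flagged<T> {
        final boolean flag;
        final T record;

        Flagged(boolean flag, T record) {
            this.flag = flag;
            this.record = record;
        }

        @Override
        public String toString() {
            return "(" + flag + ", " + record + ")";
        }
    }

    // Models records.leftOuterJoin(filteredIds.distinct()).where(key).equalTo("*").with(join).
    static <T> List<Flagged<T>> addFilterFlag(List<T> records, List<String> filteredIds,
                                              Function<T, String> keySelector) {
        // distinct(): every ID appears once, so each record has at most one join partner.
        Set<String> distIds = new HashSet<>(filteredIds);
        List<Flagged<T>> result = new ArrayList<>();
        for (T record : records) {
            String key = keySelector.apply(record);
            // Left outer join: the right side is the matching ID, or null if there is none.
            // A null key never matches (the original code uses a random UUID for that).
            String right = (key != null && distIds.contains(key)) ? key : null;
            // Join function: the flag is false if right == null, true otherwise.
            result.add(new Flagged<>(right != null, record));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        List<String> ids = Arrays.asList("b", "b", "x");
        // prints [(false, a), (true, b), (false, c)]
        System.out.println(addFilterFlag(records, ids, r -> r));
    }
}
```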
> Best, Fabian
>
> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
>
>> Hi,
>>
>> I am getting a NotSerializableException in this class:
>>
>> 
>>
>> public class RecordsFilterer<T extends GenericRecord> {
>>
>>     public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T> dataset,
>>             DataSet<String> filteredIds, String fieldName) {
>>         return dataset.coGroup(filteredIds)
>>                 .where(new KeySelector<T, String>() {
>>                     @Override
>>                     public String getKey(T t) throws Exception {
>>                         String s = (String) t.get(fieldName);
>>                         return s != null ? s : UUID.randomUUID().toString();
>>                     }
>>                 })
>>                 .equalTo((KeySelector<String, String>) s -> s)
>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean,T>>() {
>>                     @Override
>>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>>                                         Collector<Tuple2<Boolean,T>> collector) throws Exception {
>>                         boolean filterFlag = false;
>>                         for (String id : ids) {
>>                             filterFlag = true;
>>                         }
>>
>>                         for (T record : records) {
>>                             collector.collect(new Tuple2<>(filterFlag, record));
>>                         }
>>                     }
>>                 });
>>
>>     }
>> }
>>
>>
>> What I am trying to do is write generic code that joins Avro records
>> (of different types) with String records and, if there is a match, adds a
>> filter flag. This way I can use the same code for different Avro record
>> types. But I am getting this exception:
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException:
>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>> Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException: RecordsFilterer
>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>>     ... 17 more
>> Caused by: java.io.NotSerializableException: RecordsFilterer
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>     ... 19 more
>>
>>
>> Please help me understand why I get this exception and how to fix it
>> (maybe by rewriting the code?).
>>
>> Thanks,
>> Tarandeep
>>
>>
>>
>
