Hi,
yes, I was talking about a Flink bug. I forgot to mention the work-around
that Stephan mentioned.
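
For reference, here is a minimal sketch of why that work-around helps. It
uses plain Java serialization only and no Flink classes: KeySelector below is
a simplified stand-in interface, and Filterer, NullSafeSelector and
isJavaSerializable are hypothetical names made up for this illustration. The
anonymous class drags the non-serializable outer object along; the static
nested class does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class KeySelectorCaptureSketch {

    /** Simplified stand-in for Flink's KeySelector (assumption: not the real interface). */
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value);
    }

    /** Stand-in for RecordsFilterer: a program-building helper that is NOT Serializable. */
    static class Filterer {
        private final String fallbackKey;

        Filterer(String fallbackKey) {
            this.fallbackKey = fallbackKey;
        }

        /** Anonymous inner class: reads a field of the enclosing Filterer, so it references it. */
        KeySelector<String, String> anonymousSelector() {
            return new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value != null ? value : fallbackKey;
                }
            };
        }

        /** Static nested class: no enclosing instance, the state is passed in explicitly. */
        static final class NullSafeSelector implements KeySelector<String, String> {
            private final String fallbackKey;

            NullSafeSelector(String fallbackKey) {
                this.fallbackKey = fallbackKey;
            }

            @Override
            public String getKey(String value) {
                return value != null ? value : fallbackKey;
            }
        }

        KeySelector<String, String> staticSelector() {
            return new NullSafeSelector(fallbackKey);
        }
    }

    /** Returns true if the object can be written with Java serialization. */
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Filterer f = new Filterer("none");
        System.out.println("anonymous inner class: " + isJavaSerializable(f.anonymousSelector()));
        System.out.println("static nested class:   " + isJavaSerializable(f.staticSelector()));
    }
}
```

In this sketch the anonymous selector fails to serialize because of its
reference to the enclosing Filterer, while the static nested one serializes
fine. In the real job the same idea would apply to the KeySelector passed to
where().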

On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <se...@apache.org> wrote:

> You can also make the KeySelector a static inner class. That should work
> as well.
>
> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <tarand...@gmail.com>
> wrote:
>
>> Thank you Aljoscha and Fabian for your replies.
>>
>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
>> afraid this is a bug", I assume you were referring to the Flink engine
>> itself.
>>
>> @Fabian: thanks for the optimization tip.
>>
>> This is how I got it working (with a hack): in my dataset the join
>> field/key can be null; otherwise .where(fieldName) would work and I would
>> not get the NotSerializableException. So I applied a MapFunction to the
>> DataSet that puts a dummy value in the join field/key wherever it was null.
>> Then, in the join function, I change it back to null.
>>
>> Best,
>> Tarandeep
>>
>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> the problem is that the KeySelector is an anonymous inner class and as
>>> such has a reference to the outer RecordsFilterer object. Normally, this
>>> would be rectified by the closure cleaner, but the cleaner is not used in
>>> CoGroup.where(). I'm afraid this is a bug.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Tarandeep,
>>>>
>>>> the exception suggests that Flink tries to serialize RecordsFilterer as
>>>> a user function (this happens via Java Serialization).
>>>> I say "suggests" because the code that uses RecordsFilterer is not
>>>> included.
>>>>
>>>> To me it looks like RecordsFilterer should not be used as a user
>>>> function. It is a helper class to construct a DataSet program, so it should
>>>> not be shipped for execution.
>>>> You would use such a class as follows:
>>>>
>>>> DataSet<T> records = ...
>>>> DataSet<String> filterIDs = ...
>>>>
>>>> RecordsFilterer rf = new RecordsFilterer();
>>>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records,
>>>> filterIDs, "myField");
>>>>
>>>> Regarding the join code, I would suggest an optimization.
>>>> Instead of using CoGroup, I would use distinct and an OuterJoin like
>>>> this:
>>>>
>>>> DataSet<String> distIds = filteredIds.distinct();
>>>> DataSet<Tuple2<Boolean, T>> result = records
>>>>   .leftOuterJoin(distIds)
>>>>   .where(KEYSELECTOR)
>>>>   .equalTo("*") // use full string as key
>>>>   .with(JOINFUNC); // set Bool to false if right == null, true otherwise
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am getting a NotSerializableException in this class:
>>>>>
>>>>> 
>>>>>
>>>>> public class RecordsFilterer<T extends GenericRecord> {
>>>>>
>>>>>     public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T> dataset,
>>>>>             DataSet<String> filteredIds, String fieldName) {
>>>>>         return dataset.coGroup(filteredIds)
>>>>>                 .where(new KeySelector<T, String>() {
>>>>>                     @Override
>>>>>                     public String getKey(T t) throws Exception {
>>>>>                         String s = (String) t.get(fieldName);
>>>>>                         return s != null ? s : UUID.randomUUID().toString();
>>>>>                     }
>>>>>                 })
>>>>>                 .equalTo((KeySelector<String, String>) s -> s)
>>>>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean,T>>() {
>>>>>                     @Override
>>>>>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>>>>>                                         Collector<Tuple2<Boolean,T>> collector) throws Exception {
>>>>>                         boolean filterFlag = false;
>>>>>                         for (String id : ids) {
>>>>>                             filterFlag = true;
>>>>>                         }
>>>>>
>>>>>                         for (T record : records) {
>>>>>                             collector.collect(new Tuple2<>(filterFlag, record));
>>>>>                         }
>>>>>                     }
>>>>>                 });
>>>>>
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> What I am trying to do is write generic code that joins Avro records
>>>>> (of different types) with String records and, if there is a match, adds
>>>>> a filter flag. This way I can use the same code for different Avro record
>>>>> types. But I am getting this exception:
>>>>>
>>>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>>>>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer
>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
>>>>>     ... 17 more
>>>>> Caused by: java.io.NotSerializableException: RecordsFilterer
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>>>>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>>>     ... 19 more
>>>>>
>>>>>
>>>>> Please help me understand why I get this exception and how to fix it
>>>>> (by rewriting the code, maybe?).
>>>>>
>>>>> Thanks,
>>>>> Tarandeep
>>>>>
>>>>>
>>>>>
>>>>
>>
>
