Re: Exception from in-progress implementation of Python API bulk iterations

2016-10-08 Thread Geoffrey Mon
Hi Chesnay,

Heh, I have discovered that if I do not restart Flink after running my
original problematic script, then similar issues will manifest themselves
in other otherwise working scripts. I haven't been able to completely
narrow down the problem, but I promise this new script will have a
ClassCastException that is completely reproducible. :)
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

Thanks,
Geoffrey

On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler wrote:

> Hello Geoffrey,
>
> this one works for me as well :D
>
> Regards,
> Chesnay
>
> On 28.09.2016 05:38, Geoffrey Mon wrote:
> > Hello Chesnay,
> >
> > Thank you for your help. After receiving your message I recompiled my
> > version of Flink completely, and both the NullPointerException listed in
> > the TODO and the ClassCastException with the join operation went away.
> > Previously, I had been recompiling only the modules of Flink that had
> > been changed, using "mvn clean install -pl :module" to save time, and
> > apparently that may have been causing some of my issues.
> >
> > Now the problem is clearer: when a specific group reduce function in my
> > research project plan file is used within an iteration, I get a
> > ClassCastException:
> > Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
> >   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
> >   at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
> >   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> >   at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
> >   at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
> >   at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
> >   at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> >   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> >   at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> >   at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> >   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > I'm not sure why this is causing an exception, and I would greatly
> > appreciate any assistance. I've revised the barebones error-causing plan
> > file to focus on this new error source:
> > https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> > The group reduce function in question seems to work just fine outside of
> > iterations. I have organized the commits and pushed to a new branch to
> > make it easier to test and hopefully review soon:
> > https://github.com/GEOFBOT/flink/tree/new-iterations
> >
> > Cheers,
> > Geoffrey
> >
> > On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler wrote:
> >
> >> Hello Geoffrey,
> >>
> >> I could not reproduce this issue with the commits and plan you provided.
> >>
> >> I tried out both the FLINK-4098 and bulk-iterations branches (and
> >> reverted back to the specified commits) and built Flink from scratch.
> >>
> >> Could you double check that the code you provided produces the error?
> >> Also, which OS/python version are you using?
> >>
> >> Regards,
> >> Chesnay
> >>
> >> On 20.09.2016 11:13, Chesnay Schepler wrote:
> >>> Hello,
> >>>
> >>> I'll try to take a look this week.
> >>>
> >>> Regards,
> >>> Chesnay
> >>>
> >>> On 20.09.2016 02:38, Geoffrey Mon wrote:
> >>>> Hello all,
> >>>>
> >>>> I have recently been working on adding bulk iterations to the Python
> >>>> API of Flink in order to facilitate a research project I am working on.
> >>>> The current changes can be seen in this GitHub diff:
> >>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
> >>>>
> >>>> This implementation seems to work for, at least, simple examples, such
> >>>> as incrementing numbers in a data set. However, with the transformations
> >>>> required for my project, I get an exception
> >>>> "java.lang.ClassCastException: [B cannot be cast to
> >>>> org.apache.flink.api.java.tuple.Tuple" thrown from the deserializers
> >>>> called by
> >>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
> >>>> I've created the following simplified Python plan by stripping down my
> >>>> research project code to the problem-causing parts:
> >>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

Re: Allow TypeInfoFactory registration via ExecutionConfig

2016-10-08 Thread Alexander Alexandrov
To follow up on my own post,

I think a straightforward approach would be to make the
`TypeExtractor.registerFactory` method public [1].

BTW, both the TypeInfo annotation [2] and TypeInfoFactory [3] itself refer
to this method in the JavaDoc. However, besides these mentions I was not
able to find any actual usage of `registerFactory` in the Flink codebase,
so I am not sure when and where the method is actually invoked to register
the annotated classes.

[1]
https://github.com/apache/flink/blob/e5d62da2c98ad9c6a5ca9c0782a7fea8a01d639a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java#L124-L144
[2]
https://github.com/apache/flink/blob/e5d62da2c98ad9c6a5ca9c0782a7fea8a01d639a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
[3]
https://github.com/apache/flink/blob/e5d62da2c98ad9c6a5ca9c0782a7fea8a01d639a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java

On Sat, Oct 8, 2016 at 4:00 PM Alexander Alexandrov <
alexander.s.alexand...@gmail.com> wrote:

I wanted to open this directly as a JIRA to follow-up on FLINK-3042,
however my account (aalexandrov) does not seem to have the necessary
privileges, so I will post this to the dev list instead.

The current approach for registration of custom `TypeInformation`
implementations relies exclusively on annotations. Consequently, it
will be very tedious to create a custom `TypeInformation` for types that
are pulled and used within a project from an external library.

For example, if I have a project structure such as

```
my-project  // uses FancyContainer
 `- flink-core
 `- ext-library // defines FancyContainer
```

I will not be able to modify `FancyContainer.java`.

A workaround that might work (I have not tested it yet) is to use
`javassist` to attach the annotation prior to job assembly, but IMHO a much
clearer solution would be to extend the `ExecutionEnvironment` with
something like

```java
env.getConfig().registerTypeInfoFactory(Class<T> type, Class<? extends TypeInfoFactory<T>> factory)
```

Regards,
Alexander


Allow TypeInfoFactory registration via ExecutionConfig

2016-10-08 Thread Alexander Alexandrov
I wanted to open this directly as a JIRA to follow-up on FLINK-3042,
however my account (aalexandrov) does not seem to have the necessary
privileges, so I will post this to the dev list instead.

The current approach for registration of custom `TypeInformation`
implementations relies exclusively on annotations. Consequently, it
will be very tedious to create a custom `TypeInformation` for types that
are pulled and used within a project from an external library.

For example, if I have a project structure such as

```
my-project  // uses FancyContainer
 `- flink-core
 `- ext-library // defines FancyContainer
```

I will not be able to modify `FancyContainer.java`.

A workaround that might work (I have not tested it yet) is to use
`javassist` to attach the annotation prior to job assembly, but IMHO a much
clearer solution would be to extend the `ExecutionEnvironment` with
something like

```java
env.getConfig().registerTypeInfoFactory(Class<T> type, Class<? extends TypeInfoFactory<T>> factory)
```
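
To make the idea more concrete, here is a minimal, self-contained sketch of
what an explicit registry could look like. It is only an illustration under
stated assumptions: `TypeInfoStub`, `TypeInfoFactoryStub`,
`TypeInfoFactoryRegistry` and `FancyContainerFactory` are hypothetical
stand-ins so that the example compiles without Flink on the classpath; they
are not the real `TypeInformation` / `TypeInfoFactory` classes, and
`registerTypeInfoFactory` does not exist in Flink today.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Flink's TypeInformation (assumption, not the real class).
interface TypeInfoStub<T> {
    String describe();
}

// Hypothetical stand-in for Flink's TypeInfoFactory (assumption, not the real class).
interface TypeInfoFactoryStub<T> {
    TypeInfoStub<T> createTypeInfo();
}

// Plays the role of the external library type that we cannot annotate.
final class FancyContainer { }

// Lives in my-project and knows how to describe FancyContainer.
final class FancyContainerFactory implements TypeInfoFactoryStub<FancyContainer> {
    @Override
    public TypeInfoStub<FancyContainer> createTypeInfo() {
        return () -> "FancyContainerTypeInfo";
    }
}

// Hypothetical registry: maps a type to its factory class without any annotation.
final class TypeInfoFactoryRegistry {
    private final Map<Class<?>, Class<? extends TypeInfoFactoryStub<?>>> factories =
            new ConcurrentHashMap<>();

    // Registers a factory for a type; a second registration for the same type is rejected.
    <T> void registerTypeInfoFactory(Class<T> type,
                                     Class<? extends TypeInfoFactoryStub<T>> factory) {
        if (factories.putIfAbsent(type, factory) != null) {
            throw new IllegalStateException(
                    "A factory is already registered for " + type.getName());
        }
    }

    // Instantiates the registered factory reflectively, or returns empty if none is known.
    @SuppressWarnings("unchecked")
    <T> Optional<TypeInfoStub<T>> createTypeInfo(Class<T> type) {
        Class<? extends TypeInfoFactoryStub<?>> factoryClass = factories.get(type);
        if (factoryClass == null) {
            return Optional.empty();
        }
        try {
            // Safe by construction: registerTypeInfoFactory ties the factory to T.
            TypeInfoFactoryStub<T> factory =
                    (TypeInfoFactoryStub<T>) factoryClass.getDeclaredConstructor().newInstance();
            return Optional.of(factory.createTypeInfo());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Cannot instantiate " + factoryClass.getName(), e);
        }
    }
}

final class RegistryDemo {
    public static void main(String[] args) {
        TypeInfoFactoryRegistry registry = new TypeInfoFactoryRegistry();
        registry.registerTypeInfoFactory(FancyContainer.class, FancyContainerFactory.class);
        registry.createTypeInfo(FancyContainer.class)
                .ifPresent(info -> System.out.println(info.describe()));
    }
}
```

The point of the sketch is only that the mapping from type to factory lives
in user code, so `FancyContainer.java` itself stays untouched. Where such a
registry would be consulted during type extraction is left open here.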

Regards,
Alexander