A master project about implementing Cypher on Apache Flink

2016-10-14 Thread Mengqi Yang
Hi all,

Let me introduce myself first. My name is Mengqi Yang. I was a master's
student at Eindhoven University of Technology and recently finished my
master's project there (Vasia was also one of my supervisors, great thanks
to her :) ).

The project is titled "A Study of Execution Strategies for openCypher on
Apache Flink". In this project, I implemented some of the operators of
Cypher (a graph query language), mainly the core operators, using Flink's
batch API. One can easily build their own graph queries with these
operators. In addition, two graph query optimizers are implemented,
providing different optimization strategies for the queries. The whole
project was built on Apache Flink, so I would like to share my code and my
thesis with the Flink community.

Please check the following link for my code; the thesis has also been
uploaded to the repository:
https://github.com/jiujieti/CypherImplementation
More details about the project can be found there.

If anyone is interested in this work, wants to continue the project, or has
questions, you can always contact me by email at
melody2014ymq@gmail.

Best regards,
Mengqi


Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2

2016-10-14 Thread Fabian Hueske
Yes, I'm also +1 for removing the methods at some point.

For 1.2 we could go ahead and move the Hadoop-MR connectors into a separate
module and mark the methods in ExecutionEnvironment as @deprecated.
In 1.3 (or 2.0, whatever comes next) we could make the switch.
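
As a sketch, the 1.2 step could look like this inside ExecutionEnvironment
(assuming the current readHadoopFile() signature and its existing JobConf
overload; only the annotation changes, not the behavior):

@Deprecated
@PublicEvolving
public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath) {
    // Behavior is unchanged: delegate to the existing JobConf overload, so
    // existing programs keep compiling, now with a deprecation warning.
    return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
}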

2016-10-14 10:40 GMT+02:00 Stephan Ewen :

> @Fabian Good point. For Flink 2.0, I would suggest to remove them from the
> Environment and add them to a Utility. The way it is now, it ties Flink
> very strongly to Hadoop.
>
> You are right, before we do that, there is no way to make a Hadoop
> independent distribution.
>
> On Fri, Oct 14, 2016 at 10:37 AM, Fabian Hueske  wrote:
>
> > +1 for dropping Hadoop1 support.
> >
> > Regarding a binary release without Hadoop:
> >
> > What would we do about the readHadoopFile() and createHadoopInput() on
> the
> > ExecutionEnvironment?
> > These methods are declared as @PublicEvolving, so we did not commit to
> keep
> > them.
> > However that does not necessarily mean we should easily break the API
> here
> > esp. since the methods have not been declared @deprecated.
> >
> > Best, Fabian
> >
> >
> >
> > 2016-10-14 10:29 GMT+02:00 Stephan Ewen :
> >
> > > @Greg
> > >
> > > I think that would be amazing. It does require a bit of cleanup,
> though.
> > As
> > > far as I know, the Hadoop dependency is additionally used for some
> > Kerberos
> > > utilities and for its S3 file system implementation.
> > > We would need to make the Kerberos part Hadoop independent and the
> > > FileSystem loading dynamic (with a good exception that the Hadoop
> > > dependency should be added if the filesystem cannot be loaded).
> > >
> > > Stephan
> > >
> > >
> > > On Thu, Oct 13, 2016 at 8:55 PM, Greg Hogan 
> wrote:
> > >
> > > > Okay, this sounds prudent. Would this be the right time to implement
> > > > FLINK-2268 "Provide Flink binary release without Hadoop"?
> > > >
> > > > On Thu, Oct 13, 2016 at 11:25 AM, Stephan Ewen 
> > wrote:
> > > >
> > > > > +1 for dropping Hadoop1 support
> > > > >
> > > > > @greg There is quite some complexity in the build setup and release
> > > > scripts
> > > > > and testing to support Hadoop 1. Also, we have to prepare to add
> > > support
> > > > > for Hadoop 3, and then supporting in addition Hadoop 1 seems very
> > > tough.
> > > > >
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, Oct 13, 2016 at 5:04 PM, Greg Hogan 
> > > wrote:
> > > > >
> > > > > > Hi Robert,
> > > > > >
> > > > > > What are the benefits to Flink for dropping Hadoop 1 support? Is
> > > there
> > > > > > significant code cleanup or would we simply be publishing one
> less
> > > set
> > > > of
> > > > > > artifacts?
> > > > > >
> > > > > > Greg
> > > > > >
> > > > > > On Thu, Oct 13, 2016 at 10:47 AM, Robert Metzger <
> > > rmetz...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > The Apache Hadoop community has recently released the first
> alpha
> > > > > version
> > > > > > > for Hadoop 3.0.0, while we are still supporting Hadoop 1. I
> think
> > > its
> > > > > > time
> > > > > > > to finally drop Hadoop 1 support in Flink.
> > > > > > >
> > > > > > > The last minor Hadoop 1 release was in 27 June, 2014.
> > > > > > > Apache Spark dropped Hadoop 1 support with their 2.0 release in
> > > July
> > > > > > 2016.
> > > > > > > Hadoop 2.2 was first released in October 2013, so there was
> > enough
> > > > time
> > > > > > > for users to upgrade.
> > > > > > >
> > > > > > > I added also the user@ list to the discussion to get opinions
> > > about
> > > > > this
> > > > > > > from there as well.
> > > > > > >
> > > > > > > Let me know what you think about this!
> > > > > > >
> > > > > > >
> > > > > > > Regards,
> > > > > > > Robert
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

2016-10-14 Thread Timo Walther

Hi everyone,

I think syntax is, in general, a question of taste; it will be hard to
make everyone happy. On the one hand, it would be great if the Table API and
SQL could look consistent, but on the other hand there are also some
negative aspects:


SQL is a language that was not developed for today's needs, and
Stream SQL will basically be a "hack", e.g. by using UDFs like TUMBLE,
HOP, etc. However, the Table API is a newly designed API and does not
need the same hacky solutions.


The Table API should be a fluent API for both Scala and Java. If we are
moving windows into the groupBy() call, the question is how this would
look:


.groupBy('col, tumble(12.hours, 'rowtime, 'alias))
OR
.groupBy('col, Tumble over 12.hours on 'rowtime as 'alias)


In Java, the window definitions would then be defined as strings instead of
method calls, so it is easier for the user to make mistakes and there
is no Javadoc with an explanation.
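
To make the concern concrete, a hypothetical Java version of a grouped window
could look roughly like this (illustrative syntax only, not actual API;
'orders', 'userID' and 'amount' are assumed names):

// The window definition lives inside an expression string, so a typo like
// "tumbel" or a wrong field name only fails when the string is parsed at
// runtime, and the IDE offers neither Javadoc nor completion for it.
Table result = orders
    .groupBy("userID, tumble(12.hours, rowtime, w)")
    .select("userID, amount.sum");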


I think we should decide whether a window is an operator or an
expression. If it is an expression, we can also allow window definitions
in .over() clauses. What do you think?


I support the idea of introducing partitionBy().

Regards,
Timo




On 13/10/16 at 13:04, Zhangrucong wrote:

Hi Fabian:
 What is the strategy for new syntax that Calcite does not support?
Will Calcite support it? For example, the row window syntax.

Thank you very much!



-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: October 13, 2016, 18:17
To: dev@flink.apache.org
Cc: Sean Wang; Timo Walther
Subject: Re: Reply: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

Hi Zhangrucong,

yes, we want to use Calcite's SQL parser, including its window syntax, i.e.,

- the standard SQL OVER windows (in streaming with a few restrictions, such as
no different partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION).

The GroupBy window functions are not implemented in Calcite yet. There is
CALCITE-1345 [1] to track the issue.

As Shaoxuan mentioned, we are not using the STREAM keyword, in order to be SQL compliant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345

2016-10-13 12:05 GMT+02:00 Fabian Hueske :


Hi everybody,

happy to see a good discussion here :-) I'll reply to Shaoxuan's mail
first and comment on Zhangrucong's question in a separate mail.

Shaoxuan, thanks for the suggestions! I think we all agree that for
SQL we should definitely follow the standard (batch) SQL syntax.
In my opinion, the Table API does not necessarily have to be as close
as possible to SQL but should try to make a few things easier and also
safer (easier is of course subjective).

- GroupBy without windows: This is currently intentionally not
supported and also not part of FLIP-11. Our motivation for not
supporting it is to guard the user from defining a query that fails
when being executed due to a very memory-consuming operation. FLIP-11
provides a way to define such a query as a sliding row window with
unbounded preceding rows. With the upcoming SQL proposal, queries that
consume unbounded memory should be identified and rejected. I would be
in favor of allowing groupBy without windows once the guarding
mechanisms are in place.

- GroupBy with window: I think this is a question of taste. Having a
window() call makes the feature more explicit, in my opinion. However,
I'm not opposed to moving the windows into the groupBy clause.
Implementation-wise, it should be easy to move the window definition
into the groupBy clause for the Scala Table API. For the Java Table API
we would need to extend the parser quite a bit because windows would
need to be defined as strings and not via objects.

- RowWindows: The rowWindow() call mimics the standard SQL WINDOW
clause (implemented by PostgreSQL and Calcite), which allows for "reusable"
window definitions. I think this is a desirable feature. In the
FLIP-11 proposal, the over() clause in select() refers to the
predefined windows by their aliases. In case only one window is defined,
the over() clause is optional and the same (and only) window is
applied to all aggregates. I think we can make the over() call
mandatory to make the windowing more explicit. It should also be
possible to extend the over clause to directly accept RowWindows
instead of window aliases. I would not make this a priority at the
moment, but rather a feature that could be added later, because
rowWindow() and over() cover all cases. Similar to GroupBy with
windows, we would need to extend the parser for the Java Table API though.

Finally, I have a suggestion of my own:
In FLIP-11, groupBy() is used to define the partitioning of RowWindows. I
think this should be changed to partitionBy() because groupBy() groups data
and applies an aggregation to all rows of a group, which is not what happens
here. In original SQL, the OVER clause features a PARTITION BY clause. We are
moving this out of the window definition, i.e., the OVER clause, to enforce
the same partitioning for all windows (different partitionings would be a
challenge to execute in a parallel system).

[DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Fabian Hueske
Hi everybody,

I would like to propose deprecating the utility methods that read data with
Hadoop InputFormats from the (batch) ExecutionEnvironment.

The motivation for deprecating these methods is to reduce Flink's dependency
on Hadoop and instead have Hadoop as an optional dependency for users that
actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a
Flink distribution that does not have a hard Hadoop dependency.

One step towards this is to remove the Hadoop dependency from flink-java
(Flink's Java DataSet API), which is currently required due to the above
utility methods (see FLINK-4315). We recently received a PR that addresses
FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment.
After some discussion, it was decided to defer the PR to Flink 2.0 because
it breaks the API (these methods are declared @PublicEvolving).

I propose to accept this PR for Flink 1.2, but to deprecate the methods
instead of removing them.
This would help users migrate old code and prevent new usage of these methods.
In a later Flink release (1.3 or 2.0) we could remove these methods and
the Hadoop dependency from flink-java.
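
For users, the migration could then look roughly like this (a sketch based on
the HadoopInputFormat wrapper from flink-hadoop-compatibility; exact class
and package names should be double-checked):

// Before (deprecated):
// DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(
//     new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input");

// After: build the input format wrapper explicitly and pass it to createInput().
JobConf jobConf = new JobConf();
org.apache.hadoop.mapred.FileInputFormat.addInputPath(jobConf, new Path("hdfs:///input"));
HadoopInputFormat<LongWritable, Text> format = new HadoopInputFormat<>(
    new TextInputFormat(), LongWritable.class, Text.class, jobConf);
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(format);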

What do others think?

Best, Fabian


Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Greg Hogan
+1

On Fri, Oct 14, 2016 at 5:29 AM, Fabian Hueske  wrote:

> Hi everybody,
>
> I would like to propose to deprecate the utility methods to read data with
> Hadoop InputFormats from the (batch) ExecutionEnvironment.
>
> The motivation for deprecating these methods is reduce Flink's dependency
> on Hadoop but rather have Hadoop as an optional dependency for users that
> actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have
> Flink distribution that does not have a hard Hadoop dependency.
>
> One step for this is to remove the Hadoop dependency from flink-java
> (Flink's Java DataSet API) which is currently required due to the above
> utility methods (see FLINK-4315). We recently received a PR that addresses
> FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment.
> After some discussion, it was decided to defer the PR to Flink 2.0 because
> it breaks the API (these methods are delared @PublicEvolving).
>
> I propose to accept this PR for Flink 1.2, but instead of removing the
> methods deprecating them.
> This would help to migrate old code and prevent new usage of these methods.
> For a later Flink release (1.3 or 2.0) we could remove these methods and
> the Hadoop dependency on flink-java.
>
> What do others think?
>
> Best, Fabian
>


Re: 答复: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

2016-10-14 Thread Sean Wang
Thanks for your quick reply, Fabian.

I have a few minor comments:


- Agree that we should consider GroupBy without windows after the new SQL
proposal has settled down.


- For the Java API, we can keep the window() call and put the window alias
into the groupBy clause. This can also be applied to the rowWindow case.

- +1 to support replacing groupBy() with partitionBy(). BTW, in the case of
over, instead of partitionBy, are we going to support orderBy? If yes, I
would suggest defining rowWindow as rowWindow(PartitionByParaType,
OrderByParaType, WindowParaType).

So:
- moving windows into the groupBy() call: +1
- making over() for rowWindow() with a single window definition.
- additionally allowing window definitions in over(): +1, yes for Scala,
but use aliases for the Java API.
- using partitionBy() instead of groupBy() for row windows: +1, but better
to consider orderBy; it may be even better to move partitionBy() into
rowWindow.

Regards,
Shaoxuan


On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske  wrote:

> Hi everybody,
>
> happy to see a good discussion here :-)
> I'll reply to Shaoxuan's mail first and comment on Zhangrucong question
> in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we
> should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close as
> possible to SQL but should try to make a few things easier and also safer
> (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not supported
> and also not part of FLIP-11. Our motivation for not supporting this, is to
> guard the user from defining a query that fails when being executed due to
> a very memory consuming operation. FLIP-11 provides a way to define such a
> query as a sliding row window with unbounded preceding rows. With the
> upcoming SQL proposal, queries that consume unbounded memory should be
> identified and rejected. I would be in favor of allowing groupBy without
> windows once the guarding mechanism are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
> window() call, makes the feature more explicit in my opinion. However, I'm
> not opposed to move the windows into the groupBy clause.
> Implementation-wise it should be easy to move the window definition into to
> groupBy clause for the Scala Table API. For the Java Table API we would
> need to extend the parser quite a bit because windows would need to be
> defined as Strings and not via objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
> (implemented by PostgreSQL and Calcite) which allows to have "reusable"
> window definitions. I think this is a desirable feature. In the FLIP-11
> proposal the over() clause in select() refers to the predefined windows
> with aliases. In case only one window is defined, the over() clause is
> optional and the same (and only) window is applied to all aggregates. I
> think we can make the over() call mandatory to have the windowing more
> explicit. It should also be possible to extend the over clause to directly
> accept RowWindows instead of window aliases. I would not make this a
> priority at the moment, but a feature that could be later added, because
> rowWindow() and over() cover all cases. Similar as for GroupBy with
> windows, we would need to extend the parser for the Java Table API though.
>
> Finally, I have an own suggestion:
> In FLIP-11, groupBy() is  used to define the partitioning of RowWindows. I
> think this should be changed to partitionBy() because groupBy() groups data
> and applies an aggregation to all rows of a group which is not happening
> here. In original SQL, the OVER clause features a PARTITION BY clause. We
> are moving this out of the window definition, i.e., OVER clause, to enforce
> the same partitioning for all windows (different partitionings would be a
> challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() for rowWindow() with a single window definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong :
>
>> Hi shaoxuan:
>>
>> I think,  the streamsql must be excuted in table environment. So I call
>> this table API ‘s StreamSQL. What do you call for this, stream Table API or
>> streamsql? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> val ds: DataStream[(String,Long, Long)] = env.readTextFile("/home/demo")
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> .map(f=>(f, 1L, 1L))
>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>
>> So in my opinion, the grammar which is marked 

Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Robert Metzger
+1

On Fri, Oct 14, 2016 at 12:04 PM, Stephan Ewen  wrote:

> +1
>
> On Fri, Oct 14, 2016 at 11:54 AM, Greg Hogan  wrote:
>
> > +1
> >
> > On Fri, Oct 14, 2016 at 5:29 AM, Fabian Hueske 
> wrote:
> >
> > > Hi everybody,
> > >
> > > I would like to propose to deprecate the utility methods to read data
> > with
> > > Hadoop InputFormats from the (batch) ExecutionEnvironment.
> > >
> > > The motivation for deprecating these methods is reduce Flink's
> dependency
> > > on Hadoop but rather have Hadoop as an optional dependency for users
> that
> > > actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to
> have
> > > Flink distribution that does not have a hard Hadoop dependency.
> > >
> > > One step for this is to remove the Hadoop dependency from flink-java
> > > (Flink's Java DataSet API) which is currently required due to the above
> > > utility methods (see FLINK-4315). We recently received a PR that
> > addresses
> > > FLINK-4315 and removes the Hadoop methods from the
> ExecutionEnvironment.
> > > After some discussion, it was decided to defer the PR to Flink 2.0
> > because
> > > it breaks the API (these methods are delared @PublicEvolving).
> > >
> > > I propose to accept this PR for Flink 1.2, but instead of removing the
> > > methods deprecating them.
> > > This would help to migrate old code and prevent new usage of these
> > methods.
> > > For a later Flink release (1.3 or 2.0) we could remove these methods
> and
> > > the Hadoop dependency on flink-java.
> > >
> > > What do others think?
> > >
> > > Best, Fabian
> > >
> >
>


Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2

2016-10-14 Thread Stephan Ewen
@Greg

I think that would be amazing. It does require a bit of cleanup, though. As
far as I know, the Hadoop dependency is additionally used for some Kerberos
utilities and for its S3 file system implementation.
We would need to make the Kerberos part Hadoop-independent and the
FileSystem loading dynamic (with a good exception message telling users to
add the Hadoop dependency if the file system cannot be loaded).

Stephan
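
A minimal sketch of the dynamic-loading idea (the class name and the wiring
below are illustrative assumptions, not Flink's actual code):

private static Object loadHadoopFileSystem() {
    // Assumed name of the Hadoop-backed file system wrapper class.
    final String fsClass = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
    try {
        return Class.forName(fsClass).getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException e) {
        // The "good exception": tell users which optional dependency is missing.
        throw new RuntimeException("Hadoop file system support is not on the "
            + "classpath. Please add the optional Hadoop dependency if you "
            + "need HDFS or S3.", e);
    } catch (ReflectiveOperationException e) {
        throw new RuntimeException("Could not instantiate " + fsClass, e);
    }
}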


On Thu, Oct 13, 2016 at 8:55 PM, Greg Hogan  wrote:

> Okay, this sounds prudent. Would this be the right time to implement
> FLINK-2268 "Provide Flink binary release without Hadoop"?
>
> On Thu, Oct 13, 2016 at 11:25 AM, Stephan Ewen  wrote:
>
> > +1 for dropping Hadoop1 support
> >
> > @greg There is quite some complexity in the build setup and release
> scripts
> > and testing to support Hadoop 1. Also, we have to prepare to add support
> > for Hadoop 3, and then supporting in addition Hadoop 1 seems very tough.
> >
> > Stephan
> >
> >
> > On Thu, Oct 13, 2016 at 5:04 PM, Greg Hogan  wrote:
> >
> > > Hi Robert,
> > >
> > > What are the benefits to Flink for dropping Hadoop 1 support? Is there
> > > significant code cleanup or would we simply be publishing one less set
> of
> > > artifacts?
> > >
> > > Greg
> > >
> > > On Thu, Oct 13, 2016 at 10:47 AM, Robert Metzger 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > The Apache Hadoop community has recently released the first alpha
> > version
> > > > for Hadoop 3.0.0, while we are still supporting Hadoop 1. I think its
> > > time
> > > > to finally drop Hadoop 1 support in Flink.
> > > >
> > > > The last minor Hadoop 1 release was in 27 June, 2014.
> > > > Apache Spark dropped Hadoop 1 support with their 2.0 release in July
> > > 2016.
> > > > Hadoop 2.2 was first released in October 2013, so there was enough
> time
> > > > for users to upgrade.
> > > >
> > > > I added also the user@ list to the discussion to get opinions about
> > this
> > > > from there as well.
> > > >
> > > > Let me know what you think about this!
> > > >
> > > >
> > > > Regards,
> > > > Robert
> > > >
> > >
> >
>


Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2

2016-10-14 Thread Stephan Ewen
@Fabian - Someone started with that in
https://issues.apache.org/jira/browse/FLINK-4315
That could be changed to not remove the methods from the
ExecutionEnvironment.

On Fri, Oct 14, 2016 at 10:45 AM, Fabian Hueske  wrote:

> Yes, I'm also +1 for removing the methods at some point.
>
> For 1.2 we could go ahead and move the Hadoop-MR connectors into a separate
> module and mark the methods in ExecutionEnvironment as @deprecated.
> In 1.3 (or 2.0 whatever comes next) we could make the switch.
>
> 2016-10-14 10:40 GMT+02:00 Stephan Ewen :
>
> > @Fabian Good point. For Flink 2.0, I would suggest to remove them from
> the
> > Environment and add them to a Utility. The way it is now, it ties Flink
> > very strongly to Hadoop.
> >
> > You are right, before we do that, there is no way to make a Hadoop
> > independent distribution.
> >
> > On Fri, Oct 14, 2016 at 10:37 AM, Fabian Hueske 
> wrote:
> >
> > > +1 for dropping Hadoop1 support.
> > >
> > > Regarding a binary release without Hadoop:
> > >
> > > What would we do about the readHadoopFile() and createHadoopInput() on
> > the
> > > ExecutionEnvironment?
> > > These methods are declared as @PublicEvolving, so we did not commit to
> > keep
> > > them.
> > > However that does not necessarily mean we should easily break the API
> > here
> > > esp. since the methods have not been declared @deprecated.
> > >
> > > Best, Fabian
> > >
> > >
> > >
> > > 2016-10-14 10:29 GMT+02:00 Stephan Ewen :
> > >
> > > > @Greg
> > > >
> > > > I think that would be amazing. It does require a bit of cleanup,
> > though.
> > > As
> > > > far as I know, the Hadoop dependency is additionally used for some
> > > Kerberos
> > > > utilities and for its S3 file system implementation.
> > > > We would need to make the Kerberos part Hadoop independent and the
> > > > FileSystem loading dynamic (with a good exception that the Hadoop
> > > > dependency should be added if the filesystem cannot be loaded).
> > > >
> > > > Stephan
> > > >
> > > >
> > > > On Thu, Oct 13, 2016 at 8:55 PM, Greg Hogan 
> > wrote:
> > > >
> > > > > Okay, this sounds prudent. Would this be the right time to
> implement
> > > > > FLINK-2268 "Provide Flink binary release without Hadoop"?
> > > > >
> > > > > On Thu, Oct 13, 2016 at 11:25 AM, Stephan Ewen 
> > > wrote:
> > > > >
> > > > > > +1 for dropping Hadoop1 support
> > > > > >
> > > > > > @greg There is quite some complexity in the build setup and
> release
> > > > > scripts
> > > > > > and testing to support Hadoop 1. Also, we have to prepare to add
> > > > support
> > > > > > for Hadoop 3, and then supporting in addition Hadoop 1 seems very
> > > > tough.
> > > > > >
> > > > > > Stephan
> > > > > >
> > > > > >
> > > > > > On Thu, Oct 13, 2016 at 5:04 PM, Greg Hogan 
> > > > wrote:
> > > > > >
> > > > > > > Hi Robert,
> > > > > > >
> > > > > > > What are the benefits to Flink for dropping Hadoop 1 support?
> Is
> > > > there
> > > > > > > significant code cleanup or would we simply be publishing one
> > less
> > > > set
> > > > > of
> > > > > > > artifacts?
> > > > > > >
> > > > > > > Greg
> > > > > > >
> > > > > > > On Thu, Oct 13, 2016 at 10:47 AM, Robert Metzger <
> > > > rmetz...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > The Apache Hadoop community has recently released the first
> > alpha
> > > > > > version
> > > > > > > > for Hadoop 3.0.0, while we are still supporting Hadoop 1. I
> > think
> > > > its
> > > > > > > time
> > > > > > > > to finally drop Hadoop 1 support in Flink.
> > > > > > > >
> > > > > > > > The last minor Hadoop 1 release was in 27 June, 2014.
> > > > > > > > Apache Spark dropped Hadoop 1 support with their 2.0 release
> in
> > > > July
> > > > > > > 2016.
> > > > > > > > Hadoop 2.2 was first released in October 2013, so there was
> > > enough
> > > > > time
> > > > > > > > for users to upgrade.
> > > > > > > >
> > > > > > > > I added also the user@ list to the discussion to get
> opinions
> > > > about
> > > > > > this
> > > > > > > > from there as well.
> > > > > > > >
> > > > > > > > Let me know what you think about this!
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Robert
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-4829) Accumulators are not thread safe

2016-10-14 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4829:


 Summary: Accumulators are not thread safe
 Key: FLINK-4829
 URL: https://issues.apache.org/jira/browse/FLINK-4829
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.2.0
Reporter: Till Rohrmann
 Fix For: 1.2.0


Flink's {{Accumulators}} are not thread safe. With the introduction of live
accumulator snapshots, which are sent to the {{JobManager}}, we've introduced
concurrent access to accumulators without properly guarding them against
concurrent modifications. So if a snapshot is taken of an accumulator that is
being modified at the same time, it can cause a
{{ConcurrentModificationException}}, as was reported by a user:

{code}
WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
    at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
    at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
    at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
    at java.util.TreeMap.writeObject(TreeMap.java:2436)
    at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
    at java.util.HashMap.writeObject(HashMap.java:1362)
    at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
    at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
    at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
{code}
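
The race is easy to reproduce outside of Flink. Below is a minimal standalone
demonstration (not Flink code) that serializes a {{TreeMap}} while another
thread mutates it:

{code}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.TreeMap;

public class AccumulatorRaceDemo {
    public static void main(String[] args) throws Exception {
        final TreeMap<Integer, Long> counters = new TreeMap<>();
        Thread writer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; ; i++) {
                    counters.put(i % 1024, (long) i); // unsynchronized updates
                }
            }
        });
        writer.setDaemon(true);
        writer.start();
        // Serialize without holding a lock, like the heartbeat snapshot does:
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(counters); // typically throws ConcurrentModificationException
    }
}
{code}

Guarding both the updates and the snapshot with a common lock, or copying the
accumulator map under a lock before serializing, would avoid the race.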



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2

2016-10-14 Thread Stephan Ewen
@Fabian Good point. For Flink 2.0, I would suggest to remove them from the
Environment and add them to a Utility. The way it is now, it ties Flink
very strongly to Hadoop.

You are right, before we do that, there is no way to make a Hadoop
independent distribution.

On Fri, Oct 14, 2016 at 10:37 AM, Fabian Hueske  wrote:

> +1 for dropping Hadoop1 support.
>
> Regarding a binary release without Hadoop:
>
> What would we do about the readHadoopFile() and createHadoopInput() on the
> ExecutionEnvironment?
> These methods are declared as @PublicEvolving, so we did not commit to keep
> them.
> However that does not necessarily mean we should easily break the API here
> esp. since the methods have not been declared @deprecated.
>
> Best, Fabian
>
>
>
> 2016-10-14 10:29 GMT+02:00 Stephan Ewen :
>
> > @Greg
> >
> > I think that would be amazing. It does require a bit of cleanup, though.
> As
> > far as I know, the Hadoop dependency is additionally used for some
> Kerberos
> > utilities and for its S3 file system implementation.
> > We would need to make the Kerberos part Hadoop independent and the
> > FileSystem loading dynamic (with a good exception that the Hadoop
> > dependency should be added if the filesystem cannot be loaded).
> >
> > Stephan
> >
> >
> > On Thu, Oct 13, 2016 at 8:55 PM, Greg Hogan  wrote:
> >
> > > Okay, this sounds prudent. Would this be the right time to implement
> > > FLINK-2268 "Provide Flink binary release without Hadoop"?
> > >
> > > On Thu, Oct 13, 2016 at 11:25 AM, Stephan Ewen 
> wrote:
> > >
> > > > +1 for dropping Hadoop1 support
> > > >
> > > > @greg There is quite some complexity in the build setup and release
> > > scripts
> > > > and testing to support Hadoop 1. Also, we have to prepare to add
> > support
> > > > for Hadoop 3, and then supporting in addition Hadoop 1 seems very
> > tough.
> > > >
> > > > Stephan
> > > >
> > > >
> > > > On Thu, Oct 13, 2016 at 5:04 PM, Greg Hogan 
> > wrote:
> > > >
> > > > > Hi Robert,
> > > > >
> > > > > What are the benefits to Flink for dropping Hadoop 1 support? Is
> > there
> > > > > significant code cleanup or would we simply be publishing one less
> > set
> > > of
> > > > > artifacts?
> > > > >
> > > > > Greg
> > > > >
> > > > > On Thu, Oct 13, 2016 at 10:47 AM, Robert Metzger <
> > rmetz...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > The Apache Hadoop community has recently released the first alpha
> > > > version
> > > > > > for Hadoop 3.0.0, while we are still supporting Hadoop 1. I think
> > its
> > > > > time
> > > > > > to finally drop Hadoop 1 support in Flink.
> > > > > >
> > > > > > The last minor Hadoop 1 release was in 27 June, 2014.
> > > > > > Apache Spark dropped Hadoop 1 support with their 2.0 release in
> > July
> > > > > 2016.
> > > > > > Hadoop 2.2 was first released in October 2013, so there was
> enough
> > > time
> > > > > > for users to upgrade.
> > > > > >
> > > > > > I added also the user@ list to the discussion to get opinions
> > about
> > > > this
> > > > > > from there as well.
> > > > > >
> > > > > > Let me know what you think about this!
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > > Robert
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2

2016-10-14 Thread Fabian Hueske
Thanks for the pointer.
I'll start a separate discussion and push the PR forward if we come to an
agreement.

2016-10-14 11:04 GMT+02:00 Stephan Ewen :

> @Fabian - Someone started with that in
> https://issues.apache.org/jira/browse/FLINK-4315
> That could be changed to not remove the methods from the
> ExecutionEnvironment.
>
> On Fri, Oct 14, 2016 at 10:45 AM, Fabian Hueske  wrote:
>
> > Yes, I'm also +1 for removing the methods at some point.
> >
> > For 1.2 we could go ahead and move the Hadoop-MR connectors into a
> separate
> > module and mark the methods in ExecutionEnvironment as @deprecated.
> > In 1.3 (or 2.0 whatever comes next) we could make the switch.
> >
> > 2016-10-14 10:40 GMT+02:00 Stephan Ewen :
> >
> > > @Fabian Good point. For Flink 2.0, I would suggest to remove them from
> > the
> > > Environment and add them to a Utility. The way it is now, it ties Flink
> > > very strongly to Hadoop.
> > >
> > > You are right, before we do that, there is no way to make a Hadoop
> > > independent distribution.
> > >
> > > On Fri, Oct 14, 2016 at 10:37 AM, Fabian Hueske 
> > wrote:
> > >
> > > > +1 for dropping Hadoop1 support.
> > > >
> > > > Regarding a binary release without Hadoop:
> > > >
> > > > What would we do about the readHadoopFile() and createHadoopInput()
> on
> > > the
> > > > ExecutionEnvironment?
> > > > These methods are declared as @PublicEvolving, so we did not commit
> to
> > > keep
> > > > them.
> > > > However that does not necessarily mean we should easily break the API
> > > here
> > > > esp. since the methods have not been declared @deprecated.
> > > >
> > > > Best, Fabian
> > > >
> > > >
> > > >
> > > > 2016-10-14 10:29 GMT+02:00 Stephan Ewen :
> > > >
> > > > > @Greg
> > > > >
> > > > > I think that would be amazing. It does require a bit of cleanup,
> > > though.
> > > > As
> > > > > far as I know, the Hadoop dependency is additionally used for some
> > > > Kerberos
> > > > > utilities and for its S3 file system implementation.
> > > > > We would need to make the Kerberos part Hadoop independent and the
> > > > > FileSystem loading dynamic (with a good exception that the Hadoop
> > > > > dependency should be added if the filesystem cannot be loaded).
> > > > >
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, Oct 13, 2016 at 8:55 PM, Greg Hogan 
> > > wrote:
> > > > >
> > > > > > Okay, this sounds prudent. Would this be the right time to
> > implement
> > > > > > FLINK-2268 "Provide Flink binary release without Hadoop"?
> > > > > >
> > > > > > On Thu, Oct 13, 2016 at 11:25 AM, Stephan Ewen  >
> > > > wrote:
> > > > > >
> > > > > > > +1 for dropping Hadoop1 support
> > > > > > >
> > > > > > > @greg There is quite some complexity in the build setup and
> > release
> > > > > > scripts
> > > > > > > and testing to support Hadoop 1. Also, we have to prepare to
> add
> > > > > support
> > > > > > > for Hadoop 3, and then supporting in addition Hadoop 1 seems
> very
> > > > > tough.
> > > > > > >
> > > > > > > Stephan
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Oct 13, 2016 at 5:04 PM, Greg Hogan <
> c...@greghogan.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Robert,
> > > > > > > >
> > > > > > > > What are the benefits to Flink for dropping Hadoop 1 support?
> > Is
> > > > > there
> > > > > > > > significant code cleanup or would we simply be publishing one
> > > less
> > > > > set
> > > > > > of
> > > > > > > > artifacts?
> > > > > > > >
> > > > > > > > Greg
> > > > > > > >
> > > > > > > > On Thu, Oct 13, 2016 at 10:47 AM, Robert Metzger <
> > > > > rmetz...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > The Apache Hadoop community has recently released the first
> > > alpha
> > > > > > > version
> > > > > > > > > for Hadoop 3.0.0, while we are still supporting Hadoop 1. I
> > > think
> > > > > its
> > > > > > > > time
> > > > > > > > > to finally drop Hadoop 1 support in Flink.
> > > > > > > > >
> > > > > > > > > The last minor Hadoop 1 release was in 27 June, 2014.
> > > > > > > > > Apache Spark dropped Hadoop 1 support with their 2.0
> release
> > in
> > > > > July
> > > > > > > > 2016.
> > > > > > > > > Hadoop 2.2 was first released in October 2013, so there was
> > > > enough
> > > > > > time
> > > > > > > > > for users to upgrade.
> > > > > > > > >
> > > > > > > > > I added also the user@ list to the discussion to get
> > opinions
> > > > > about
> > > > > > > this
> > > > > > > > > from there as well.
> > > > > > > > >
> > > > > > > > > Let me know what you think about this!
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Robert
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2

2016-10-14 Thread Fabian Hueske
+1 for dropping Hadoop1 support.

Regarding a binary release without Hadoop:

What would we do about the readHadoopFile() and createHadoopInput() on the
ExecutionEnvironment?
These methods are declared as @PublicEvolving, so we did not commit to keep
them.
However that does not necessarily mean we should easily break the API here
esp. since the methods have not been declared @deprecated.

Best, Fabian



2016-10-14 10:29 GMT+02:00 Stephan Ewen :

> @Greg
>
> I think that would be amazing. It does require a bit of cleanup, though. As
> far as I know, the Hadoop dependency is additionally used for some Kerberos
> utilities and for its S3 file system implementation.
> We would need to make the Kerberos part Hadoop independent and the
> FileSystem loading dynamic (with a good exception that the Hadoop
> dependency should be added if the filesystem cannot be loaded).
>
> Stephan
>
>
> On Thu, Oct 13, 2016 at 8:55 PM, Greg Hogan  wrote:
>
> > Okay, this sounds prudent. Would this be the right time to implement
> > FLINK-2268 "Provide Flink binary release without Hadoop"?
> >
> > On Thu, Oct 13, 2016 at 11:25 AM, Stephan Ewen  wrote:
> >
> > > +1 for dropping Hadoop1 support
> > >
> > > @greg There is quite some complexity in the build setup and release
> > scripts
> > > and testing to support Hadoop 1. Also, we have to prepare to add
> support
> > > for Hadoop 3, and then supporting in addition Hadoop 1 seems very
> tough.
> > >
> > > Stephan
> > >
> > >
> > > On Thu, Oct 13, 2016 at 5:04 PM, Greg Hogan 
> wrote:
> > >
> > > > Hi Robert,
> > > >
> > > > What are the benefits to Flink for dropping Hadoop 1 support? Is
> there
> > > > significant code cleanup or would we simply be publishing one less
> set
> > of
> > > > artifacts?
> > > >
> > > > Greg
> > > >
> > > > On Thu, Oct 13, 2016 at 10:47 AM, Robert Metzger <
> rmetz...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > The Apache Hadoop community has recently released the first alpha
> > > version
> > > > > for Hadoop 3.0.0, while we are still supporting Hadoop 1. I think
> its
> > > > time
> > > > > to finally drop Hadoop 1 support in Flink.
> > > > >
> > > > > The last minor Hadoop 1 release was in 27 June, 2014.
> > > > > Apache Spark dropped Hadoop 1 support with their 2.0 release in
> July
> > > > 2016.
> > > > > Hadoop 2.2 was first released in October 2013, so there was enough
> > time
> > > > > for users to upgrade.
> > > > >
> > > > > I added also the user@ list to the discussion to get opinions
> about
> > > this
> > > > > from there as well.
> > > > >
> > > > > Let me know what you think about this!
> > > > >
> > > > >
> > > > > Regards,
> > > > > Robert
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-4828) execute stream job asynchronously

2016-10-14 Thread David Anderson (JIRA)
David Anderson created FLINK-4828:
-

 Summary: execute stream job asynchronously
 Key: FLINK-4828
 URL: https://issues.apache.org/jira/browse/FLINK-4828
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Reporter: David Anderson


It is currently awkward to work with DataStreams in a notebook (e.g. Zeppelin
or Jupyter) because env.execute() never returns and there is no way to control
the job that has been started. It would be much better to have a variant like
env.executeAsync() that executes the job in another thread and returns right
away. This method should return an object that can be used to cancel or stop
the job.
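
As a stop-gap, users can wrap the blocking call themselves. A rough sketch
(note that this returns no handle for cancelling or stopping the job, which is
exactly what the proposed method should add):

{code}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class NotebookHelper {
    /** Runs the blocking execute() on a background thread so the caller returns immediately. */
    public static CompletableFuture<JobExecutionResult> executeAsync(
            StreamExecutionEnvironment env, String jobName) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return env.execute(jobName); // blocks until the job finishes
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
{code}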



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Stephan Ewen
+1

On Fri, Oct 14, 2016 at 11:54 AM, Greg Hogan  wrote:

> +1
>
> On Fri, Oct 14, 2016 at 5:29 AM, Fabian Hueske  wrote:
>
> > Hi everybody,
> >
> > I would like to propose to deprecate the utility methods to read data
> with
> > Hadoop InputFormats from the (batch) ExecutionEnvironment.
> >
> > The motivation for deprecating these methods is reduce Flink's dependency
> > on Hadoop but rather have Hadoop as an optional dependency for users that
> > actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have
> > Flink distribution that does not have a hard Hadoop dependency.
> >
> > One step for this is to remove the Hadoop dependency from flink-java
> > (Flink's Java DataSet API) which is currently required due to the above
> > utility methods (see FLINK-4315). We recently received a PR that
> addresses
> > FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment.
> > After some discussion, it was decided to defer the PR to Flink 2.0
> because
> > it breaks the API (these methods are delared @PublicEvolving).
> >
> > I propose to accept this PR for Flink 1.2, but instead of removing the
> > methods deprecating them.
> > This would help to migrate old code and prevent new usage of these
> methods.
> > For a later Flink release (1.3 or 2.0) we could remove these methods and
> > the Hadoop dependency on flink-java.
> >
> > What do others think?
> >
> > Best, Fabian
> >
>


[jira] [Created] (FLINK-4831) Implement a log4j metric reporter

2016-10-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4831:
---

 Summary: Implement a log4j metric reporter
 Key: FLINK-4831
 URL: https://issues.apache.org/jira/browse/FLINK-4831
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.1.2
Reporter: Chesnay Schepler
 Fix For: 1.2.0


For debugging purposes it would be very useful to have a log4j metric
reporter. If you don't want to set up a metrics backend, you currently have
to rely on JMX, which a) works a bit differently than other reporters (for
example, it doesn't extend AbstractReporter) and b) makes it a bit tricky to
analyze results, as metrics are cleaned up once a job finishes.
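
A rough sketch of what such a reporter could look like, assuming the current
reporter interfaces (AbstractReporter plus the Scheduled callback); names and
details are illustrative, not a final design:

{code}
import java.util.Map;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogReporter extends AbstractReporter implements Scheduled {
    private static final Logger LOG = LoggerFactory.getLogger(LogReporter.class);

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    @Override
    public String filterCharacters(String input) {
        return input;
    }

    @Override
    public void report() {
        // AbstractReporter already maintains these metric maps; log a snapshot.
        synchronized (this) {
            for (Map.Entry<Counter, String> entry : counters.entrySet()) {
                LOG.info("{}: {}", entry.getValue(), entry.getKey().getCount());
            }
            for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
                LOG.info("{}: {}", entry.getValue(), entry.getKey().getValue());
            }
        }
    }
}
{code}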



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4830) Show latency statistics in web interface

2016-10-14 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4830:
-

 Summary: Show latency statistics in web interface
 Key: FLINK-4830
 URL: https://issues.apache.org/jira/browse/FLINK-4830
 Project: Flink
  Issue Type: Sub-task
  Components: Metrics
Reporter: Robert Metzger
Assignee: Robert Metzger


With FLINK-3660, we added a metric that measures the latency of records
flowing through the system.

With this JIRA, I would like to expose the latency in the web frontend as
well. For that, we'll probably need to change the format in which the latency
is reported by the metric.

I think we should show the latencies in two different views in the interface:
- A global end-to-end latency in the overview of running jobs. This number is
probably pretty inaccurate, but it should give users a first impression of
the latency characteristics currently present in the job.
- A detailed latency drill-down view that allows users to see how much
latency is added on average at each operator, for each of the sources.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4832) Count/Sum 0 elements

2016-10-14 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4832:
---

 Summary: Count/Sum 0 elements
 Key: FLINK-4832
 URL: https://issues.apache.org/jira/browse/FLINK-4832
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


Currently, the Table API is unable to count or sum 0 elements. We should
improve the DataSet aggregations to handle this, maybe by unioning the
original DataSet with a dummy record or by using a MapPartition function.
Coming up with a good design for this is also part of this issue.
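
The gap can be seen directly on the DataSet API (a minimal sketch):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class EmptyAggregation {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // An empty DataSet, e.g. the result of a very selective filter:
        DataSet<Long> empty = env.fromElements(1L, 2L, 3L).filter(x -> false);

        // reduce() over an empty input emits no record at all instead of a
        // single 0, which is why COUNT/SUM over 0 elements returns nothing.
        empty.reduce((a, b) -> a + b).print(); // prints no rows
    }
}
{code}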



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Exception from in-progress implementation of Python API bulk iterations

2016-10-14 Thread Chesnay Schepler
In this branch: https://github.com/zentol/flink/tree/new-iterations you
can find a more fine-grained fix for chaining with iterations.
Relevant commit: ac2305d9589a5c6ab9e94d04c870fba52716d695

On 13.10.2016 23:11, Chesnay Schepler wrote:
The chaining code is definitely related; I also have a pretty clear
idea of how to fix it.

The odd thing is that the Java API doesn't catch this type mismatch;
the data types are known when the plan is generated. This kind of error
shouldn't even happen.


On 13.10.2016 21:15, Geoffrey Mon wrote:

Thank you very much. Disabling chaining with the Python API allows my
actual script to run properly. The division by zero must be an issue with
the job that I posted on gist.

Does that mean that the issue must be in the chaining part of the API?
Chaining, from the way I understand it, is an important optimization that
matters for the performance comparison I wish to make in my project.

Cheers,
Geoffrey

On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler  
wrote:



A temporary workaround appears to be disabling chaining, which you can
do by commenting out L215 "self._find_chains()" in Environment.py.
Note that you then run into a division-by-zero error, but I can't tell
whether that is a problem of the job or not.

On 13.10.2016 13:41, Chesnay Schepler wrote:

Hey Geoffrey,

I was able to reproduce the error and will look into it in more detail
tomorrow.

Regards,
Chesnay

On 12.10.2016 23:09, Geoffrey Mon wrote:

Hello,

Has anyone had a chance to look into this? I am currently working on the
problem, but I have minimal understanding of how the internal Flink Python
API works; any expertise would be greatly appreciated.

Thank you very much!

Geoffrey

On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon  
wrote:



Hi Chesnay,

Heh, I have discovered that if I do not restart Flink after running my
original problematic script, then similar issues will manifest themselves
in other, otherwise working scripts. I haven't been able to completely
narrow down the problem, but I promise this new script will have a
ClassCastException that is completely reproducible. :)
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

Thanks,
Geoffrey

On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler wrote:

Hello Geoffrey,

this one works for me as well :D

Regards,
Chesnay

On 28.09.2016 05:38, Geoffrey Mon wrote:

Hello Chesnay,

Thank you for your help. After receiving your message I recompiled my
version of Flink completely, and both the NullPointerException listed in
the TODO and the ClassCastException with the join operation went away.
Previously, I had only been recompiling the modules of Flink that had
been changed to save time, using "mvn clean install -pl :module", and
apparently that may have been causing some of my issues.

Now the problem is clearer: when a specific group reduce function in my
research project plan file is used within an iteration, I get a
ClassCastException:
Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
    at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
    at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
    at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
    at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
    at java.lang.Thread.run(Thread.java:745)

I'm not sure why this is causing an exception, and I would greatly
appreciate any assistance. I've revised the barebones error-causing plan
file to focus on this new error source:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
The group reduce function in question seems to work just fine outside of
iterations. I have organized the commits and pushed to a new branch to
make it easier to test and hopefully review soon:
https://github.com/GEOFBOT/flink/tree/new-iterations

Cheers,
Geoffrey

On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler wrote:

Hello