A master's project about implementing Cypher on Apache Flink
Hi all,

Let me introduce myself first. My name is Mengqi Yang. I was a master's student at Eindhoven University of Technology, where I recently finished my master's project (Vasia was also one of my supervisors, great thanks to her :) ). The project is titled "A study of execution strategies for openCypher on Apache Flink". In this project, I implemented some operators of Cypher (a graph query language), mainly the core operators, using the Flink batch API. One can easily build their own graph queries with these operators. In addition, two graph query optimizers are implemented, providing different optimization strategies for the queries.

The whole project was done using Apache Flink, so I would like to share my code and my thesis with the Flink community. Please check the following link; the thesis has also been uploaded to the repository: https://github.com/jiujieti/CypherImplementation

More details about this project can be found there. If anyone is interested in this work, wants to continue this project, or has some questions, you can always contact me at melody2014ymq@gmail.

Best regards,
Mengqi
Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2
Yes, I'm also +1 for removing the methods at some point.

For 1.2 we could go ahead and move the Hadoop-MR connectors into a separate module and mark the methods in ExecutionEnvironment as @deprecated. In 1.3 (or 2.0, whatever comes next) we could make the switch.

2016-10-14 10:40 GMT+02:00 Stephan Ewen:
> @Fabian Good point. For Flink 2.0, I would suggest to remove them from the
> Environment and add them to a Utility. The way it is now, it ties Flink
> very strongly to Hadoop.
>
> You are right, before we do that, there is no way to make a Hadoop
> independent distribution.
Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi everyone,

I think syntax is in general a question of taste; it will be hard to make everyone happy. On the one hand, it would be great if the Table API and SQL could look consistent, but on the other hand there are also some negative aspects: SQL is a language that was not developed for today's needs, and Stream SQL will basically be a "hack", e.g. by using UDFs like TUMBLE, HOP, etc. The Table API, however, is a newly designed API and does not need the same hacky solutions. The Table API should be a fluent API for both Scala and Java.

If we move windows into the groupBy() call, the question is what this would look like:

.groupBy('col, tumble(12.hours, 'rowtime, 'alias))

or

.groupBy('col, Tumble over 12.hours on 'rowtime as 'alias)

In Java, the window definitions would then be defined as Strings instead of method calls, so it is easier for the user to make mistakes and there is no Javadoc with an explanation. I think we should decide whether a window is an operator or an expression. If it is an expression, we can also allow window definitions in over() clauses. What do you think?

I support the idea of introducing partitionBy().

Regards, Timo

On 13/10/16 13:04, Zhangrucong wrote:

Hi Fabian:

What is the strategy for new syntax that Calcite does not support, for example the row window syntax? Will Calcite support it? Thank you very much!

Original message
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: 13 October 2016 18:17
To: dev@flink.apache.org
Cc: Sean Wang; Timo Walther
Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

Hi Zhangrucong,

yes, we want to use Calcite's SQL parser including its window syntax, i.e.,
- the standard SQL OVER windows (in streaming with a few restrictions, such as no different partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION)

The GroupBy window functions are not yet implemented in Calcite. There is CALCITE-1345 [1] to track the issue.
As Shaoxuan mentioned, we are not using the STREAM keyword, to be SQL compliant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345

2016-10-13 12:05 GMT+02:00 Fabian Hueske:

Hi everybody,

happy to see a good discussion here :-) I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question in a separate mail.

Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we should definitely follow the standard (batch) SQL syntax. In my opinion, the Table API does not necessarily have to be as close as possible to SQL, but should try to make a few things easier and also safer (easier is of course subjective).

- GroupBy without windows: These are currently intentionally not supported and also not part of FLIP-11. Our motivation for not supporting this is to guard the user from defining a query that fails when being executed due to a very memory-consuming operation. FLIP-11 provides a way to define such a query as a sliding row window with unbounded preceding rows. With the upcoming SQL proposal, queries that consume unbounded memory should be identified and rejected. I would be in favor of allowing groupBy without windows once the guarding mechanisms are in place.

- GroupBy with window: I think this is a question of taste. Having a window() call makes the feature more explicit, in my opinion. However, I'm not opposed to moving the windows into the groupBy clause. Implementation-wise, it should be easy to move the window definition into the groupBy clause for the Scala Table API. For the Java Table API we would need to extend the parser quite a bit, because windows would need to be defined as Strings and not via objects.

- RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause (implemented by PostgreSQL and Calcite), which allows "reusable" window definitions. I think this is a desirable feature. In the FLIP-11 proposal, the over() clause in select() refers to the predefined windows by their aliases.
In case only one window is defined, the over() clause is optional and the same (and only) window is applied to all aggregates. I think we can make the over() call mandatory to have the windowing more explicit. It should also be possible to extend the over() clause to directly accept RowWindows instead of window aliases. I would not make this a priority at the moment, but a feature that could be added later, because rowWindow() and over() cover all cases. As for GroupBy with windows, we would need to extend the parser for the Java Table API, though.

Finally, I have a suggestion of my own: In FLIP-11, groupBy() is used to define the partitioning of RowWindows. I think this should be changed to partitionBy(), because groupBy() groups data and applies an aggregation to all rows of a group, which is not happening here. In standard SQL, the OVER clause features a PARTITION BY clause. We are moving this out of the window definition, i.e., the OVER clause, to enforce the same partitioning for all windows (different partitionings would be a challenge to execute in a parallel system).
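Timo's concern above, that Java window definitions as Strings lose compile-time checking and Javadoc, can be illustrated with a small, self-contained sketch. The classes below are hypothetical (they are not Flink's actual Table API); they only demonstrate how a fluent builder makes each step of a window definition a documented, compiler-checked method call:

```java
// Hypothetical sketch, NOT Flink's real Table API: a fluent builder for a
// tumbling window definition. A typo in a method name fails to compile,
// whereas a typo inside a parsed String would only fail at runtime.
final class Tumble {
    final String size;   // window size, e.g. "12.hours"
    final String time;   // time attribute the window works on, e.g. "rowtime"
    final String alias;  // name used to reference the window later, e.g. "w"

    private Tumble(String size, String time, String alias) {
        this.size = size;
        this.time = time;
        this.alias = alias;
    }

    /** Starts a tumbling window definition over the given size. */
    static Tumble over(String size) {
        return new Tumble(size, null, null);
    }

    /** Sets the time attribute on which the window is defined. */
    Tumble on(String time) {
        return new Tumble(this.size, time, this.alias);
    }

    /** Assigns an alias so the window can be referenced, e.g. in over(). */
    Tumble as(String alias) {
        return new Tumble(this.size, this.time, alias);
    }
}
```

A chain like `Tumble.over("12.hours").on("rowtime").as("w")` then mirrors the Scala expression `Tumble over 12.hours on 'rowtime as 'alias` while keeping IDE support and compile-time checking, which is the trade-off under discussion.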
[DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment
Hi everybody,

I would like to propose to deprecate the utility methods to read data with Hadoop InputFormats from the (batch) ExecutionEnvironment.

The motivation for deprecating these methods is to reduce Flink's dependency on Hadoop and instead have Hadoop as an optional dependency for users who actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a Flink distribution that does not have a hard Hadoop dependency.

One step towards this is to remove the Hadoop dependency from flink-java (Flink's Java DataSet API), which is currently required due to the above utility methods (see FLINK-4315). We recently received a PR that addresses FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment. After some discussion, it was decided to defer the PR to Flink 2.0 because it breaks the API (these methods are declared @PublicEvolving).

I propose to accept this PR for Flink 1.2, but to deprecate the methods instead of removing them. This would help to migrate old code and prevent new usage of these methods. In a later Flink release (1.3 or 2.0) we could remove these methods and the Hadoop dependency of flink-java.

What do others think?

Best, Fabian
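The deprecate-then-remove migration described above can be sketched in plain Java. The class names below are made up for illustration (this is not Flink's real ExecutionEnvironment or its actual utility class): the old method stays for one release, marked deprecated and delegating to its new home, so existing code keeps compiling with only a warning:

```java
// Illustrative sketch with hypothetical class names, not Flink's real API.
// The new utility class that would hold the Hadoop helpers going forward.
final class HadoopUtils {
    static String readHadoopFile(String path) {
        return "hadoop-input:" + path;  // stand-in for building a Hadoop input
    }
}

// Stand-in for ExecutionEnvironment: the old method remains for one release,
// deprecated and delegating, before being removed in a later major release.
final class Environment {
    /** @deprecated planned for removal; use {@link HadoopUtils#readHadoopFile}. */
    @Deprecated
    String readHadoopFile(String path) {
        return HadoopUtils.readHadoopFile(path);  // delegate so old code still works
    }
}
```

Callers compiling against `Environment.readHadoopFile()` would see a deprecation warning pointing them at the utility class, which is exactly the migration aid the proposal is after.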
Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment
+1

On Fri, Oct 14, 2016 at 5:29 AM, Fabian Hueske wrote:
> Hi everybody,
>
> I would like to propose to deprecate the utility methods to read data with
> Hadoop InputFormats from the (batch) ExecutionEnvironment.
Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Thanks for your quick reply, Fabian. I have a few minor comments:

- Agree that we should consider GroupBy without windows after the new SQL proposal has settled down.
- For the Java API, we can keep the window() call and put the window alias into the groupBy() clause. This can also be applied to the rowWindow case.
- +1 to replacing groupBy() with partitionBy(). BTW, in the case of OVER, are we also going to support orderBy in addition to partitionBy? If yes, I would suggest defining rowWindow as rowWindow(PartitionByParaType, OrderByParaType, WindowParaType).

So:
- moving windows into the groupBy() call: +1
- making over() mandatory for rowWindow() with a single window definition
- additionally allowing window definitions in over(): +1, yes for Scala, but use an alias for the Java API
- using partitionBy() instead of groupBy() for row windows: +1, but better to also consider orderBy; it may be even better to move partitionBy() into rowWindow

Regards, Shaoxuan

On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske wrote:
> We are moving this out of the window definition, i.e., the OVER clause, to
> enforce the same partitioning for all windows (different partitionings
> would be a challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() mandatory for rowWindow() with a single window definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong:
>> Hi Shaoxuan:
>>
>> I think the StreamSQL must be executed in a table environment, so I call
>> this the Table API's StreamSQL. What do you call this, stream Table API
>> or StreamSQL?
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>>   .map(f => (f, 1L, 1L))
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> val sql = tblEnv.sql("SELECT STREAM * FROM Order WHERE userID = 'A'")
>>
>> So in my opinion, the grammar which is marked
Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment
+1

On Fri, Oct 14, 2016 at 12:04 PM, Stephan Ewen wrote:
> +1
>
> On Fri, Oct 14, 2016 at 11:54 AM, Greg Hogan wrote:
> > +1
> >
> > On Fri, Oct 14, 2016 at 5:29 AM, Fabian Hueske wrote:
> > > Hi everybody,
> > >
> > > I would like to propose to deprecate the utility methods to read data
> > > with Hadoop InputFormats from the (batch) ExecutionEnvironment.
Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2
@Greg

I think that would be amazing. It does require a bit of cleanup, though. As far as I know, the Hadoop dependency is additionally used for some Kerberos utilities and for its S3 file system implementation. We would need to make the Kerberos part Hadoop independent and the FileSystem loading dynamic (with a good exception that the Hadoop dependency should be added if the filesystem cannot be loaded).

Stephan

On Thu, Oct 13, 2016 at 8:55 PM, Greg Hogan wrote:
> Okay, this sounds prudent. Would this be the right time to implement
> FLINK-2268 "Provide Flink binary release without Hadoop"?
>
> On Thu, Oct 13, 2016 at 11:25 AM, Stephan Ewen wrote:
> > +1 for dropping Hadoop1 support
> >
> > @greg There is quite some complexity in the build setup and release
> > scripts and testing to support Hadoop 1. Also, we have to prepare to add
> > support for Hadoop 3, and then supporting Hadoop 1 in addition seems
> > very tough.
> >
> > Stephan
> >
> > On Thu, Oct 13, 2016 at 5:04 PM, Greg Hogan wrote:
> > > Hi Robert,
> > >
> > > What are the benefits to Flink of dropping Hadoop 1 support? Is there
> > > significant code cleanup or would we simply be publishing one less set
> > > of artifacts?
> > >
> > > Greg
> > >
> > > On Thu, Oct 13, 2016 at 10:47 AM, Robert Metzger wrote:
> > > > Hi,
> > > >
> > > > The Apache Hadoop community has recently released the first alpha
> > > > version of Hadoop 3.0.0, while we are still supporting Hadoop 1. I
> > > > think it's time to finally drop Hadoop 1 support in Flink.
> > > >
> > > > The last minor Hadoop 1 release was on 27 June 2014.
> > > > Apache Spark dropped Hadoop 1 support with their 2.0 release in July
> > > > 2016. Hadoop 2.2 was first released in October 2013, so there was
> > > > enough time for users to upgrade.
> > > >
> > > > I also added the user@ list to the discussion to get opinions about
> > > > this from there as well.
> > > >
> > > > Let me know what you think about this!
> > > >
> > > > Regards,
> > > > Robert
Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2
@Fabian - Someone started with that in https://issues.apache.org/jira/browse/FLINK-4315
That could be changed to not remove the methods from the ExecutionEnvironment.

On Fri, Oct 14, 2016 at 10:45 AM, Fabian Hueske wrote:
> Yes, I'm also +1 for removing the methods at some point.
>
> For 1.2 we could go ahead and move the Hadoop-MR connectors into a
> separate module and mark the methods in ExecutionEnvironment as
> @deprecated.
> In 1.3 (or 2.0, whatever comes next) we could make the switch.
[jira] [Created] (FLINK-4829) Accumulators are not thread safe
Till Rohrmann created FLINK-4829:

Summary: Accumulators are not thread safe
Key: FLINK-4829
URL: https://issues.apache.org/jira/browse/FLINK-4829
Project: Flink
Issue Type: Bug
Components: Local Runtime
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Fix For: 1.2.0

Flink's {{Accumulators}} are not thread safe. With the introduction of live accumulator snapshots, which are sent to the {{JobManager}}, we have introduced concurrent access to accumulators without properly guarding them against concurrent modifications. If an accumulator snapshot is taken while the accumulator is being modified at the same time, this can cause a {{ConcurrentModificationException}}, as reported by a user:

{code}
WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
    at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
    at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
    at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
    at java.util.TreeMap.writeObject(TreeMap.java:2436)
    at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
    at java.util.HashMap.writeObject(HashMap.java:1362)
    at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
    at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
    at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
{code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
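The race in the trace above, serializing a map while the task thread mutates it, is commonly avoided by copying the map under a lock and serializing the copy. The sketch below uses made-up names (it is not Flink's actual {{AccumulatorRegistry}}) to show that pattern:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.TreeMap;

// Hypothetical sketch, NOT Flink's real AccumulatorRegistry: the shared map
// is only touched while holding the monitor, and snapshot() serializes a
// private copy, so the heartbeat thread cannot race with task-thread updates.
class GuardedAccumulatorRegistry {
    private final TreeMap<String, Long> accumulators = new TreeMap<>();

    // Called from the task thread for every accumulator update.
    synchronized void add(String name, long delta) {
        accumulators.merge(name, delta, Long::sum);
    }

    // Called from the heartbeat/snapshot thread: copy first under the lock,
    // then serialize the copy outside it. The copy can no longer be mutated,
    // so TreeMap.writeObject cannot throw ConcurrentModificationException.
    byte[] snapshot() {
        TreeMap<String, Long> copy;
        synchronized (this) {
            copy = new TreeMap<>(accumulators);
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(copy);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

Whether Flink's fix takes this exact shape (locking vs. copy-on-write accumulators) is up to the issue's resolution; the sketch only illustrates the general guard.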
Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2
@Fabian Good point. For Flink 2.0, I would suggest to remove them from the Environment and add them to a Utility. The way it is now, it ties Flink very strongly to Hadoop.

You are right, before we do that, there is no way to make a Hadoop independent distribution.

On Fri, Oct 14, 2016 at 10:37 AM, Fabian Hueske wrote:
> +1 for dropping Hadoop1 support.
>
> Regarding a binary release without Hadoop:
>
> What would we do about the readHadoopFile() and createHadoopInput() on the
> ExecutionEnvironment?
> These methods are declared as @PublicEvolving, so we did not commit to
> keep them.
> However, that does not necessarily mean we should easily break the API
> here, especially since the methods have not been declared @deprecated.
>
> Best, Fabian
Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2
Thanks for the pointer. I'll start a separate discussion and push the PR forward if we come to an agreement.

2016-10-14 11:04 GMT+02:00 Stephan Ewen:
> @Fabian - Someone started with that in https://issues.apache.org/jira/browse/FLINK-4315
> That could be changed to not remove the methods from the ExecutionEnvironment.
>
> On Fri, Oct 14, 2016 at 10:45 AM, Fabian Hueske wrote:
> > Yes, I'm also +1 for removing the methods at some point.
> > For 1.2 we could go ahead and move the Hadoop-MR connectors into a separate module and mark the methods in ExecutionEnvironment as @deprecated.
> > In 1.3 (or 2.0, whatever comes next) we could make the switch.
Re: [DISCUSS] Drop Hadoop 1 support with Flink 1.2
+1 for dropping Hadoop1 support.

Regarding a binary release without Hadoop:
What would we do about the readHadoopFile() and createHadoopInput() on the ExecutionEnvironment?
These methods are declared as @PublicEvolving, so we did not commit to keep them.
However, that does not necessarily mean we should easily break the API here, esp. since the methods have not been declared @deprecated.

Best, Fabian

2016-10-14 10:29 GMT+02:00 Stephan Ewen:
> @Greg
> I think that would be amazing. It does require a bit of cleanup, though. As far as I know, the Hadoop dependency is additionally used for some Kerberos utilities and for its S3 file system implementation.
> We would need to make the Kerberos part Hadoop independent and the FileSystem loading dynamic (with a good exception that the Hadoop dependency should be added if the filesystem cannot be loaded).
> Stephan
[jira] [Created] (FLINK-4828) execute stream job asynchronously
David Anderson created FLINK-4828:
-------------------------------------

Summary: execute stream job asynchronously
Key: FLINK-4828
URL: https://issues.apache.org/jira/browse/FLINK-4828
Project: Flink
Issue Type: New Feature
Components: DataStream API
Reporter: David Anderson

It is currently awkward to work with DataStreams in a notebook (e.g. Zeppelin or Jupyter) because env.execute() never returns, and there is no way to control the job that has been started. It would be much better to have a variant like env.executeAsync() that executes the job in another thread and returns right away. This method should return an object that can be used to cancel or stop the job.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
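The behavior the issue asks for can be sketched with plain JDK concurrency primitives. Note this is only an illustration of the requested semantics: `executeAsync` and `JobHandle` are hypothetical names from the issue description, not existing Flink API, and a real implementation would submit to a cluster rather than a local thread pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of an env.executeAsync()-style call: submit the blocking job to a
// background thread and return immediately with a handle that can report
// status and cancel the job.
public class AsyncExecutionSketch {
    public static final class JobHandle {
        private final Future<?> future;
        JobHandle(Future<?> future) { this.future = future; }
        public boolean isRunning() { return !future.isDone(); }
        // Cancel the running job by interrupting its thread.
        public void cancel() { future.cancel(true); }
    }

    private static final ExecutorService POOL = Executors.newCachedThreadPool();

    // Stand-in for env.executeAsync(): unlike env.execute(), this does not block.
    public static JobHandle executeAsync(Runnable job) {
        return new JobHandle(POOL.submit(job));
    }

    // Release the worker threads when no more jobs will be submitted.
    public static void shutdown() { POOL.shutdownNow(); }
}
```

In a notebook, the caller could keep the `JobHandle` in a variable, continue evaluating cells, and later call `cancel()` on it.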
Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment
+1

On Fri, Oct 14, 2016 at 11:54 AM, Greg Hogan wrote:
> +1
>
> On Fri, Oct 14, 2016 at 5:29 AM, Fabian Hueske wrote:
> > Hi everybody,
> >
> > I would like to propose to deprecate the utility methods that read data with Hadoop InputFormats from the (batch) ExecutionEnvironment.
> >
> > The motivation for deprecating these methods is to reduce Flink's dependency on Hadoop and instead have Hadoop as an optional dependency for users that actually need it (HDFS, MapRed compatibility, ...). Eventually, we want to have a Flink distribution that does not have a hard Hadoop dependency.
> >
> > One step for this is to remove the Hadoop dependency from flink-java (Flink's Java DataSet API), which is currently required due to the above utility methods (see FLINK-4315). We recently received a PR that addresses FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment. After some discussion, it was decided to defer the PR to Flink 2.0 because it breaks the API (these methods are declared @PublicEvolving).
> >
> > I propose to accept this PR for Flink 1.2, but instead of removing the methods, deprecating them. This would help to migrate old code and prevent new usage of these methods. For a later Flink release (1.3 or 2.0) we could remove these methods and the Hadoop dependency of flink-java.
> >
> > What do others think?
> >
> > Best, Fabian
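The migration pattern Fabian proposes (keep the method, mark it `@Deprecated`, delegate to a standalone utility) could look roughly like this. All class names and the method body below are illustrative stand-ins, not Flink's actual code.

```java
// Sketch of the proposed deprecation path: the environment method survives for
// one release cycle but is deprecated and delegates to a utility class, so the
// environment itself no longer needs a hard dependency on the heavy library.
public class DeprecationSketch {
    /** Stand-in for a future Hadoop-free utility class (hypothetical name). */
    public static class HadoopInputs {
        public static String readHadoopFile(String path) {
            return "hadoop-input:" + path; // placeholder for the real read logic
        }
    }

    /** Stand-in for the ExecutionEnvironment. */
    public static class Environment {
        /** @deprecated use {@link HadoopInputs#readHadoopFile(String)} instead. */
        @Deprecated
        public String readHadoopFile(String path) {
            // Delegation keeps old code working while the compiler warns about
            // new usage, matching the "deprecate in 1.2, remove later" plan.
            return HadoopInputs.readHadoopFile(path);
        }
    }
}
```

Because the deprecated method only delegates, old and new call sites stay behaviorally identical until the removal release.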
[jira] [Created] (FLINK-4831) Implement a log4j metric reporter
Chesnay Schepler created FLINK-4831:
-------------------------------------

Summary: Implement a log4j metric reporter
Key: FLINK-4831
URL: https://issues.apache.org/jira/browse/FLINK-4831
Project: Flink
Issue Type: Improvement
Components: Metrics
Affects Versions: 1.1.2
Reporter: Chesnay Schepler
Fix For: 1.2.0

For debugging purposes it would be very useful to have a log4j metric reporter. If you don't want to set up a metric backend you currently have to rely on JMX, which a) works a bit differently than other reporters (for example, it doesn't extend AbstractReporter) and b) makes it a bit tricky to analyze results, as metrics are cleaned up once a job finishes.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
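The core of such a reporter is small: keep a registry of gauges and render one log line per metric on a schedule. The sketch below uses `java.util.logging` purely to stay self-contained; an actual Flink reporter would implement Flink's reporter interfaces and log via log4j, and the class and method names here are assumptions, not the eventual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Logger;

// Minimal sketch of a logging metric reporter: registered gauges are polled
// and written to the log, so debugging needs no metric backend or JMX setup.
public class LoggingReporterSketch {
    private static final Logger LOG = Logger.getLogger("metrics");
    private final Map<String, Supplier<Object>> gauges = new ConcurrentHashMap<>();

    public void register(String name, Supplier<Object> gauge) { gauges.put(name, gauge); }
    public void unregister(String name) { gauges.remove(name); }

    /** Would be called on a fixed schedule; renders "name=value" per metric. */
    public String report() {
        StringBuilder sb = new StringBuilder();
        gauges.forEach((name, g) -> sb.append(name).append('=').append(g.get()).append('\n'));
        String snapshot = sb.toString();
        LOG.info(snapshot); // a real reporter would use a log4j Logger here
        return snapshot;
    }
}
```

Returning the rendered snapshot from `report()` also sidesteps the problem mentioned in the issue: the logged values survive in the log file even after the job finishes and its metrics are cleaned up.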
[jira] [Created] (FLINK-4830) Show latency statistics in web interface
Robert Metzger created FLINK-4830:
-------------------------------------

Summary: Show latency statistics in web interface
Key: FLINK-4830
URL: https://issues.apache.org/jira/browse/FLINK-4830
Project: Flink
Issue Type: Sub-task
Components: Metrics
Reporter: Robert Metzger
Assignee: Robert Metzger

With FLINK-3660, we added a metric that measures the latency of records flowing through the system. With this JIRA, I would like to expose the latency in the web frontend as well. Therefore, we'll probably need to change the format in which the latency is reported by the metric.

I think we should show the latencies in two different views in the interface:
- A global end-to-end latency in the overview of running jobs. This number is probably pretty inaccurate, but should give users a first impression of the latency characteristics currently present in the job.
- A detailed latency drill-down view that allows users to see how much latency is added on average at each operator, for the sources.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
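The two proposed views boil down to two aggregations over latency samples: a per-operator average for the drill-down view, and a coarse end-to-end figure for the job overview. The sketch below shows one possible shape of that computation; the input layout (operator name to list of sampled latencies in milliseconds) is a hypothetical simplification, not the format of Flink's latency markers.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the aggregations the web interface could display.
public class LatencyStatsSketch {
    // Drill-down view: average sampled latency per operator.
    public static Map<String, Double> averagePerOperator(Map<String, List<Long>> samples) {
        Map<String, Double> avg = new LinkedHashMap<>();
        samples.forEach((op, values) ->
            avg.put(op, values.stream().mapToLong(Long::longValue).average().orElse(0.0)));
        return avg;
    }

    // Overview figure: a rough end-to-end estimate as the sum of per-operator
    // averages along the chain (hence "probably pretty inaccurate").
    public static double endToEnd(Map<String, Double> perOperator) {
        return perOperator.values().stream().mapToDouble(Double::doubleValue).sum();
    }
}
```

For example, an operator with samples [10 ms, 20 ms] averages to 15 ms, and summing these averages over the operator chain gives the coarse end-to-end number for the overview page.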
[jira] [Created] (FLINK-4832) Count/Sum 0 elements
Timo Walther created FLINK-4832:
-------------------------------------

Summary: Count/Sum 0 elements
Key: FLINK-4832
URL: https://issues.apache.org/jira/browse/FLINK-4832
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Timo Walther

Currently, the Table API is unable to count or sum up 0 elements. We should improve DataSet aggregations for this, maybe by unioning the original DataSet with a dummy record or by using a MapPartition function. Coming up with a good design for this is also part of this issue.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
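The "union with a dummy record" idea can be sketched as follows: every real element becomes a partial (count=1, sum=value) record, a dummy (0, 0) record is unioned in, and a reduce combines the partials. Because the dummy guarantees at least one record, the aggregation always emits a result, so COUNT over an empty input yields 0 instead of no rows. Plain Java streams stand in for a Flink DataSet here; the real fix would live in the Table API's aggregation translation.

```java
import java.util.List;
import java.util.stream.Stream;

// Sketch of COUNT/SUM that works on 0 elements via a dummy record.
public class EmptyAggregateSketch {
    public static long[] countAndSum(List<Integer> input) {
        // Real record -> partial (count=1, sum=v); the dummy contributes (0, 0).
        Stream<long[]> partials = input.stream().map(v -> new long[]{1L, v});
        Stream<long[]> withDummy = Stream.concat(partials, Stream.of(new long[]{0L, 0L}));
        // The reduce never sees an empty stream, so it always produces a result.
        return withDummy.reduce((a, b) -> new long[]{a[0] + b[0], a[1] + b[1]}).get();
    }
}
```

The dummy record is neutral for both aggregates (adds 0 to the count and 0 to the sum), which is why it can be unioned in unconditionally.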
Re: Exception from in-progress implementation of Python API bulk iterations
In this branch you can find a more fine-grained fix for chaining with iterations: https://github.com/zentol/flink/tree/new-iterations
Relevant commit: ac2305d9589a5c6ab9e94d04c870fba52716d695

On 13.10.2016 23:11, Chesnay Schepler wrote:

The chaining code is definitely related; I also have a pretty clear idea of how to fix it. The odd thing is that the Java API doesn't catch this type mismatch; the data types are known when the plan is generated. This kind of error shouldn't even happen.

On 13.10.2016 21:15, Geoffrey Mon wrote:

Thank you very much. Disabling chaining with the Python API allows my actual script to run properly. The division by zero must be an issue with the job that I posted on gist. Does that mean that the issue must be in the chaining part of the API? Chaining, from the way I understand it, is an important optimization that would be relevant for the performance comparison I wish to make in my project.

Cheers,
Geoffrey

On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler wrote:

A temporary workaround appears to be disabling chaining, which you can do by commenting out L215 "self._find_chains()" in Environment.py. Note that you then run into a division by zero error, but I can't tell whether that is a problem of the job or not.

On 13.10.2016 13:41, Chesnay Schepler wrote:

Hey Geoffrey,
I was able to reproduce the error and will look into it in more detail tomorrow.
Regards,
Chesnay

On 12.10.2016 23:09, Geoffrey Mon wrote:

Hello,
Has anyone had a chance to look into this? I am currently working on the problem, but I have minimal understanding of how the internal Flink Python API works; any expertise would be greatly appreciated. Thank you very much!
Geoffrey

On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon wrote:

Hi Chesnay,
Heh, I have discovered that if I do not restart Flink after running my original problematic script, then similar issues will manifest themselves in other otherwise working scripts. I haven't been able to completely narrow down the problem, but I promise this new script will have a ClassCastException that is completely reproducible. :)
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
Thanks,
Geoffrey

On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler wrote:

Hello Geoffrey,
this one works for me as well :D
Regards,
Chesnay

On 28.09.2016 05:38, Geoffrey Mon wrote:

Hello Chesnay,
Thank you for your help. After receiving your message I recompiled my version of Flink completely, and both the NullPointerException listed in the TODO and the ClassCastException with the join operation went away. Previously, I had been recompiling only the modules of Flink that had been changed, to save time, using "mvn clean install -pl :module", and apparently that may have been causing some of my issues.

Now the problem is clearer: when a specific group reduce function in my research project plan file is used within an iteration, I get a ClassCastException:

Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
    at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
    at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
    at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
    at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
    at java.lang.Thread.run(Thread.java:745)

I'm not sure why this is causing an exception, and I would greatly appreciate any assistance. I've revised the barebones error-causing plan file to focus on this new error source: https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
The group reduce function in question seems to work just fine outside of iterations. I have organized the commits and pushed to a new branch to make it easier to test and hopefully review soon: https://github.com/GEOFBOT/flink/tree/new-iterations

Cheers,
Geoffrey

On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler wrote:

Hello