[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262007#comment-16262007 ] ASF GitHub Bot commented on FLINK-8090: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5032#discussion_r152475532 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java --- @@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception { assertFalse(value.iterator().hasNext()); } + @Test(expected = DuplicateStateNameException.class) + public void testDuplicateStateName() throws Exception { + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMapPlainMockOp(), + createMockEnvironment(), + Collections.emptyMap()); + MapStateDescriptor mapStateDesc = + new MapStateDescriptor<>("name", Integer.class, String.class); + ListStateDescriptor listStateDesc = + new ListStateDescriptor<>("name", String.class); + context.getMapState(mapStateDesc); --- End diff -- This is good enough 👍 > Improve error message when registering different states under the same name. 
> > > Key: FLINK-8090 > URL: https://issues.apache.org/jira/browse/FLINK-8090 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Xingcan Cui > > Currently a {{ProcessFunction}} like this: > {code} > final MapStateDescriptor> > firstMapStateDescriptor = new MapStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO, > source.getType()); > final ListStateDescriptor secondListStateDescriptor = new > ListStateDescriptor( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO); > new ProcessFunction, Object>() { > private static final long serialVersionUID = > -805125545438296619L; > private transient MapState Tuple2> firstMapState; > private transient ListState > secondListState; > @Override > public void open(Configuration parameters) > throws Exception { > super.open(parameters); > firstMapState = > getRuntimeContext().getMapState(firstMapStateDescriptor); > secondListState = > getRuntimeContext().getListState(secondListStateDescriptor); > } > @Override > public void processElement(Tuple2 Long> value, Context ctx, Collector out) throws Exception { > Tuple2 v = > firstMapState.get(value.f0); > if (v == null) { > v = new Tuple2<>(value.f0, 0L); > } > firstMapState.put(value.f0, new > Tuple2<>(v.f0, v.f1 + value.f1)); > } > } > {code} > fails with: > {code} > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127) > at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassCastException: > org.apache.flink.runtim
[GitHub] flink pull request #5032: [FLINK-8090] [DataStream] Improve the error messag...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5032#discussion_r152475532 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java --- @@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception { assertFalse(value.iterator().hasNext()); } + @Test(expected = DuplicateStateNameException.class) + public void testDuplicateStateName() throws Exception { + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMapPlainMockOp(), + createMockEnvironment(), + Collections.emptyMap()); + MapStateDescriptor mapStateDesc = + new MapStateDescriptor<>("name", Integer.class, String.class); + ListStateDescriptor listStateDesc = + new ListStateDescriptor<>("name", String.class); + context.getMapState(mapStateDesc); --- End diff -- This is good enough 👍 ---
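The test above expects a dedicated exception instead of the `ClassCastException` shown in the issue's stack trace. Below is a minimal sketch of that fail-fast check. It assumes a registry that remembers which kind of state each name was first registered as. `StateNameRegistry` and the string "kinds" are hypothetical, and the exception class is only a stand-in that borrows its name from the test. It is not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in that only shares its name with the exception the test expects; not Flink's class.
class DuplicateStateNameException extends RuntimeException {
    DuplicateStateNameException(String name, String existingKind, String requestedKind) {
        super("State name '" + name + "' is already registered as " + existingKind
                + " and cannot be registered again as " + requestedKind
                + ". Use a distinct name for each state.");
    }
}

// Hypothetical registry: remembers which kind of state each name was first created as.
class StateNameRegistry {
    private final Map<String, String> kindByName = new HashMap<>();

    /** Allows re-fetching a state of the same kind; rejects reusing a name for another kind. */
    void register(String name, String kind) {
        String existing = kindByName.putIfAbsent(name, kind);
        if (existing != null && !existing.equals(kind)) {
            throw new DuplicateStateNameException(name, existing, kind);
        }
    }
}
```

Fetching `"timon-one"` twice as a `MapState` passes. Asking for it as a `ListState` afterwards fails immediately, and the message names both state kinds.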
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261951#comment-16261951 ] Xingcan Cui commented on FLINK-8118: Hi [~twalthr], thanks for raising this. I have two questions about this issue. # Shall we integrate the methods into the {{KafkaTableSource.Builder}} or directly into the {{KafkaTableSource}}? Personally, I prefer the latter since it would be more flexible, although it seems to break the design pattern to some extent. # Since the {{startupMode}} in {{FlinkKafkaConsumerBase}} is not visible outside that class, do you have any suggestions on how to test this? Thanks, Xingcan > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the start-position options of the Kafka consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
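Both questions can be illustrated with a builder that records the start position in an immutable settings object. Tests can then assert on that object instead of reaching into `FlinkKafkaConsumerBase`. This is a minimal sketch under stated assumptions. `KafkaSourceSettings`, `StartMode`, and the builder method names are hypothetical. Offsets are keyed by partition number only, as a simplification. A real table source would presumably translate these settings into the consumer's own setters, such as `setStartFromEarliest()` or `setStartFromSpecificOffsets(...)`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical start modes mirroring the options in the Kafka connector docs; not Flink's enum.
enum StartMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS }

// Hypothetical, simplified table-source settings; immutable once built.
final class KafkaSourceSettings {
    final String topic;
    final StartMode startMode;
    final Map<Integer, Long> specificOffsets; // partition -> offset; empty unless SPECIFIC_OFFSETS

    private KafkaSourceSettings(Builder b) {
        this.topic = b.topic;
        this.startMode = b.startMode;
        this.specificOffsets = Collections.unmodifiableMap(new HashMap<>(b.specificOffsets));
    }

    static Builder builder(String topic) {
        return new Builder(topic);
    }

    static final class Builder {
        private final String topic;
        // Default keeps the current behaviour described in the issue.
        private StartMode startMode = StartMode.GROUP_OFFSETS;
        private Map<Integer, Long> specificOffsets = Collections.emptyMap();

        private Builder(String topic) {
            this.topic = topic;
        }

        Builder startFromEarliest() { return mode(StartMode.EARLIEST); }

        Builder startFromLatest() { return mode(StartMode.LATEST); }

        Builder startFromGroupOffsets() { return mode(StartMode.GROUP_OFFSETS); }

        Builder startFromSpecificOffsets(Map<Integer, Long> offsets) {
            mode(StartMode.SPECIFIC_OFFSETS);
            this.specificOffsets = new HashMap<>(offsets);
            return this;
        }

        private Builder mode(StartMode m) {
            this.startMode = m;
            this.specificOffsets = Collections.emptyMap(); // offsets only apply to SPECIFIC_OFFSETS
            return this;
        }

        KafkaSourceSettings build() {
            return new KafkaSourceSettings(this);
        }
    }
}
```

Keeping the options on the builder preserves the builder pattern. Exposing the resulting `startMode` is one way to make the behaviour testable without widening the consumer's visibility.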
[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected
[ https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261907#comment-16261907 ] lincoln.lee commented on FLINK-6101: [~twalthr] This implementation supports selecting the original expressions and UDFs used in the groupBy clause (consistent with SQL). Supporting aliases in the groupBy clause seems a bit odd. > GroupBy fields with arithmetic expression (include UDF) can not be selected > --- > > Key: FLINK-6101 > URL: https://issues.apache.org/jira/browse/FLINK-6101 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently the Table API does not support selecting GroupBy fields that are > expressions, either by the original field name or by the expression itself: > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > causes > {code} > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. > {code} > (BTW, this syntax is also invalid in an RDBMS: SQL Server, for example, reports that the selected column > is invalid in the select list because it is not contained in either an > aggregate function or the GROUP BY clause.) > Likewise, > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) > {code} > also causes > {code} > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. > {code} > Adding an alias in the groupBy clause ("group(e, 'b%3 as 'b)") does not help either. 
> Applying a UDF doesn’t work either: > {code} > table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, > 'd.count, 'e.avg) > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, > TMP_1, TMP_2]. > {code} > The only way to get this to work is: > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .select('a, 'b%3 as 'b, 'c, 'd, 'e) > .groupBy('e, 'b) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > One way to solve this is to support aliases in the groupBy clause (which seems a > bit at odds with SQL, although the Table API has a different groupBy grammar), > but I prefer to support selecting the original expressions and UDFs from the groupBy > clause (consistent with SQL), > like this: > {code} > // use expression > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count) > // use UDF > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, Mod('b,3)) > .select(Mod('b,3), 'c.min, 'e, 'a.avg, 'd.count) > {code}
> After looking into the code, I found a problem in the groupBy > implementation: validation did not consider the expressions in the groupBy > clause. Note that the table actually changes after the > groupBy operation (it becomes a new Table), and the groupBy keys essentially replace the original > field references. > > What do you think?
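The proposal above resolves a non-aggregate select item against the grouping keys as a whole expression, not against the original fields. Below is a toy model of that resolution step. It assumes expressions are compared as whitespace-normalized strings, whereas the real Table API works on expression trees. `GroupByResolver` and the `GROUP_EXPR_n` internal aliases are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: grouping keys and select items are plain strings compared after removing whitespace.
final class GroupByResolver {
    // Normalized grouping expression -> column name in the grouped table.
    private final Map<String, String> outputNameByKey = new LinkedHashMap<>();

    GroupByResolver(List<String> groupingExprs) {
        int tmp = 0;
        for (String expr : groupingExprs) {
            String key = normalize(expr);
            // Plain field references keep their name; computed keys get an internal alias.
            String name = isFieldReference(key) ? key : "GROUP_EXPR_" + tmp++;
            outputNameByKey.put(key, name);
        }
    }

    /** Maps a non-aggregate select item to the grouped table's column, or fails like the validator. */
    String resolve(String selectExpr) {
        String name = outputNameByKey.get(normalize(selectExpr));
        if (name == null) {
            throw new IllegalArgumentException("Cannot resolve [" + selectExpr.trim()
                    + "] given grouping keys " + outputNameByKey.keySet());
        }
        return name;
    }

    private static String normalize(String expr) {
        return expr.replaceAll("\\s+", "");
    }

    private static boolean isFieldReference(String expr) {
        return expr.matches("[A-Za-z_][A-Za-z0-9_]*");
    }
}
```

With keys `e` and `b % 3`, selecting `b%3` resolves to the internal alias, and a UDF call such as `Mod(b, 3)` would match in the same way. Selecting bare `b` still fails, which matches the SQL behaviour the issue mentions.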
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261904#comment-16261904 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152459473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -134,9 +138,9 @@ void assignExclusiveSegments(List segments) { --- End diff -- agree > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is part of the work on credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} messages from the producer. It requests a buffer from {{BufferPool}} > to hold each message. If no buffer is available, the message is staged temporarily and > {{autoread}} for the channel is set to false. > In credit-based mode, {{PartitionRequestClientHandler}} can always get a > buffer from {{RemoteInputChannel}} for reading messages from the producer. > The related work items are: > * Add the producer's backlog to the {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffers from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates the backlog of > {{RemoteInputChannel}}, which may trigger requests for floating buffers from > {{BufferPool}}
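The credit-based receive path described in the issue can be sketched as follows. The handler takes its buffer from the channel rather than from the pool, then forwards the piggybacked backlog. This is a simplified model. `BufferResponse` here carries only a byte payload and the backlog, and `CreditedChannel` and `CreditBasedDecoder` are hypothetical. None of these types or signatures are Flink's actual classes.

```java
// Hypothetical, simplified network message: payload plus the producer's backlog.
final class BufferResponse {
    final byte[] payload;
    final int backlog; // buffers still queued in the producer's subpartition

    BufferResponse(byte[] payload, int backlog) {
        this.payload = payload;
        this.backlog = backlog;
    }
}

// Hypothetical receiver-side view of a remote input channel in credit-based mode.
interface CreditedChannel {
    /** Returns an empty buffer to copy into, or null only if the channel was released. */
    byte[] requestBuffer();

    /** Hands over a filled buffer whose first {@code size} bytes are valid. */
    void onBuffer(byte[] buffer, int size);

    /** Records the producer's backlog; may request floating buffers and announce credit. */
    void onSenderBacklog(int backlog);
}

final class CreditBasedDecoder {
    /** Copies one response into a buffer taken from the channel; false if the channel is released. */
    static boolean decode(BufferResponse msg, CreditedChannel channel) {
        byte[] target = channel.requestBuffer();
        if (target == null) {
            // Released channel: drop the message; no staging and no autoread toggling.
            return false;
        }
        if (msg.payload.length > target.length) {
            // Assumes equal fixed-size buffers on both sides, so this signals a protocol error.
            throw new IllegalStateException("payload larger than receiver buffer");
        }
        System.arraycopy(msg.payload, 0, target, 0, msg.payload.length);
        channel.onBuffer(target, msg.payload.length);
        channel.onSenderBacklog(msg.backlog);
        return true;
    }
}
```

Because the producer only sends against announced credit, a `null` buffer should only occur after release. That is why this sketch drops the message instead of staging it.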
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152459473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -134,9 +138,9 @@ void assignExclusiveSegments(List segments) { --- End diff -- agree ---
[jira] [Commented] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261898#comment-16261898 ] Hai Zhou UTC+8 commented on FLINK-7716: --- [~gjy] Sorry, I have not had any free time lately to work on this ticket. This ticket and FLINK-7717 are free to be picked up; feel free to assign them to yourself if you are interested. > Port JobManagerMetricsHandler to new REST endpoint > -- > > Key: FLINK-7716 > URL: https://issues.apache.org/jira/browse/FLINK-7716 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Hai Zhou UTC+8 > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JobManagerMetricsHandler}} to new REST endpoint.
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261892#comment-16261892 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152457595 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -240,15 +245,13 @@ void releaseAllResources() throws IOException { } } } - - synchronized (availableBuffers) { + synchronized (bufferQueue) { --- End diff -- ok
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152457595 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -240,15 +245,13 @@ void releaseAllResources() throws IOException { } } } - - synchronized (availableBuffers) { + synchronized (bufferQueue) { --- End diff -- ok ---
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261886#comment-16261886 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152456861 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode unless +* the channel has been released. +* +* @return The available buffer. +*/ + @Nullable + public Buffer requestBuffer() { + synchronized (bufferQueue) { + // Take the floating buffer first if possible. + if (bufferQueue.getFloatingBufferSize() > 0) { --- End diff -- that is fine
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152456861 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode unless +* the channel has been released. +* +* @return The available buffer. +*/ + @Nullable + public Buffer requestBuffer() { + synchronized (bufferQueue) { + // Take the floating buffer first if possible. + if (bufferQueue.getFloatingBufferSize() > 0) { --- End diff -- that is fine ---
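The `requestBuffer()` in this diff takes a floating buffer when one is available and falls back to an exclusive buffer otherwise. Below is a minimal sketch of a two-tier queue with that policy. `NetBuffer` and `TwoTierBufferQueue` are hypothetical, and the real `AvailableBufferQueue` may differ. One plausible rationale, which is an assumption and not stated in the diff, is that borrowed floating buffers get consumed and returned to the shared pool before the channel's reserved exclusive buffers.

```java
import java.util.ArrayDeque;

// Hypothetical buffer handle; only records which kind of buffer it is.
final class NetBuffer {
    final boolean exclusive;

    NetBuffer(boolean exclusive) {
        this.exclusive = exclusive;
    }
}

// Holds a channel's exclusive and floating buffers separately. Not thread-safe on its own:
// callers are expected to hold one lock around it, as the diff does with synchronized (bufferQueue).
final class TwoTierBufferQueue {
    private final ArrayDeque<NetBuffer> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<NetBuffer> floatingBuffers = new ArrayDeque<>();

    void add(NetBuffer buffer) {
        (buffer.exclusive ? exclusiveBuffers : floatingBuffers).add(buffer);
    }

    int getFloatingBufferSize() {
        return floatingBuffers.size();
    }

    int size() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    /** Prefers a floating buffer, falls back to an exclusive one, or returns null if both are empty. */
    NetBuffer take() {
        NetBuffer floating = floatingBuffers.poll();
        return floating != null ? floating : exclusiveBuffers.poll();
    }
}
```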
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261871#comment-16261871 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152454790 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode unless +* the channel has been released. +* +* @return The available buffer. +*/ + @Nullable + public Buffer requestBuffer() { + synchronized (bufferQueue) { + // Take the floating buffer first if possible. + if (bufferQueue.getFloatingBufferSize() > 0) { + return bufferQueue.takeFloatingBuffer(); + } else { + return bufferQueue.takeExclusiveBuffer(); + } + } + } + + /** +* Receives the backlog from the producer's buffer response. If the number of available +* buffers is less than backlog + initialCredit, it will request floating buffers from the buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + void onSenderBacklog(int backlog) throws IOException { --- End diff -- yes
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152454790 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException { return inputGate.getBufferProvider(); } - public void onBuffer(Buffer buffer, int sequenceNumber) { + /** +* Requests buffer from input channel directly for receiving network data. +* It should always return an available buffer in credit-based mode unless +* the channel has been released. +* +* @return The available buffer. +*/ + @Nullable + public Buffer requestBuffer() { + synchronized (bufferQueue) { + // Take the floating buffer first if possible. + if (bufferQueue.getFloatingBufferSize() > 0) { + return bufferQueue.takeFloatingBuffer(); + } else { + return bufferQueue.takeExclusiveBuffer(); + } + } + } + + /** +* Receives the backlog from the producer's buffer response. If the number of available +* buffers is less than backlog + initialCredit, it will request floating buffers from the buffer +* pool, and then notify unannounced credits to the producer. +* +* @param backlog The number of unsent buffers in the producer's sub partition. +*/ + void onSenderBacklog(int backlog) throws IOException { --- End diff -- yes ---
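The `onSenderBacklog` Javadoc describes a top-up rule. When fewer than `backlog + initialCredit` buffers are available, the channel requests floating buffers from the pool and then announces the new credit. Below is a minimal sketch of that arithmetic under stated assumptions. `FloatingPool` and `BacklogTracker` are hypothetical. The exclusive buffers are assumed to have been announced already, and buffer consumption and recycling are omitted. The backlog is kept as a plain `int` under the same lock as the buffer count, similar to the `@GuardedBy("bufferQueue")` field change in this thread. The read of the backlog and the top-up loop then form one atomic step, which two independent `AtomicInteger`s would not give.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the shared BufferPool: a counter of free floating buffers.
final class FloatingPool {
    private int free;

    FloatingPool(int free) {
        this.free = free;
    }

    /** Takes one floating buffer if any is free; returns false when the pool is exhausted. */
    synchronized boolean tryRequest() {
        if (free == 0) {
            return false;
        }
        free--;
        return true;
    }
}

// Hypothetical receiver-side credit bookkeeping for a single channel.
final class BacklogTracker {
    private final Object lock = new Object();
    private final int initialCredit;
    private final FloatingPool pool;

    // Both guarded by `lock` so the top-up loop sees a consistent pair.
    private int senderBacklog;
    private int availableBuffers;

    // Credit granted but not yet sent to the producer.
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);

    BacklogTracker(int initialCredit, FloatingPool pool) {
        this.initialCredit = initialCredit;
        this.pool = pool;
        this.availableBuffers = initialCredit; // exclusive buffers, assumed already announced
    }

    void onSenderBacklog(int backlog) {
        int newCredit = 0;
        synchronized (lock) {
            senderBacklog = backlog;
            // Top up to backlog + initialCredit while the pool can still supply buffers.
            while (availableBuffers < backlog + initialCredit && pool.tryRequest()) {
                availableBuffers++;
                newCredit++;
            }
        }
        if (newCredit > 0) {
            // A real channel would now notify the producer; this sketch only accumulates.
            unannouncedCredit.addAndGet(newCredit);
        }
    }

    int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }

    int getAvailableBuffers() {
        synchronized (lock) {
            return availableBuffers;
        }
    }

    int getSenderBacklog() {
        synchronized (lock) {
            return senderBacklog;
        }
    }
}
```

Consider `initialCredit = 2` and a pool of 5. A backlog of 3 adds 3 floating buffers. A later backlog of 10 can only add the 2 that remain, leaving 7 available buffers and 5 credits to announce.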
[GitHub] flink pull request #5047: Code refine of WordWithCount
GitHub user harborl opened a pull request: https://github.com/apache/flink/pull/5047 Code refine of WordWithCount Could you check whether it is OK to add the `final` modifier? The change is motivated only by thread-safety and coding-convention considerations. *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. 
You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/harborl/
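The proposal adds `final` to the fields of `WordWithCount`. Below is a sketch of what an immutable variant could look like. It is a hypothetical illustration, and the class touched by the PR may differ. Under the Java Memory Model, final fields come with an initialization-safety guarantee: once the constructor completes without leaking `this`, any thread that obtains the reference sees the final values (JLS §17.5). There is one caveat, which is an assumption worth verifying. Flink's documented POJO rules ask for a public no-argument constructor and fields that are public or accessible via getters and setters. A class whose fields are all final may therefore no longer be analyzed as a POJO and could fall back to generic (Kryo) serialization.

```java
import java.util.Objects;

// Hypothetical immutable variant of WordWithCount; the class in the PR may differ.
final class WordWithCount {
    private final String word;
    private final long count;

    WordWithCount(String word, long count) {
        this.word = Objects.requireNonNull(word, "word");
        this.count = count;
    }

    String getWord() {
        return word;
    }

    long getCount() {
        return count;
    }

    /** Combines counts for the same word by returning a new instance; this one never changes. */
    WordWithCount plus(WordWithCount other) {
        if (!word.equals(other.word)) {
            throw new IllegalArgumentException("cannot add counts of '" + word + "' and '" + other.word + "'");
        }
        return new WordWithCount(word, count + other.count);
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
```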
[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected
[ https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261869#comment-16261869 ] ASF GitHub Bot commented on FLINK-6101: --- GitHub user lincoln-lil opened a pull request: https://github.com/apache/flink/pull/5046 [FLINK-6101] [table] Support select GroupBy fields with arithmetic ex… ## What is the purpose of the change Support selecting GroupBy fields with arithmetic expressions (including UDFs). ## Brief change log - Use an internal alias for groupBy fields with arithmetic expressions (including UDFs) in groupBy(). ## Verifying this change `AggregateITCase` verifies: - selecting GroupBy fields with arithmetic expressions/UDFs ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lincoln-lil/flink FLINK-6101 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5046.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5046 commit d529b9e6e9ed0977fbd2f27f27e1af0521b9231d Author: lincoln-lil Date: 2017-11-22T01:53:54Z [FLINK-6101] [table] Support select GroupBy fields with arithmetic expressions(include UDF)
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152454439 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -82,17 +84,19 @@ /** The initial number of exclusive buffers assigned to this channel. */ private int initialCredit; - /** The current available buffers including both exclusive buffers and requested floating buffers. */ - private final ArrayDeque availableBuffers = new ArrayDeque<>(); + /** The available buffer queue wraps both exclusive and requested floating buffers. */ + private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue(); /** The number of available buffers that have not been announced to the producer yet. */ private final AtomicInteger unannouncedCredit = new AtomicInteger(0); /** The number of unsent buffers in the producer's sub partition. */ - private final AtomicInteger senderBacklog = new AtomicInteger(0); + @GuardedBy("bufferQueue") + private int senderBacklog; --- End diff -- agree with it ---
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261870#comment-16261870 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152454439 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -82,17 +84,19 @@ /** The initial number of exclusive buffers assigned to this channel. */ private int initialCredit; - /** The current available buffers including both exclusive buffers and requested floating buffers. */ - private final ArrayDeque availableBuffers = new ArrayDeque<>(); + /** The available buffer queue wraps both exclusive and requested floating buffers. */ + private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue(); /** The number of available buffers that have not been announced to the producer yet. */ private final AtomicInteger unannouncedCredit = new AtomicInteger(0); /** The number of unsent buffers in the producer's sub partition. */ - private final AtomicInteger senderBacklog = new AtomicInteger(0); + @GuardedBy("bufferQueue") + private int senderBacklog; --- End diff -- agree with it > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is part of the work on credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from the producer. It requests a buffer from {{BufferPool}} > to hold the message. If none is available, the message is staged temporarily and > {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a > buffer from {{RemoteInputChannel}} for reading messages from the producer. > The related work items are: > * Add the producer's backlog to the {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffers from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates the backlog for > {{RemoteInputChannel}}, which may trigger requests for floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
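The receiver-side flow described in the ticket can be sketched as follows. This is a hypothetical, single-threaded model, not Flink's implementation. `CreditBasedChannel` and `FloatingBufferPool` are illustrative names, buffers are plain integer ids, and keeping `backlog + initialCredit` buffers available is a policy assumed for the sketch.

```java
import java.util.ArrayDeque;

/** Hypothetical pool of floating buffers shared by channels; buffers are positive int ids. */
class FloatingBufferPool {
    private int remaining;

    FloatingBufferPool(int size) {
        this.remaining = size;
    }

    /** Returns a buffer id, or -1 if the pool is exhausted. */
    int requestBuffer() {
        return remaining > 0 ? remaining-- : -1;
    }
}

/**
 * Hypothetical receiver channel for credit-based flow control: exclusive buffers
 * are assigned up front, floating buffers are requested when the sender's backlog
 * grows, and every newly obtained buffer becomes credit not yet announced.
 */
class CreditBasedChannel {
    private final int initialCredit;
    private final FloatingBufferPool pool;
    private final ArrayDeque<Integer> availableBuffers = new ArrayDeque<>();
    private int unannouncedCredit;

    CreditBasedChannel(int exclusiveBuffers, FloatingBufferPool pool) {
        this.initialCredit = exclusiveBuffers;
        this.pool = pool;
        // Exclusive buffers get negative ids; assumed to be announced with the initial request.
        for (int i = 1; i <= exclusiveBuffers; i++) {
            availableBuffers.add(-i);
        }
    }

    /** Called when a BufferResponse carries the producer's backlog. */
    void onSenderBacklog(int backlog) {
        // Assumed policy: keep backlog + initialCredit buffers available.
        int required = backlog + initialCredit;
        while (availableBuffers.size() < required) {
            int buffer = pool.requestBuffer();
            if (buffer < 0) {
                break; // pool exhausted; the sketch simply stops requesting
            }
            availableBuffers.add(buffer);
            unannouncedCredit++;
        }
    }

    /** The network handler takes a buffer directly from the channel, or null if none is left. */
    Integer requestBuffer() {
        return availableBuffers.poll();
    }

    /** Returns the credit to announce to the producer and resets the counter. */
    int takeUnannouncedCredit() {
        int credit = unannouncedCredit;
        unannouncedCredit = 0;
        return credit;
    }

    int available() {
        return availableBuffers.size();
    }
}
```

A real implementation would also have to handle concurrent access and register a listener to be notified when the pool has buffers again.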
[GitHub] flink pull request #5046: [FLINK-6101] [table] Support select GroupBy fields...
GitHub user lincoln-lil opened a pull request: https://github.com/apache/flink/pull/5046 [FLINK-6101] [table] Support select GroupBy fields with arithmetic ex… ## What is the purpose of the change Support select GroupBy fields with arithmetic expressions(include UDF) ## Brief change log - Using an internal alias for groupBy fields with arithmetic expressions(include UDF) in groupBy(). ## Verifying this change `AggregateITCase` verifies - select GroupBy fields with arithmetic expression/UDF ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lincoln-lil/flink FLINK-6101 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5046.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5046 commit d529b9e6e9ed0977fbd2f27f27e1af0521b9231d Author: lincoln-lil Date: 2017-11-22T01:53:54Z [FLINK-6101] [table] Support select GroupBy fields with arithmetic expressions(include UDF) ---
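As a reference for what such a query should compute, here is a plain-Java sketch (not the Table API, and not the PR's implementation) of `groupBy('b % 3).select('b % 3, 'a.avg, 'c.count)`: the grouping key is an expression, and selecting that same expression yields the group key. The `Row` type with fields `a`, `b`, `c` is hypothetical. The count skips null `c` values, as SQL `COUNT(c)` does, and the average is computed as a double for simplicity; Flink's `avg` on an integer column may return an integer type instead.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Plain-Java sketch of grouping by the expression b % 3 and selecting that
 * expression alongside aggregates.
 */
class ExpressionGroupBy {
    /** Hypothetical input row. */
    static final class Row {
        final int a;
        final int b;
        final String c;

        Row(int a, int b, String c) {
            this.a = a;
            this.b = b;
            this.c = c;
        }
    }

    /** Returns (b % 3) -> [avg(a), count(c)], ordered by key. */
    static Map<Integer, double[]> groupByMod3(List<Row> rows) {
        // The key expression is evaluated once per row; this plays the role of the internal alias.
        Map<Integer, List<Row>> groups = rows.stream()
            .collect(Collectors.groupingBy(r -> r.b % 3, TreeMap::new, Collectors.toList()));
        Map<Integer, double[]> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Row>> e : groups.entrySet()) {
            double avgA = e.getValue().stream().mapToInt(r -> r.a).average().orElse(Double.NaN);
            long countC = e.getValue().stream().filter(r -> r.c != null).count();
            result.put(e.getKey(), new double[] {avgA, countC});
        }
        return result;
    }
}
```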
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261868#comment-16261868 ] ASF GitHub Bot commented on FLINK-8090: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5032#discussion_r152454242 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java --- @@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception { assertFalse(value.iterator().hasNext()); } + @Test(expected = DuplicateStateNameException.class) + public void testDuplicateStateName() throws Exception { + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMapPlainMockOp(), + createMockEnvironment(), + Collections.emptyMap()); + MapStateDescriptor mapStateDesc = + new MapStateDescriptor<>("name", Integer.class, String.class); + ListStateDescriptor listStateDesc = + new ListStateDescriptor<>("name", String.class); + context.getMapState(mapStateDesc); --- End diff -- Actually, the test is quite tricky here (internally, `getListState()` will fetch a `null` value instead of the `MapState` created before). It does not simulate the real runtime behavior, which erases the return type of `DefaultKeyedStateStore.getPartitionedState()`, because the test mocks a `KeyedStateBackend`. The type will be checked in advance and that's why I need to catch-and-throw the `ClassCastException` in `getPartitionedState()`. However, I cannot find a better place for this test. Do you have any suggestions? > Improve error message when registering different states under the same name.
> > > Key: FLINK-8090 > URL: https://issues.apache.org/jira/browse/FLINK-8090 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Xingcan Cui > > Currently a {{ProcessFunction}} like this: > {code} > final MapStateDescriptor> > firstMapStateDescriptor = new MapStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO, > source.getType()); > final ListStateDescriptor secondListStateDescriptor = new > ListStateDescriptor( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO); > new ProcessFunction, Object>() { > private static final long serialVersionUID = > -805125545438296619L; > private transient MapState Tuple2> firstMapState; > private transient ListState > secondListState; > @Override > public void open(Configuration parameters) > throws Exception { > super.open(parameters); > firstMapState = > getRuntimeContext().getMapState(firstMapStateDescriptor); > secondListState = > getRuntimeContext().getListState(secondListStateDescriptor); > } > @Override > public void processElement(Tuple2 Long> value, Context ctx, Collector out) throws Exception { > Tuple2 v = > firstMapState.get(value.f0); > if (v == null) { > v = new Tuple2<>(value.f0, 0L); > } > firstMapState.put(value.f0, new > Tuple2<>(v.f0, v.f1 + value.f1)); > } > } > {code} > fails with: > {code} > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127) > at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.ja
[GitHub] flink pull request #5032: [FLINK-8090] [DataStream] Improve the error messag...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5032#discussion_r152454242 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java --- @@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception { assertFalse(value.iterator().hasNext()); } + @Test(expected = DuplicateStateNameException.class) + public void testDuplicateStateName() throws Exception { + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMapPlainMockOp(), + createMockEnvironment(), + Collections.emptyMap()); + MapStateDescriptor mapStateDesc = + new MapStateDescriptor<>("name", Integer.class, String.class); + ListStateDescriptor listStateDesc = + new ListStateDescriptor<>("name", String.class); + context.getMapState(mapStateDesc); --- End diff -- Actually, the test is quite tricky here (internally, `getListState()` will fetch a `null` value instead of the `MapState` created before). It does not simulate the real runtime behavior, which erases the return type of `DefaultKeyedStateStore.getPartitionedState()`, because the test mocks a `KeyedStateBackend`. The type will be checked in advance and that's why I need to catch-and-throw the `ClassCastException` in `getPartitionedState()`. However, I cannot find a better place for this test. Do you have any suggestions? ---
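One way to report the conflict without relying on a `ClassCastException` is to check the type of an already-registered state before returning it. The following is a hypothetical sketch: `StateRegistry` is an illustrative class, and the `DuplicateStateNameException` below is a local stand-in for the exception used in the PR's test, not Flink's class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Local stand-in for the exception referenced in the PR's test. */
class DuplicateStateNameException extends RuntimeException {
    DuplicateStateNameException(String message) {
        super(message);
    }
}

/**
 * Hypothetical keyed-state registry: a second registration under an existing
 * name must request a compatible type, otherwise a descriptive exception is thrown.
 */
class StateRegistry {
    private final Map<String, Object> statesByName = new HashMap<>();

    <S> S getOrCreate(String name, Class<S> stateType, Supplier<S> factory) {
        Object existing = statesByName.get(name);
        if (existing == null) {
            // First registration under this name: create and remember the state.
            S created = factory.get();
            statesByName.put(name, created);
            return created;
        }
        if (!stateType.isInstance(existing)) {
            // Explicit check instead of letting a cast fail later with a bare ClassCastException.
            throw new DuplicateStateNameException(
                "State name '" + name + "' is already registered as "
                    + existing.getClass().getSimpleName()
                    + " and cannot be re-registered as " + stateType.getSimpleName() + ".");
        }
        return stateType.cast(existing);
    }
}
```

Checking eagerly keeps the error at the point of registration, where the state name is still known, which is the information the ticket asks to surface.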
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261822#comment-16261822 ] ASF GitHub Bot commented on FLINK-4520: --- Github user haoch commented on the issue: https://github.com/apache/flink/pull/2487 Keep this PR open until https://github.com/apache/bahir-flink/pull/22 is merged. > Integrate Siddhi as a lightweight CEP Library > - > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP >Affects Versions: 1.2.0 >Reporter: Hao Chen >Assignee: Hao Chen > Labels: cep, library, patch-available > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful to provide Flink users (especially streaming application > developers) with a library to run Siddhi CEP queries directly in Flink > streaming applications. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as a stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO, Tuple, Primitive Type, etc.
> * Connect single or multiple Flink DataStreams with a Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate Siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support Siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price " > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
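The example query uses Siddhi's `every ... -> ...` (followed-by) pattern. The plain-Java sketch below models the semantics assumed here; it is not the Siddhi or flink-siddhi API. Each `id == 2` event on `inputStream1` opens a pending match, and the next `id == 3` event on `inputStream2` completes all pending matches, adding the two prices as the `custom:plus` extension presumably does. `FollowedByPattern` and `Event` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch of the assumed semantics of
 * "from every s1 = inputStream1[id == 2] -> s2 = inputStream2[id == 3]".
 */
class FollowedByPattern {
    /** Hypothetical event with the fields used by the query. */
    static final class Event {
        final int id;
        final String name;
        final double price;

        Event(int id, String name, double price) {
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    private final List<Event> pendingFirst = new ArrayList<>();

    /** Feeds an event from inputStream1. */
    void onFirstStream(Event e) {
        if (e.id == 2) {
            pendingFirst.add(e); // "every": each matching event starts its own partial match
        }
    }

    /** Feeds an event from inputStream2; returns outputs formatted as "name_1,name_2,price". */
    List<String> onSecondStream(Event e) {
        List<String> out = new ArrayList<>();
        if (e.id == 3) {
            for (Event s1 : pendingFirst) {
                out.add(s1.name + "," + e.name + "," + (s1.price + e.price));
            }
            pendingFirst.clear(); // completed partial matches are consumed
        }
        return out;
    }
}
```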
[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...
Github user haoch commented on the issue: https://github.com/apache/flink/pull/2487 Keep this PR open until https://github.com/apache/bahir-flink/pull/22 merged. ---
[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...
GitHub user ChrisChinchilla opened a pull request: https://github.com/apache/flink/pull/5045 [hotfix][docs] Review of concepts docs for grammar and clarity Spending some time doing a brief review of a few docs sections, just a start and I think more could be done. I didn't change formatting or line breaks as much as I would like to as they're quite messy and inconsistent, but I'll save that for another time. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ChrisChinchilla/flink concepts-review Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5045.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5045 commit c592ad103cce9379bc7dc7ac6edef40769262523 Author: Chris Ward Date: 2017-11-22T01:10:08Z Review programming model doc for clarity commit 6310a879ba9d57ce4cfef5eeb81edc1978fb26a7 Author: Chris Ward Date: 2017-11-22T01:22:19Z Review run time docs ---
[jira] [Created] (FLINK-8127) Add New Relic Metric Reporter
Ron Crocker created FLINK-8127: -- Summary: Add New Relic Metric Reporter Key: FLINK-8127 URL: https://issues.apache.org/jira/browse/FLINK-8127 Project: Flink Issue Type: Improvement Components: Metrics Reporter: Ron Crocker Add a MetricReporter that reports to New Relic. This will likely look similar to the Datadog metric reporter - an opt-in library distributed with Flink that communicates directly with New Relic like one of its APM agents, configured appropriately to work with New Relic. I'll take this ticket myself -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5023: [hotfix][docs] Review of concepts docs for grammar, forma...
Github user ChrisChinchilla commented on the issue: https://github.com/apache/flink/pull/5023 @greghogan Will start this PR afresh. ---
[GitHub] flink pull request #5023: [hotfix][docs] Review of concepts docs for grammar...
Github user ChrisChinchilla closed the pull request at: https://github.com/apache/flink/pull/5023 ---
[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...
Github user ChrisChinchilla commented on a diff in the pull request: https://github.com/apache/flink/pull/5024#discussion_r152427259 --- Diff: docs/index.md --- @@ -23,19 +24,17 @@ specific language governing permissions and limitations under the License. --> - - This documentation is for Apache Flink version {{ site.version_title }}. These pages were built at: {% build_time %}. Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. ## First Steps -- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first. +- **Concepts**: We recommend you start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. -- **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html). +- **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html). -- **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html) and the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs. 
+- **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html), the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs. --- End diff -- @greghogan No, this was an attempt to make the whole passage flow better as it says… > xxx and yyy or zzz So I was attempting to separate out the two parts of the paragraph; a better version may be… > **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html), the [DataStream API](dev/datastream_api.html), and the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs. Then it's an Oxford comma, but that's stylistic. ---
[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...
Github user ChrisChinchilla commented on a diff in the pull request: https://github.com/apache/flink/pull/5024#discussion_r152426833 --- Diff: docs/index.md --- @@ -23,19 +24,17 @@ specific language governing permissions and limitations under the License. --> - - This documentation is for Apache Flink version {{ site.version_title }}. These pages were built at: {% build_time %}. Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. ## First Steps -- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first. +- **Concepts**: We recommend you start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. --- End diff -- @greghogan It's something my Markdown linter 'fixes', it doesn't matter too much, but is apparently more correct. ---
[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...
Github user ChrisChinchilla commented on a diff in the pull request: https://github.com/apache/flink/pull/5024#discussion_r152426709 --- Diff: docs/README.md --- @@ -90,7 +90,7 @@ This will be replaced with the value of the variable called `NAME` when generati Headings -All documents are structured with headings. From these headings, you can automatically generate a page table of contents (see below). +All documents are structured with headings, written in "Title Case". From these headings, you can automatically generate a page table of contents (see below). --- End diff -- @greghogan Not sure, I have found a lot of inconsistent headings in the docs, so it seemed worthwhile adding, doesn't mean anyone will follow it of course :) ---
[jira] [Commented] (FLINK-4877) Refactorings around FLINK-3674 (User Function Timers)
[ https://issues.apache.org/jira/browse/FLINK-4877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261566#comment-16261566 ] ASF GitHub Bot commented on FLINK-4877: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4877 @vim-wj if you are okay with Stephan's suggestion could you close this pull request? Also, a small note: `FLINK-4877` references a [Jira ticket](https://issues.apache.org/jira/browse/FLINK-4877) rather than a pull request (use `[hotfix]` for simple issues not requiring a ticket). > Refactorings around FLINK-3674 (User Function Timers) > - > > Key: FLINK-4877 > URL: https://issues.apache.org/jira/browse/FLINK-4877 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Aljoscha Krettek > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261567#comment-16261567 ] ASF GitHub Bot commented on FLINK-8126: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5044 +1 > Update and fix checkstyle > - > > Key: FLINK-8126 > URL: https://issues.apache.org/jira/browse/FLINK-8126 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.5.0 > > > Our current checkstyle configuration (checkstyle version 6.19) is missing > some ImportOrder and variable naming errors which are detected in 1) IntelliJ > using the same checkstyle version and 2) with the maven-checkstyle-plugin > with an up-to-date checkstyle version (8.4). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4877: [FLINK-4877] About SourceFunction extends Serializable
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4877 @vim-wj if you are okay with Stephan's suggestion could you close this pull request? Also, a small note: `FLINK-4877` references a [Jira ticket](https://issues.apache.org/jira/browse/FLINK-4877) rather than a pull request (use `[hotfix]` for simple issues not requiring a ticket). ---
[GitHub] flink issue #5044: [FLINK-8126] [build] Fix and update checkstyle
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5044 +1 ---
[jira] [Commented] (FLINK-7316) always use off-heap network buffers
[ https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261560#comment-16261560 ] ASF GitHub Bot commented on FLINK-7316: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4481 rebased again - this should be good to go. @StefanRRichter can you continue with this? > always use off-heap network buffers > --- > > Key: FLINK-7316 > URL: https://issues.apache.org/jira/browse/FLINK-7316 > Project: Flink > Issue Type: Sub-task > Components: Core, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > In order to send flink buffers through netty into the network, we need to > make the buffers use off-heap memory. Otherwise, there will be a hidden copy > happening in the NIO stack. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4481: [FLINK-7316][network] always use off-heap network buffers
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4481 rebased again - this should be good to go. @StefanRRichter can you continue with this? ---
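A minimal JDK-only sketch of the distinction the ticket relies on: `ByteBuffer.allocateDirect` returns an off-heap buffer, while `ByteBuffer.allocate` returns a heap buffer. In OpenJDK, writing a heap buffer to an NIO channel first copies it into a temporary direct buffer, presumably the hidden copy the ticket mentions. To stay self-contained, the sketch writes to a file channel rather than a Netty socket. `OffHeapBuffers` is a hypothetical helper, not Flink code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper showing off-heap (direct) buffers written through an NIO channel. */
class OffHeapBuffers {
    /** Allocates an off-heap buffer; NIO can hand its memory to the OS without an extra copy. */
    static ByteBuffer allocateNetworkBuffer(int size) {
        return ByteBuffer.allocateDirect(size);
    }

    /** Writes the payload through a channel using a direct buffer; returns the bytes written. */
    static int writeDirect(Path target, byte[] payload) throws IOException {
        ByteBuffer buffer = allocateNetworkBuffer(payload.length);
        buffer.put(payload);
        buffer.flip(); // switch the buffer from filling to draining
        try (FileChannel channel = FileChannel.open(target,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            int written = 0;
            while (buffer.hasRemaining()) {
                written += channel.write(buffer); // a write may be partial, so loop
            }
            return written;
        }
    }
}
```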
[jira] [Commented] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261538#comment-16261538 ] ASF GitHub Bot commented on FLINK-8126: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5044 [FLINK-8126] [build] Fix and update checkstyle ## What is the purpose of the change Update to the latest checkstyle version and fix the errors not previously detected. ## Brief change log - update checkstyle to version 8.4 from version 6.19 - in `checkstyle.xml` move `SuppressionCommentFilter` under `TreeWalker` and remove `FileContentsHolder` (see checkstyle/checkstyle pr4714) - correct latent checkstyle errors ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8126_update_and_fix_checkstyle Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5044.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5044 commit 3a01732d4e823f732f5fa4b63054b4b8f9e40f3e Author: Greg Hogan Date: 2017-11-21T19:05:53Z [FLINK-8126] [build] Fix and update checkstyle Update to the latest checkstyle version and fix the errors not previously detected. > Update and fix checkstyle > - > > Key: FLINK-8126 > URL: https://issues.apache.org/jira/browse/FLINK-8126 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.5.0 > > > Our current checkstyle configuration (checkstyle version 6.19) is missing > some ImportOrder and variable naming errors which are detected in 1) IntelliJ > using the same checkstyle version and 2) with the maven-checkstyle-plugin > with an up-to-date checkstyle version (8.4). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5044: [FLINK-8126] [build] Fix and update checkstyle
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5044 [FLINK-8126] [build] Fix and update checkstyle ## What is the purpose of the change Update to the latest checkstyle version and fix the errors not previously detected. ## Brief change log - update checkstyle to version 8.4 from version 6.19 - in `checkstyle.xml` move `SuppressionCommentFilter` under `TreeWalker` and remove `FileContentsHolder` (see checkstyle/checkstyle pr4714) - correct latent checkstyle errors ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8126_update_and_fix_checkstyle Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5044.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5044 commit 3a01732d4e823f732f5fa4b63054b4b8f9e40f3e Author: Greg Hogan Date: 2017-11-21T19:05:53Z [FLINK-8126] [build] Fix and update checkstyle Update to the latest checkstyle version and fix the errors not previously detected. ---
[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5006#discussion_r152406271 --- Diff: docs/dev/stream/state/queryable_state.md --- @@ -162,14 +161,19 @@ So far, you have set up your cluster to run with queryable state and you have de queryable. Now it is time to see how to query this state. For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client` -jar which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below: +jar which you have to explicitly include as a dependency in the `pom.xml` of your project along with `flink-core`, as shown below: {% highlight xml %} org.apache.flink - flink-queryable-state-client-java_{{ site.scala_version_suffix }} - {{site.version }} + flink-core + {{ site.version }} + + + org.apache.flink + flink-queryable-state-client-java{{ site.scala_version_suffix }} --- End diff -- no, the underscore is included in `site.scala_version_suffix`. ---
[GitHub] flink pull request #4946: [FLINK-7967] [config] Deprecate Hadoop specific Fl...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/4946#discussion_r152405696 --- Diff: flink-dist/src/main/resources/flink-conf.yaml --- @@ -151,6 +151,9 @@ jobmanager.web.port: 8081 # Path to the Hadoop configuration directory. # +# Warning: these keys are deprecated and will be removed in 1.5. Instead, use --- End diff -- I see your question on a specific future version for removing these keys was not answered but I expect that it won't be for 1.5 if ever. I think we can leave this at the deprecation notice and recommendation to use `HADOOP_CONF_DIR` without stating a time for removal. ---
[jira] [Commented] (FLINK-7967) Deprecate Hadoop specific Flink configuration options
[ https://issues.apache.org/jira/browse/FLINK-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261507#comment-16261507 ] ASF GitHub Bot commented on FLINK-7967: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/4946#discussion_r152405696 --- Diff: flink-dist/src/main/resources/flink-conf.yaml --- @@ -151,6 +151,9 @@ jobmanager.web.port: 8081 # Path to the Hadoop configuration directory. # +# Warning: these keys are deprecated and will be removed in 1.5. Instead, use --- End diff -- I see your question on a specific future version for removing these keys was not answered but I expect that it won't be for 1.5 if ever. I think we can leave this at the deprecation notice and recommendation to use `HADOOP_CONF_DIR` without stating a time for removal. > Deprecate Hadoop specific Flink configuration options > - > > Key: FLINK-7967 > URL: https://issues.apache.org/jira/browse/FLINK-7967 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: Till Rohrmann >Priority: Trivial > > I think we should deprecate the hadoop specific configuration options from > Flink and encourage people to use instead the environment variable > {{HADOOP_CONF_DIR}} to configure the Hadoop configuration directory. This > includes: > {code} > fs.hdfs.hdfsdefault > fs.hdfs.hdfssite > fs.hdfs.hadoopconf > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
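A hypothetical sketch of the resolution order the ticket encourages: prefer `HADOOP_CONF_DIR` and fall back to the deprecated `fs.hdfs.hadoopconf` key with a warning. `HadoopConfDirResolver` is illustrative, and the precedence is an assumption, not Flink's actual logic. The environment and configuration are passed in as maps so the logic can be tested without touching the real environment.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical resolver for the Hadoop configuration directory. */
class HadoopConfDirResolver {
    static final String ENV_VAR = "HADOOP_CONF_DIR";
    static final String DEPRECATED_KEY = "fs.hdfs.hadoopconf";

    static Optional<String> resolve(Map<String, String> env, Map<String, String> flinkConf) {
        // Preferred source: the environment variable.
        String fromEnv = env.get(ENV_VAR);
        if (fromEnv != null && !fromEnv.isEmpty()) {
            return Optional.of(fromEnv);
        }
        // Fallback: the deprecated config key, with a warning that names the replacement.
        String fromConf = flinkConf.get(DEPRECATED_KEY);
        if (fromConf != null && !fromConf.isEmpty()) {
            System.err.println("Warning: '" + DEPRECATED_KEY + "' is deprecated; set "
                + ENV_VAR + " instead.");
            return Optional.of(fromConf);
        }
        return Optional.empty();
    }
}
```

In a launcher this would be called as `HadoopConfDirResolver.resolve(System.getenv(), conf)`, with `conf` being the parsed configuration as a map. The warning deliberately names no removal version, in line with the review comment above.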
[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string
[ https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261503#comment-16261503 ] ASF GitHub Bot commented on FLINK-8070: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5012 Also, this particular exception stops after `Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch target 152` because the next lines don't look like a stack trace. Here's a snippet: ``` at org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196) at org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123) Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch target 152 Exception Details: Location: akka/dispatch/Mailbox.processAllSystemMessages()V @152: getstatic Reason: ``` > YarnTestBase should print prohibited string > --- > > Key: FLINK-8070 > URL: https://issues.apache.org/jira/browse/FLINK-8070 > Project: Flink > Issue Type: Improvement > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > > The yarn tests check the log files for a set of prohibited strings. If found, > the entire log file is logged as WARN, the offending line is logged as ERROR, > and the test fails with this unhelpful message: > {code} > java.lang.AssertionError(Found a file > /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1510164935122_0002/container_1510164935122_0002_01_01/jobmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081]) > {code} > If you don't have log access on travis you thus have no knowledge of what > actually went wrong. > I propose to also print smaller excerpts around the found error (like 10 > lines or smth) in the Assert.fail message. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
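The proposal in FLINK-8070 (print a small excerpt around each prohibited string in the `Assert.fail` message) can be sketched in Java as follows. This is a minimal sketch, not the code from PR #5012: the class `LogExcerpts`, the method `excerptsAround`, and the choice to skip matches already covered by a previous window are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper; not Flink's actual YarnTestBase code. */
public final class LogExcerpts {

    private LogExcerpts() {}

    /**
     * Returns one excerpt per match: the matching line plus up to {@code context}
     * lines before and after it. Matches that fall inside an excerpt that was
     * already emitted are skipped, so overlapping windows are not printed twice.
     */
    public static List<String> excerptsAround(List<String> lines, List<String> prohibited, int context) {
        List<String> excerpts = new ArrayList<>();
        int coveredUntil = -1; // exclusive end index of the last emitted window
        for (int i = 0; i < lines.size(); i++) {
            if (i < coveredUntil || !containsAny(lines.get(i), prohibited)) {
                continue;
            }
            int from = Math.max(0, i - context);
            int to = Math.min(lines.size(), i + context + 1);
            excerpts.add(String.join("\n", lines.subList(from, to)));
            coveredUntil = to;
        }
        return excerpts;
    }

    private static boolean containsAny(String line, List<String> needles) {
        for (String needle : needles) {
            if (line.contains(needle)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
            "INFO starting", "INFO a", "ERROR java.lang.Exception: boom",
            "at Foo.bar(Foo.java:1)", "INFO b", "INFO c");
        List<String> excerpts = excerptsAround(log, Arrays.asList("Exception"), 1);
        System.out.println(excerpts.size()); // prints 1
        System.out.println(excerpts.get(0));
    }
}
```

A fixed window keeps the failure message bounded regardless of how long the log file is; the trade-off is that a long stack trace may be cut off at `context` lines.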
[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5012 Also, this particular exception stops after `Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch target 152` because the next lines don't look like a stack trace. Here's a snippet: ``` at org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196) at org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123) Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch target 152 Exception Details: Location: akka/dispatch/Mailbox.processAllSystemMessages()V @152: getstatic Reason: ``` ---
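The behaviour zentol describes, where an excerpt ends as soon as the following lines stop looking like a stack trace, suggests a continuation heuristic along these lines. This is a hypothetical sketch, not the PR's implementation: the class `StackTraceExcerpt` and its line-matching rules (`at `, `Caused by:`, `... N more`) are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; not the code from the pull request. */
public final class StackTraceExcerpt {

    private StackTraceExcerpt() {}

    /** True if the line looks like a continuation of a Java stack trace. */
    public static boolean looksLikeStackTraceLine(String line) {
        String t = line.trim();
        return t.startsWith("at ") || t.startsWith("Caused by:") || t.matches("\\.\\.\\. \\d+ more");
    }

    /** Returns the line at {@code start} plus every directly following stack-trace-like line. */
    public static List<String> collect(List<String> lines, int start) {
        List<String> excerpt = new ArrayList<>();
        excerpt.add(lines.get(start));
        for (int i = start + 1; i < lines.size() && looksLikeStackTraceLine(lines.get(i)); i++) {
            excerpt.add(lines.get(i));
        }
        return excerpt;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
            "java.lang.Exception: Could not create actor system",
            "\tat org.example.Foo.run(Foo.java:10)",
            "Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch target 152",
            "Exception Details:",
            "  Location:");
        // The excerpt ends at the "Caused by:" line because "Exception Details:" is not a stack-trace line.
        System.out.println(collect(log, 0).size()); // prints 3
    }
}
```

Under this rule, the JVM's multi-line `VerifyError` details (`Exception Details:`, `Location:`, `Reason:`) are exactly the kind of output that gets cut off.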
[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5012 In this particular stack trace, you have the stack trace of the assertion error ``` java.lang.AssertionError(... exception message ...) at org.junit.runners.model.MultipleFailureException.assertEmpty(MultipleFailureException.java:67) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:39) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) ``` and the excerpts in the exception message of the AssertionError, as a list of exceptions ``` Found a file /home/Zento/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1510667711263_0001/container_1510667711263_0001_01_01/jobmanager.log with a prohibited string (one of [Exception, Started SelectChannelConnector@0.0.0.0:8081]). Excerpts: [ java.lang.Exception: Could not create actor system at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:171) at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:115) at org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:313) at org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:199) at org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:196) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196) at org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123) Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch target 152 ] ``` The formatting isn't really ideal, but i don't know of an easy way to change it. (And it's still better than nothing) ---
[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string
[ https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261494#comment-16261494 ] ASF GitHub Bot commented on FLINK-8070: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5012 In this particular stack trace, you have the stack trace of the assertion error ``` java.lang.AssertionError(... exception message ...) at org.junit.runners.model.MultipleFailureException.assertEmpty(MultipleFailureException.java:67) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:39) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) ``` and the excerpts in the exception message of the AssertionError, as a list of exceptions ``` Found a file /home/Zento/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1510667711263_0001/container_1510667711263_0001_01_01/jobmanager.log with a prohibited string (one of [Exception, Started SelectChannelConnector@0.0.0.0:8081]). Excerpts: [ java.lang.Exception: Could not create actor system at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:171) at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:115) at org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:313) at org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:199) at org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:196) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196) at org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123) Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch target 152 ] ``` The formatting isn't really ideal, but i don't know of an easy way to change it. 
(And it's still better than nothing) > YarnTestBase should print prohibited string > --- > > Key: FLINK-8070 > URL: https://issues.apache.org/jira/browse/FLINK-8070 > Project: Flink > Issue Type: Improvement > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > > The yarn tests check the log files for a se
[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5006#discussion_r152402516 --- Diff: docs/dev/stream/state/queryable_state.md --- @@ -162,14 +161,19 @@ So far, you have set up your cluster to run with queryable state and you have de queryable. Now it is time to see how to query this state. For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client` -jar which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below: +jar which you have to explicitly include as a dependency in the `pom.xml` of your project along with `flink-core`, as shown below: --- End diff -- "you have to explicitly include" -> "must be explicitly included"? ---
[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5006#discussion_r152402801 --- Diff: docs/dev/stream/state/queryable_state.md --- @@ -162,14 +161,19 @@ So far, you have set up your cluster to run with queryable state and you have de queryable. Now it is time to see how to query this state. For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client` -jar which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below: +jar which you have to explicitly include as a dependency in the `pom.xml` of your project along with `flink-core`, as shown below: {% highlight xml %} org.apache.flink - flink-queryable-state-client-java_{{ site.scala_version_suffix }} - {{site.version }} + flink-core + {{ site.version }} + + + org.apache.flink + flink-queryable-state-client-java{{ site.scala_version_suffix }} --- End diff -- Do we need to preserve the underscore after `java`? ---
[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5006#discussion_r152402097 --- Diff: docs/dev/stream/state/queryable_state.md --- @@ -60,7 +60,7 @@ The Queryable State feature consists of three main entities: returning it to the client, and 3. the `QueryableStateServer` which runs on each `TaskManager` and is responsible for serving the locally stored state. -In a nutshell, the client will connect to one of the proxies and send a request for the state associated with a specific +The client will connect to one of the proxies and send a request for the state associated with a specific --- End diff -- "will connect" -> "connects" (and "send" -> "sends")? ---
[jira] [Closed] (FLINK-8123) Bundle python library in jar
[ https://issues.apache.org/jira/browse/FLINK-8123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8123. --- Resolution: Fixed 1.5: c4107d4c336ed8dbadc03a7018eb255f4df3d1cc > Bundle python library in jar > > > Key: FLINK-8123 > URL: https://issues.apache.org/jira/browse/FLINK-8123 > Project: Flink > Issue Type: Improvement > Components: Python API >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.5.0 > > > Currently, the flink-python library is split into 2 parts in flink-dist; the > flink-python jar in the /lib directory, and the python scripts in the > /resources directory. > I propose to bundle the python scripts in the flink-python jar. This way, the > jar is self-contained and we no longer need to search for the python scripts > (which was hacky and had a separate codepath for tests). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261489#comment-16261489 ] ASF GitHub Bot commented on FLINK-2170: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/5043 [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource. ## What is the purpose of the change * Adds `OrcRowInputFormat` to read [ORC files](https://orc.apache.org) as `DataSet`. The input format supports projection and filter push-down. * Adds `OrcTableSource` to read [ORC files](https://orc.apache.org) as a `Table` in a batch Table API or SQL query. The table source supports projection and filter push-down. ## Brief change log * Creates a new module `flink-connectors/flink-orc` * Add `OrcRowInputFormat` * Add `OrcTableSource` * Add tests for input format and table source * Adjust cost model of batch table scans to favor table sources with pushed-down filters over those without pushed-down filters. * Add static method to `RowTypeInfo` to project on fields. * Improve translation of literals in `RexProgramExtractor` ## Verifying this change * `OrcRowInputFormatTest` verifies * Correct configuration of ORC readers. * Results when reading ORC files * Schema evolution support * Computation of split boundaries * `OrcTableSourceTest` verifies * Correct implementation of TableSource interface methods * Correct configuration of `OrcRowInputFormat` for test queries (predicate and filter push-down) * `OrcTableSourceITCase` runs end-to-end tests with SQL queries. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **yes**, adds a new Maven module `flink-orc` with a dependency on `org.apache.orc/orc-core` - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **yes** - If yes, how is the feature documented? **yes**, documentation for `RowTableSource` was added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink table-ORC Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5043.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5043 commit 2f524dfa0c4f8468691151925a622ba7fee55f0f Author: uybhatti Date: 2017-03-03T22:55:22Z [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource. commit d80506e3785268f541457a69ade3118c634cf7e7 Author: Fabian Hueske Date: 2017-11-13T13:54:54Z [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource. > Add OrcTableSource > -- > > Key: FLINK-2170 > URL: https://issues.apache.org/jira/browse/FLINK-2170 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Usman Younas >Priority: Minor > Labels: starter > > Add a {{OrcTableSource}} to read data from an ORC file. The > {{OrcTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/5043 [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource. ## What is the purpose of the change * Adds `OrcRowInputFormat` to read [ORC files](https://orc.apache.org) as `DataSet`. The input format supports projection and filter push-down. * Adds `OrcTableSource` to read [ORC files](https://orc.apache.org) as a `Table` in a batch Table API or SQL query. The table source supports projection and filter push-down. ## Brief change log * Creates a new module `flink-connectors/flink-orc` * Add `OrcRowInputFormat` * Add `OrcTableSource` * Add tests for input format and table source * Adjust cost model of batch table scans to favor table sources with pushed-down filters over those without pushed-down filters. * Add static method to `RowTypeInfo` to project on fields. * Improve translation of literals in `RexProgramExtractor` ## Verifying this change * `OrcRowInputFormatTest` verifies * Correct configuration of ORC readers. * Results when reading ORC files * Schema evolution support * Computation of split boundaries * `OrcTableSourceTest` verifies * Correct implementation of TableSource interface methods * Correct configuration of `OrcRowInputFormat` for test queries (predicate and filter push-down) * `OrcTableSourceITCase` runs end-to-end tests with SQL queries. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **yes**, adds a new Maven module `flink-orc` with a dependency on `org.apache.orc/orc-core` - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **yes** - If yes, how is the feature documented? **yes**, documentation for `RowTableSource` was added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink table-ORC Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5043.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5043 commit 2f524dfa0c4f8468691151925a622ba7fee55f0f Author: uybhatti Date: 2017-03-03T22:55:22Z [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource. commit d80506e3785268f541457a69ade3118c634cf7e7 Author: Fabian Hueske Date: 2017-11-13T13:54:54Z [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource. ---
[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5012 LGTM. I'm puzzled why the `])` is printed out-of-order in the middle of the stack trace. ---
[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string
[ https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261448#comment-16261448 ] ASF GitHub Bot commented on FLINK-8070: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5012 LGTM. I'm puzzled why the `])` is printed out-of-order in the middle of the stack trace. > YarnTestBase should print prohibited string > --- > > Key: FLINK-8070 > URL: https://issues.apache.org/jira/browse/FLINK-8070 > Project: Flink > Issue Type: Improvement > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > > The yarn tests check the log files for a set of prohibited strings. If found, > the entire log file is logged as WARN, the offending line is logged as ERROR, > and the test fails with this unhelpful message: > {code} > java.lang.AssertionError(Found a file > /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1510164935122_0002/container_1510164935122_0002_01_01/jobmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081]) > {code} > If you don't have log access on travis you have thus no knowledge what > actually went wrong. > I propose to also print smaller excerpts around the found error (like 10 > lines or smth) in the Assert.fail message. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8084) Remove japicmp deactivations in several modules
[ https://issues.apache.org/jira/browse/FLINK-8084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261430#comment-16261430 ] ASF GitHub Bot commented on FLINK-8084: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5020 +1 > Remove japicmp deactivations in several modules > --- > > Key: FLINK-8084 > URL: https://issues.apache.org/jira/browse/FLINK-8084 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > The japicmp module is explicitly deactivated in the following modules: > * java8 > * quickstart > * yarn-tests > Since the module has to be explicitly enabled these entries can be removed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5020: [FLINK-8084][build] Remove unnecessary japicmp pom entrie...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5020 +1 ---
[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5024#discussion_r152384448 --- Diff: docs/index.md --- @@ -23,19 +24,17 @@ specific language governing permissions and limitations under the License. --> - - This documentation is for Apache Flink version {{ site.version_title }}. These pages were built at: {% build_time %}. Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. ## First Steps -- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first. +- **Concepts**: We recommend you start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. --- End diff -- Why the extra spacing? ---
[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5024#discussion_r152386126 --- Diff: docs/index.md --- @@ -23,19 +24,17 @@ specific language governing permissions and limitations under the License. --> - - This documentation is for Apache Flink version {{ site.version_title }}. These pages were built at: {% build_time %}. Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. ## First Steps -- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first. +- **Concepts**: We recommend you start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. -- **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html). +- **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html). -- **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html) and the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs. 
+- **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html), the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs. --- End diff -- Oxford comma? ---
[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5024#discussion_r152384103 --- Diff: docs/README.md --- @@ -90,7 +90,7 @@ This will be replaced with the value of the variable called `NAME` when generati Headings -All documents are structured with headings. From these headings, you can automatically generate a page table of contents (see below). +All documents are structured with headings, written in "Title Case". From these headings, you can automatically generate a page table of contents (see below). --- End diff -- It seems that this could be left as an implicit assumption for "headings". ---
[jira] [Commented] (FLINK-8036) Consider using gradle to build Flink
[ https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261371#comment-16261371 ] Greg Hogan commented on FLINK-8036: --- Are many developers running tests from the command line or using an IDE for incremental compilation and selectively running tests? > Consider using gradle to build Flink > > > Key: FLINK-8036 > URL: https://issues.apache.org/jira/browse/FLINK-8036 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > Here is a summary from Lukasz in this thread > (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool) > w.r.t. the performance boost from using gradle: > Maven performs parallelization at the module level, an entire module needs > to complete before any dependent modules can start, this means running all > the checks like findbugs, checkstyle, tests need to finish. Gradle has task > level parallelism between subprojects which means that as soon as the > compile and shade steps are done for a project, and dependent subprojects > can typically start. This means that we get increased parallelism due to > not needing to wait for findbugs, checkstyle, tests to run. I typically see > ~20 tasks (at peak) running on my desktop in parallel. > Flink should consider using gradle - on Linux with SSD, a clean build takes > an hour. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8113) Bump maven-shade-plugin to 3.0.0
[ https://issues.apache.org/jira/browse/FLINK-8113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261335#comment-16261335 ] ASF GitHub Bot commented on FLINK-8113: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5042 I looked through the open issues for 3.0.0 and the ones that were fixed for 3.1.0, and I couldn't find anything critical. > Bump maven-shade-plugin to 3.0.0 > > > Key: FLINK-8113 > URL: https://issues.apache.org/jira/browse/FLINK-8113 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.5.0 > > > We should investigate whether we can bump the shade plugin to 3.0.0. Earlier > versions do not properly relocate services, forcing some modules to set a > different plugin version in their own configuration (flink-s3-fs-presto for > example, or flink-dist after FLINK-8111). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5042: [FLINK-8113][build] Bump maven-shade-plugin to 3.0.0
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5042 I looked through the open issues for 3.0.0 and the ones that were fixed for 3.1.0, and I couldn't find anything critical. ---
[jira] [Updated] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-8126: -- Component/s: Build System > Update and fix checkstyle > - > > Key: FLINK-8126 > URL: https://issues.apache.org/jira/browse/FLINK-8126 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.5.0 > > > Our current checkstyle configuration (checkstyle version 6.19) is missing > some ImportOrder and variable naming errors which are detected in 1) IntelliJ > using the same checkstyle version and 2) with the maven-checkstyle-plugin > with an up-to-date checkstyle version (8.4). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8126) Update and fix checkstyle
Greg Hogan created FLINK-8126: - Summary: Update and fix checkstyle Key: FLINK-8126 URL: https://issues.apache.org/jira/browse/FLINK-8126 Project: Flink Issue Type: Bug Affects Versions: 1.5.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.5.0 Our current checkstyle configuration (checkstyle version 6.19) is missing some ImportOrder and variable naming errors which are detected in 1) IntelliJ using the same checkstyle version and 2) with the maven-checkstyle-plugin with an up-to-date checkstyle version (8.4). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6294) BucketingSink throws NPE while cancelling job
[ https://issues.apache.org/jira/browse/FLINK-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261228#comment-16261228 ] Cristian commented on FLINK-6294: - This is happening to me on Flink 1.3.2. The line is different, but I guess it's the same problem: {code:java} 2017-11-21 16:55:16,276 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator. java.lang.NullPointerException at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) {code} > BucketingSink throws NPE while cancelling job > - > > Key: FLINK-6294 > URL: https://issues.apache.org/jira/browse/FLINK-6294 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0 >Reporter: Andrey > > Steps to reproduce: > * configure BucketingSink and run job > * cancel job from UI before processing any messages > * in logs: > {code} > 2017-04-11 10:14:54,681 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Source: Custom > Source (1/2) [Source: Custom Source (1/2)] > 2017-04-11 10:14:54,881 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > Un-registering task and sending final execution state CANCELED to JobManager > for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) > [flink-akka.actor.default-dispatcher-4] > 2017-04-11 10:14:56,584 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - Error 
during > disposal of stream operator. [Flat Map -> Sink: Unnamed (2/2)] > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
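Both traces point at `BucketingSink.close()` being called during operator disposal after a cancellation. One plausible reading, not confirmed in this thread, is that `close()` dereferences state that is only initialized later in the task lifecycle, so a job cancelled early reaches `close()` with that field still `null`. The hypothetical `LazySink` class below (not the real `BucketingSink`) sketches the usual defensive pattern: make `close()` tolerate fields that were never initialized.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a sink whose per-bucket state is created lazily;
// this is NOT the actual BucketingSink implementation.
class LazySink {
    // Stays null if the task is cancelled before open() runs.
    private Map<String, StringBuilder> state;
    private int closedBuckets = 0;

    void open() {
        state = new HashMap<>();
    }

    void write(String bucket, String record) {
        state.computeIfAbsent(bucket, b -> new StringBuilder()).append(record);
    }

    // close() can be invoked during disposal even if open() never completed,
    // so it must handle uninitialized fields instead of throwing an NPE.
    void close() {
        if (state == null) {
            return; // nothing was opened, so there is nothing to close
        }
        closedBuckets += state.size();
        state.clear();
    }

    int closedBuckets() {
        return closedBuckets;
    }
}
```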
[jira] [Commented] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner
[ https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261209#comment-16261209 ] ASF GitHub Bot commented on FLINK-8117: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5041 Merging this... > Eliminate modulo operation from RoundRobinChannelSelector and > RebalancePartitioner > -- > > Key: FLINK-8117 > URL: https://issues.apache.org/jira/browse/FLINK-8117 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, Streaming >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > Labels: performance > Fix For: 1.5.0 > > > {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and > {{RescalePartitioner}} use a modulo operation to wrap around when the current > channel counter reaches the number of channels. Using an {{if}} would have > better performance. > A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but > the {{if}} will be only 1-2 cycles on average, since the branch predictor can > most of the time predict the condition to be false. > \[1\] http://www.agner.org/optimize/instruction_tables.pdf -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5041: [FLINK-8117] [runtime] [streaming] Eliminate modulo opera...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5041 Merging this... ---
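The change described in FLINK-8117 replaces a per-record modulo with a compare-and-reset. A minimal sketch of both wrap-around strategies, using hypothetical static helpers rather than Flink's channel selector classes; the cycle counts quoted in the issue come from the reporter, and the actual gain will depend on the JIT and the CPU.

```java
// Hypothetical helpers contrasting the two wrap-around strategies from
// FLINK-8117; names are illustrative, not Flink's ChannelSelector API.
final class RoundRobin {
    private RoundRobin() {}

    // Original approach: one integer division (modulo) per selected channel.
    static int nextWithModulo(int current, int numChannels) {
        return (current + 1) % numChannels;
    }

    // Proposed approach: increment and reset when the counter reaches
    // numChannels. The reset is taken only once every numChannels calls,
    // so the branch predictor usually guesses "not taken" correctly.
    static int nextWithBranch(int current, int numChannels) {
        int next = current + 1;
        if (next >= numChannels) {
            next = 0;
        }
        return next;
    }
}
```

Starting both counters at `-1` makes the first selected channel `0`, and the two helpers then produce the same sequence `0, 1, ..., numChannels - 1, 0, ...`.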
[jira] [Created] (FLINK-8125) Support limiting the number of open FileSystem connections
Stephan Ewen created FLINK-8125: --- Summary: Support limiting the number of open FileSystem connections Key: FLINK-8125 URL: https://issues.apache.org/jira/browse/FLINK-8125 Project: Flink Issue Type: Improvement Components: Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.4.1, 1.5.0 We need a way to limit the number of streams that Flink FileSystems concurrently open. For example, for very small HDFS clusters with few RPC handlers, a large Flink job trying to build up many connections during a checkpoint causes failures due to rejected connections. I propose to add a file system that can wrap another existing file system. The file system may track the progress of streams and close streams that have been inactive for too long, to prevent locked streams from taking up the entire pool. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
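The issue leaves the mechanism open. One common way to cap concurrently open streams is a counting semaphore acquired when a stream is opened and released when it is closed. The sketch below uses plain `java.io` streams and a hypothetical `StreamLimiter` class with a hypothetical `StreamOpener` callback instead of Flink's `FileSystem` API, and it does not implement the proposed closing of inactive streams.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper that caps the number of concurrently open input streams.
// Not Flink's API; the actual proposal wraps a Flink FileSystem.
class StreamLimiter {
    interface StreamOpener {
        InputStream open(String path) throws IOException;
    }

    private final Semaphore permits;
    private final StreamOpener delegate;

    StreamLimiter(int maxOpenStreams, StreamOpener delegate) {
        this.permits = new Semaphore(maxOpenStreams, true); // fair: waiters served FIFO
        this.delegate = delegate;
    }

    // Waits up to timeoutMillis for a free slot, then opens via the delegate.
    InputStream open(String path, long timeoutMillis) throws IOException, InterruptedException {
        if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IOException("Timed out waiting for a free stream slot: " + path);
        }
        final InputStream in;
        try {
            in = delegate.open(path);
        } catch (IOException | RuntimeException e) {
            permits.release(); // opening failed, so give the slot back
            throw e;
        }
        return new FilterInputStream(in) {
            private final AtomicBoolean closed = new AtomicBoolean(false);

            @Override
            public void close() throws IOException {
                // Release the slot exactly once, even if close() is called repeatedly.
                if (closed.compareAndSet(false, true)) {
                    try {
                        super.close();
                    } finally {
                        permits.release();
                    }
                }
            }
        };
    }

    int availableSlots() {
        return permits.availablePermits();
    }
}
```

Failing with a timeout rather than blocking forever is a design choice of this sketch; it surfaces exhaustion to the caller instead of stalling a checkpoint indefinitely.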
[jira] [Commented] (FLINK-8036) Consider using gradle to build Flink
[ https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261193#comment-16261193 ] Ted Yu commented on FLINK-8036: --- Last note: when I see gradlew run selected test(s) so fast, I wish Maven didn't have to go through all the modules before reaching the module where the test resides. > Consider using gradle to build Flink > > > Key: FLINK-8036 > URL: https://issues.apache.org/jira/browse/FLINK-8036 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > Here is a summary from Lukasz on this thread > (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool) > w.r.t. the performance boost from using gradle: > Maven performs parallelization at the module level: an entire module needs > to complete before any dependent modules can start, which means all > the checks like findbugs, checkstyle, and tests need to finish. Gradle has task > level parallelism between subprojects, which means that as soon as the > compile and shade steps are done for a project, dependent subprojects > can typically start. This means that we get increased parallelism due to > not needing to wait for findbugs, checkstyle, and tests to run. I typically see > ~20 tasks (at peak) running on my desktop in parallel. > Flink should consider using gradle - on Linux with an SSD, a clean build takes > an hour. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner
[ https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261140#comment-16261140 ] ASF GitHub Bot commented on FLINK-8117: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5041 Very good change, and nice implementation! +1 to merge this. > Eliminate modulo operation from RoundRobinChannelSelector and > RebalancePartitioner > -- > > Key: FLINK-8117 > URL: https://issues.apache.org/jira/browse/FLINK-8117 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, Streaming >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > Labels: performance > Fix For: 1.5.0 > > > {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and > {{RescalePartitioner}} use a modulo operation to wrap around when the current > channel counter reaches the number of channels. Using an {{if}} would have > better performance. > A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but > the {{if}} will be only 1-2 cycles on average, since the branch predictor can > most of the time predict the condition to be false. > \[1\] http://www.agner.org/optimize/instruction_tables.pdf -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5041: [FLINK-8117] [runtime] [streaming] Eliminate modulo opera...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5041 Very good change, and nice implementation! +1 to merge this. ---
[jira] [Commented] (FLINK-8113) Bump maven-shade-plugin to 3.0.0
[ https://issues.apache.org/jira/browse/FLINK-8113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261134#comment-16261134 ] ASF GitHub Bot commented on FLINK-8113: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5042 Looks good to me. Do we know if there are any new known issues in shade "3.0.0"? The first release of a new major version sometimes has a few regressions. Have you checked the Maven JIRA per chance? Otherwise +1 > Bump maven-shade-plugin to 3.0.0 > > > Key: FLINK-8113 > URL: https://issues.apache.org/jira/browse/FLINK-8113 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.5.0 > > > We should investigate whether we can bump the shade plugin to 3.0.0. Earlier > versions do not properly relocate services, forcing some modules to set a > different plugin version in their own configuration (flink-s3-fs-presto for > example, or flink-dist after FLINK-8111). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5042: [FLINK-8113][build] Bump maven-shade-plugin to 3.0.0
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5042 Looks good to me. Do we know if there are any new known issues in shade "3.0.0"? The first release of a new major version sometimes has a few regressions. Have you checked the Maven JIRA per chance? Otherwise +1 ---
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261122#comment-16261122 ] ASF GitHub Bot commented on FLINK-8090: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5032#discussion_r152348905 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java --- @@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception { assertFalse(value.iterator().hasNext()); } + @Test(expected = DuplicateStateNameException.class) + public void testDuplicateStateName() throws Exception { + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMapPlainMockOp(), + createMockEnvironment(), + Collections.emptyMap()); + MapStateDescriptor mapStateDesc = + new MapStateDescriptor<>("name", Integer.class, String.class); + ListStateDescriptor listStateDesc = + new ListStateDescriptor<>("name", String.class); + context.getMapState(mapStateDesc); --- End diff -- LGTM! Just wondering which will throw the exception? `getMapState()` or `getListState()`? If the former, then we don't need the latter one here. > Improve error message when registering different states under the same name. 
> > > Key: FLINK-8090 > URL: https://issues.apache.org/jira/browse/FLINK-8090 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Xingcan Cui > > Currently a {{ProcessFunction}} like this: > {code} > final MapStateDescriptor> > firstMapStateDescriptor = new MapStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO, > source.getType()); > final ListStateDescriptor secondListStateDescriptor = new > ListStateDescriptor( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO); > new ProcessFunction, Object>() { > private static final long serialVersionUID = > -805125545438296619L; > private transient MapState Tuple2> firstMapState; > private transient ListState > secondListState; > @Override > public void open(Configuration parameters) > throws Exception { > super.open(parameters); > firstMapState = > getRuntimeContext().getMapState(firstMapStateDescriptor); > secondListState = > getRuntimeContext().getListState(secondListStateDescriptor); > } > @Override > public void processElement(Tuple2 Long> value, Context ctx, Collector out) throws Exception { > Tuple2 v = > firstMapState.get(value.f0); > if (v == null) { > v = new Tuple2<>(value.f0, 0L); > } > firstMapState.put(value.f0, new > Tuple2<>(v.f0, v.f1 + value.f1)); > } > } > {code} > fails with: > {code} > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127) > at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(T
[GitHub] flink pull request #5032: [FLINK-8090] [DataStream] Improve the error messag...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5032#discussion_r152348905 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java --- @@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception { assertFalse(value.iterator().hasNext()); } + @Test(expected = DuplicateStateNameException.class) + public void testDuplicateStateName() throws Exception { + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMapPlainMockOp(), + createMockEnvironment(), + Collections.emptyMap()); + MapStateDescriptor mapStateDesc = + new MapStateDescriptor<>("name", Integer.class, String.class); + ListStateDescriptor listStateDesc = + new ListStateDescriptor<>("name", String.class); + context.getMapState(mapStateDesc); --- End diff -- LGTM! Just wondering which will throw the exception? `getMapState()` or `getListState()`? If the former, then we don't need the latter one here. ---
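On the question of which call throws: the first registration under a name has nothing to conflict with, so presumably only the second call can detect the clash. That matches the stack trace in FLINK-8090, where the failure surfaces in `getListState()`, and it would mean the test needs both calls. A minimal sketch of such a check, using a hypothetical `StateRegistry` that records one kind per state name (not Flink's keyed state backend code; the nested exception only mirrors the test's `DuplicateStateNameException` name):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry illustrating the intended check; not Flink's code.
class StateRegistry {
    static final class DuplicateStateNameException extends RuntimeException {
        DuplicateStateNameException(String message) {
            super(message);
        }
    }

    // state name -> kind of state registered under it (e.g. "MAP", "LIST")
    private final Map<String, String> kinds = new HashMap<>();

    void register(String name, String kind) {
        String existing = kinds.putIfAbsent(name, kind);
        // Re-registering the same kind is allowed; a different kind is an error.
        if (existing != null && !existing.equals(kind)) {
            throw new DuplicateStateNameException(
                "State name '" + name + "' is already registered as " + existing
                    + " state and cannot be re-registered as " + kind + " state.");
        }
    }
}
```

An error message that names both the existing and the requested state kind is the improvement the issue title asks for, compared with the generic "Error while getting state".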
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261077#comment-16261077 ] ASF GitHub Bot commented on FLINK-7499: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152342531 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -108,11 +108,7 @@ int releaseMemory() throws IOException { for (int i = 0; i < numBuffers; i++) { Buffer buffer = buffers.remove(); spilledBytes += buffer.getSize(); - try { - spillWriter.writeBlock(buffer); - } finally { - buffer.recycle(); - } + spillWriter.writeBlock(buffer); --- End diff -- Actually, if I see this correctly, the original code here is wrong, since it recycles a buffer that was just added to an asynchronous file write operation. This would lead to data corruption if the buffer is re-used in the meantime, wouldn't it?! > double buffer release in SpillableSubpartitionView > -- > > Key: FLINK-7499 > URL: https://issues.apache.org/jira/browse/FLINK-7499 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, > 1.3.2, 1.3.3 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: > once asynchronously after the write operation and once in > {{SpillableSubpartitionView#releaseMemory()}} after adding the write > operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer > is recycled, the memory region may already be reused despite the pending write. > 2) If, for some reason (probably only in tests like > {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?), > the buffer is retained to be used in parallel somewhere else, it may also > no longer be available or may contain corrupt data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4581: [FLINK-7499][io] also let AsynchronousBufferFileWr...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152342531 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -108,11 +108,7 @@ int releaseMemory() throws IOException { for (int i = 0; i < numBuffers; i++) { Buffer buffer = buffers.remove(); spilledBytes += buffer.getSize(); - try { - spillWriter.writeBlock(buffer); - } finally { - buffer.recycle(); - } + spillWriter.writeBlock(buffer); --- End diff -- Actually, if I see this correctly, the original code here is wrong, since it recycles a buffer that was just added to an asynchronous file write operation. This would lead to data corruption if the buffer is re-used in the meantime, wouldn't it?! ---
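NicoK's point is about ownership: once a buffer is handed to an asynchronous writer, only the writer should recycle it, and only after the write has consumed its contents. The sketch below models that with a hypothetical reference-counted buffer and a single-threaded executor standing in for the I/O thread. Neither class is Flink's `Buffer` or `AsynchronousBufferFileWriter`; the reference count exists only to make a second `recycle()` detectable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified buffer with a reference count; not Flink's Buffer class.
class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final byte[] data;

    RefCountedBuffer(byte[] data) {
        this.data = data;
    }

    byte[] data() {
        if (refCount.get() <= 0) {
            throw new IllegalStateException("buffer already recycled");
        }
        return data;
    }

    void recycle() {
        if (refCount.decrementAndGet() < 0) {
            throw new IllegalStateException("buffer recycled twice");
        }
    }

    boolean isRecycled() {
        return refCount.get() <= 0;
    }
}

// Hypothetical asynchronous writer that takes ownership of each buffer.
class AsyncBlockWriter {
    // Daemon thread so a forgotten shutdown() does not keep the JVM alive.
    private final ExecutorService io = Executors.newSingleThreadExecutor(runnable -> {
        Thread t = new Thread(runnable, "async-block-writer");
        t.setDaemon(true);
        return t;
    });

    // The buffer is recycled only after the (simulated) write has read it,
    // so callers must NOT recycle it themselves, e.g. in a finally block.
    CompletableFuture<Integer> writeBlock(RefCountedBuffer buffer) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return buffer.data().length; // stand-in for the actual file write
            } finally {
                buffer.recycle();
            }
        }, io);
    }

    void shutdown() {
        io.shutdown();
    }
}
```

With the original `try { writeBlock(buffer); } finally { buffer.recycle(); }` pattern, the caller's recycle would be a second release of the same buffer, which this model reports as "buffer recycled twice" (or, if it runs before the write, as "buffer already recycled" inside the writer).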
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261075#comment-16261075 ] ASF GitHub Bot commented on FLINK-7499: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152342364 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java --- @@ -31,9 +31,26 @@ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue super(channelID, requestQueue, CALLBACK, true); } + /** +* Writes the given block asynchronously. +* +* @param buffer +* the buffer to be written (will be recycled when done) --- End diff -- Good catch, but actually `SpillableSubpartition` doesn't do any recycling itself: in its `finish()` method, it relies on the buffer being on-heap and eventually garbage-collected, and for the `add()` function, it relies on the caller, i.e. `ResultPartition#add()` (which I also forgot to adapt). > double buffer release in SpillableSubpartitionView > -- > > Key: FLINK-7499 > URL: https://issues.apache.org/jira/browse/FLINK-7499 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, > 1.3.2, 1.3.3 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: > once asynchronously after the write operation and once in > {{SpillableSubpartitionView#releaseMemory()}} after adding the write > operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer > is recycled, the memory region may already be reused despite the pending write. > 2) If, for some reason (probably only in tests like > {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?), > the buffer is retained to be used in parallel somewhere else, it may also > no longer be available or may contain corrupt data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4581: [FLINK-7499][io] also let AsynchronousBufferFileWr...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152342364 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java --- @@ -31,9 +31,26 @@ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue super(channelID, requestQueue, CALLBACK, true); } + /** +* Writes the given block asynchronously. +* +* @param buffer +* the buffer to be written (will be recycled when done) --- End diff -- Good catch, but actually `SpillableSubpartition` doesn't do any recycling itself: in its `finish()` method, it relies on the buffer being on-heap and eventually garbage-collected, and for the `add()` function, it relies on the caller, i.e. `ResultPartition#add()` (which I also forgot to adapt). ---
[jira] [Comment Edited] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261014#comment-16261014 ] Gary Yao edited comment on FLINK-7716 at 11/21/17 4:39 PM: --- [~yew1eb] This ticket and FLINK-7717 are similar to FLINK-7718 which I am currently working on. I wanted to ask you about your progress on this ticket because I can see that we will probably need a common base class for the REST handlers similar to {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler}}. I can create one based on my current work, or if you already have one, I can rebase my code against your changes. What do you think? was (Author: gjy): [~yew1eb] This ticket is similar to FLINK-7718 which I am currently working on. I wanted to ask you about your progress on this ticket because I can see that we will probably need a common base class for the REST handlers similar to {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler}}. I can create one based on my current work, or if you already have one, I can rebase my code against your changes. What do you think? > Port JobManagerMetricsHandler to new REST endpoint > -- > > Key: FLINK-7716 > URL: https://issues.apache.org/jira/browse/FLINK-7716 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Hai Zhou UTC+8 > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JobManagerMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261014#comment-16261014 ] Gary Yao commented on FLINK-7716: - [~yew1eb] This ticket is similar to FLINK-7718 which I am currently working on. I wanted to ask you about your progress on this ticket because I can see that we will probably need a common base class for the REST handlers similar to {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler}}. I can create one based on my current work, or if you already have one, I can rebase my code against your changes. What do you think? > Port JobManagerMetricsHandler to new REST endpoint > -- > > Key: FLINK-7716 > URL: https://issues.apache.org/jira/browse/FLINK-7716 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Hai Zhou UTC+8 > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JobManagerMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-4600) Support min/max aggregations for Date/Time/Timestamp/Intervals
[ https://issues.apache.org/jira/browse/FLINK-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-4600. - Resolution: Fixed Fix Version/s: 1.5.0 I will close this issue for now. I don't know if we really want to support aggregations on intervals. If this issue pops up again, we can reopen it. All other time types are supported now. > Support min/max aggregations for Date/Time/Timestamp/Intervals > -- > > Key: FLINK-4600 > URL: https://issues.apache.org/jira/browse/FLINK-4600 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Leo Deng > Fix For: 1.5.0 > > > Currently no aggregation supports temporal types. At least min/max should be > added for Date/Time/Timestamp/Intervals. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8097) Add built-in support for min/max aggregation for Date/Time
[ https://issues.apache.org/jira/browse/FLINK-8097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261011#comment-16261011 ] ASF GitHub Bot commented on FLINK-8097: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5027 > Add built-in support for min/max aggregation for Date/Time > -- > > Key: FLINK-8097 > URL: https://issues.apache.org/jira/browse/FLINK-8097 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5027: [FLINK-8097] [table] Add built-in support for min/...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5027 ---
[jira] [Resolved] (FLINK-8097) Add built-in support for min/max aggregation for Date/Time
[ https://issues.apache.org/jira/browse/FLINK-8097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8097. - Resolution: Fixed Fixed in 1.5: 44c603d2b62fff20a7213ad55512dc38f43a50bc > Add built-in support for min/max aggregation for Date/Time > -- > > Key: FLINK-8097 > URL: https://issues.apache.org/jira/browse/FLINK-8097 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
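FLINK-4600 and FLINK-8097 add MIN/MAX for temporal types. Assuming SQL DATE and TIMESTAMP values arrive as `java.sql.Date` and `java.sql.Timestamp` (both `Comparable` through `java.util.Date`), a single generic accumulator covers them. A minimal sketch with a hypothetical `MinMax` class; Flink's built-in aggregate functions are separate classes not shown in this thread. As with SQL MIN/MAX, NULL inputs are skipped.

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

// Hypothetical accumulator for MIN/MAX over any Comparable type; not Flink's
// built-in aggregate function classes.
final class MinMax<T extends Comparable<? super T>> {
    private T min;
    private T max;

    // SQL MIN/MAX ignore NULL inputs.
    void accumulate(T value) {
        if (value == null) {
            return;
        }
        if (min == null || value.compareTo(min) < 0) {
            min = value;
        }
        if (max == null || value.compareTo(max) > 0) {
            max = value;
        }
    }

    T getMin() {
        return min;
    }

    T getMax() {
        return max;
    }

    static <T extends Comparable<? super T>> MinMax<T> of(List<T> values) {
        MinMax<T> acc = new MinMax<>();
        for (T value : values) {
            acc.accumulate(value);
        }
        return acc;
    }
}
```

Usage: `MinMax.of(Arrays.asList(Date.valueOf("2017-11-21"), Date.valueOf("2017-11-20"))).getMin()` yields the 2017-11-20 date; an input with only NULLs leaves both results `null`, mirroring SQL.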
[jira] [Commented] (FLINK-8097) Add built-in support for min/max aggregation for Date/Time
[ https://issues.apache.org/jira/browse/FLINK-8097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260993#comment-16260993 ] ASF GitHub Bot commented on FLINK-8097: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5027 Thanks @dianfu. Looks good to merge. Merging... > Add built-in support for min/max aggregation for Date/Time > -- > > Key: FLINK-8097 > URL: https://issues.apache.org/jira/browse/FLINK-8097 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5027: [FLINK-8097] [table] Add built-in support for min/max agg...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5027 Thanks @dianfu. Looks good to merge. Merging... ---
[jira] [Commented] (FLINK-8038) Support MAP value constructor
[ https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260970#comment-16260970 ] ASF GitHub Bot commented on FLINK-8038: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5015 > Support MAP value constructor > - > > Key: FLINK-8038 > URL: https://issues.apache.org/jira/browse/FLINK-8038 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > Fix For: 1.5.0 > > > Similar to https://issues.apache.org/jira/browse/FLINK-4554 > We want to support Map value constructor which is supported by Calcite: > https://calcite.apache.org/docs/reference.html#value-constructors > {code:sql} > SELECT > MAP['key1', f0, 'key2', f1] AS stringKeyedMap, > MAP['key', 'value'] AS literalMap, > MAP[f0, f1] AS fieldMap > FROM > table > {code} > This should enable users to construct MapTypeInfo, one of the CompositeType. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
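The MAP value constructor takes alternating keys and values, as in `MAP['key1', f0, 'key2', f1]`. A hypothetical runtime helper (not Flink's generated code) sketches the argument handling. Rejecting an odd or empty argument list and letting a later duplicate key overwrite an earlier one are assumptions of this sketch, not behavior confirmed in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring MAP[k1, v1, k2, v2, ...]: arguments alternate
// between keys and values. Not Flink's implementation.
final class MapConstructor {
    private MapConstructor() {}

    static Map<Object, Object> of(Object... keysAndValues) {
        if (keysAndValues.length == 0 || keysAndValues.length % 2 != 0) {
            throw new IllegalArgumentException(
                "MAP expects a non-empty, even number of arguments, got " + keysAndValues.length);
        }
        // LinkedHashMap keeps the keys in the order they were written.
        Map<Object, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < keysAndValues.length; i += 2) {
            // Assumption: a later duplicate key overwrites an earlier one.
            result.put(keysAndValues[i], keysAndValues[i + 1]);
        }
        return result;
    }
}
```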
[jira] [Resolved] (FLINK-8038) Support MAP value constructor
[ https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8038. - Resolution: Fixed Fix Version/s: 1.5.0 Fixed in 1.5: c5f5615cf84026039614701b5e6b3b0e003eada0 & 9e3439c013928e52ea99fe87579512f1c2b2c28e > Support MAP value constructor > - > > Key: FLINK-8038 > URL: https://issues.apache.org/jira/browse/FLINK-8038 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > Fix For: 1.5.0 > > > Similar to https://issues.apache.org/jira/browse/FLINK-4554 > We want to support Map value constructor which is supported by Calcite: > https://calcite.apache.org/docs/reference.html#value-constructors > {code:sql} > SELECT > MAP['key1', f0, 'key2', f1] AS stringKeyedMap, > MAP['key', 'value'] AS literalMap, > MAP[f0, f1] AS fieldMap > FROM > table > {code} > This should enable users to construct MapTypeInfo, one of the CompositeType. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5015: [FLINK-8038][Table API] Support MAP value construc...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5015 ---
[jira] [Commented] (FLINK-8038) Support MAP value constructor
[ https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260883#comment-16260883 ] ASF GitHub Bot commented on FLINK-8038: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5015 Thanks for the update @walterddr. I found a little bug for cardinalities. I will fix it and merge this PR. > Support MAP value constructor > - > > Key: FLINK-8038 > URL: https://issues.apache.org/jira/browse/FLINK-8038 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Similar to https://issues.apache.org/jira/browse/FLINK-4554 > We want to support Map value constructor which is supported by Calcite: > https://calcite.apache.org/docs/reference.html#value-constructors > {code:sql} > SELECT > MAP['key1', f0, 'key2', f1] AS stringKeyedMap, > MAP['key', 'value'] AS literalMap, > MAP[f0, f1] AS fieldMap > FROM > table > {code} > This should enable users to construct MapTypeInfo, one of the CompositeType. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5015: [FLINK-8038][Table API] Support MAP value constructor
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5015 Thanks for the update @walterddr. I found a little bug for cardinalities. I will fix it and merge this PR. ---
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260867#comment-16260867 ] ASF GitHub Bot commented on FLINK-7880: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152299398 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java --- @@ -95,15 +97,20 @@ private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class); + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS); + // Thread pool for client bootstrap (shared between tests) - private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); + private NioEventLoopGroup nioGroup; - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS); + @Before + public void setUp() throws Exception { + nioGroup = new NioEventLoopGroup(); --- End diff -- you could just write `private NioEventLoopGroup nioGroup = new NioEventLoopGroup();` and remove the `@Before` method > flink-queryable-state-java fails with core-dump > --- > > Key: FLINK-7880 > URL: https://issues.apache.org/jira/browse/FLINK-7880 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > Labels: test-stability > > The {{flink-queryable-state-java}} module fails on Travis with a core dump. > https://travis-ci.org/tillrohrmann/flink/jobs/289949829
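Background for the field-initializer suggestion, offered with some hedging: JUnit 4 creates a fresh instance of the test class for every test method, so an instance field initializer runs once per test. That is the same timing a `@Before` method gives. The sketch below imitates that lifecycle without JUnit. `PerTestResource`, `FieldInitTest`, and `PerTestRunnerSketch` are hypothetical stand-ins, with the resource playing the role of the `NioEventLoopGroup`. Either way, the group would presumably still need to be shut down in an `@After` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a per-test resource such as an event loop group.
final class PerTestResource {
    static final AtomicInteger CREATED = new AtomicInteger();
    final int id = CREATED.incrementAndGet();
}

// Test class that initializes its resource in a field instead of in @Before.
final class FieldInitTest {
    private final PerTestResource resource = new PerTestResource(); // runs once per instance

    int resourceId() {
        return resource.id;
    }
}

final class PerTestRunnerSketch {
    private PerTestRunnerSketch() {}

    // Imitates JUnit 4: one new test-class instance per test method.
    static List<Integer> runTests(int testMethods) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < testMethods; i++) {
            FieldInitTest instance = new FieldInitTest();
            ids.add(instance.resourceId());
        }
        return ids;
    }
}
```

Each simulated test sees its own resource, which is why the `@Before` method and the field initializer are interchangeable for setup.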
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260868#comment-16260868 ] ASF GitHub Bot commented on FLINK-7880: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152294984 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,28 +167,57 @@ public String getClientName() { * Shuts down the client and closes all connections. * * After a call to this method, all returned futures will be failed. +* +* @return A {@link CompletableFuture} that will be completed when the shutdown process is done. */ - public void shutdown() { - if (shutDown.compareAndSet(false, true)) { + public CompletableFuture shutdown() { --- End diff -- should be typed to `Void`.
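One reading of the `Void` suggestion, as a sketch: a future that only signals completion is conventionally typed `CompletableFuture<Void>` and completed with `complete(null)`. That typing also lines up with `CompletableFuture.allOf`, which already returns `CompletableFuture<Void>`. `VoidShutdownSketch` and `closeConnection` are hypothetical and stand in for the client's real connection-closing code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class VoidShutdownSketch {
    private VoidShutdownSketch() {}

    // Hypothetical per-connection close: it signals completion and carries no value.
    static CompletableFuture<Void> closeConnection(String connection) {
        CompletableFuture<Void> closed = new CompletableFuture<>();
        closed.complete(null); // null is the only value a Void future can hold
        return closed;
    }

    // With a Void result type, allOf(...) can be returned as-is,
    // since CompletableFuture.allOf already yields CompletableFuture<Void>.
    static CompletableFuture<Void> shutdown(List<String> connections) {
        CompletableFuture<?>[] closes = connections.stream()
                .map(VoidShutdownSketch::closeConnection)
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(closes);
    }
}
```

Callers can then wait with `shutdown(connections).join()`, which returns `null` once every connection future has completed.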
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260869#comment-16260869 ] ASF GitHub Bot commented on FLINK-7880: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152300724 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,28 +167,57 @@ public String getClientName() { * Shuts down the client and closes all connections. * * After a call to this method, all returned futures will be failed. +* +* @return A {@link CompletableFuture} that will be completed when the shutdown process is done. */ - public void shutdown() { - if (shutDown.compareAndSet(false, true)) { + public CompletableFuture shutdown() { + final CompletableFuture newShutdownFuture = new CompletableFuture<>(); + if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) { + + final List> connectionFutures = new ArrayList<>(); + for (Map.Entry conn : establishedConnections.entrySet()) { if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } for (Map.Entry conn : pendingConnections.entrySet()) { if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + CompletableFuture.allOf( + connectionFutures.toArray(new CompletableFuture[connectionFutures.size()]) + ).whenComplete((result, throwable) -> { + if (throwable != null) { + newShutdownFuture.completeExceptionally(throwable); + } else if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null && !group.isShutdown()) { + group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS) + 
.addListener(finished -> { + if (finished.isSuccess()) { + newShutdownFuture.complete(null); + } else { + newShutdownFuture.completeExceptionally(finished.cause()); + } + }); + } else { + newShutdownFuture.complete(null); + } + } else { + newShutdownFuture.complete(null); } + }); + + // check again if in the meantime another thread completed the future + if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) { --- End diff -- where in close() do we set the shutdown future to null? I only see that being done in sendRequest. (which seems fishy)
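The question about the second check turns on how `AtomicReference.compareAndSet(null, f)` behaves: it succeeds only while the reference is still `null`. Once the first call has installed a future, a second `compareAndSet(null, ...)` can only succeed if some other code path resets the reference to `null`. Here is a minimal sketch of the usual idempotent-shutdown pattern, assuming nothing ever resets the reference. `IdempotentShutdownSketch` is a hypothetical class, not the `Client` from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class IdempotentShutdownSketch {
    // Stays null until the first shutdown() call installs a future; never reset.
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture = new AtomicReference<>();
    // Counts how often the release work ran (for illustration only).
    final AtomicInteger releases = new AtomicInteger();

    CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> candidate = new CompletableFuture<>();
        if (shutdownFuture.compareAndSet(null, candidate)) {
            // First caller only: release resources (hypothetical), then complete.
            releases.incrementAndGet();
            candidate.complete(null);
            return candidate;
        }
        // Every later caller gets the first caller's future. Because nothing
        // resets the reference, another compareAndSet(null, ...) would fail here.
        return shutdownFuture.get();
    }
}
```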
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260871#comment-16260871 ] ASF GitHub Bot commented on FLINK-7880: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152297601 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -312,32 +345,43 @@ private void handInChannel(Channel channel) { /** * Close the connecting channel with a ClosedChannelException. */ - private void close() { - close(new ClosedChannelException()); + private CompletableFuture close() { + return close(new ClosedChannelException()); } /** * Close the connecting channel with an Exception (can be {@code null}) * or forward to the established channel. */ - private void close(Throwable cause) { - synchronized (connectLock) { - if (!closed) { - if (failureCause == null) { - failureCause = cause; - } + private CompletableFuture close(Throwable cause) { --- End diff -- same as above
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260866#comment-16260866 ] ASF GitHub Bot commented on FLINK-7880: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152298240 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -312,32 +345,43 @@ private void handInChannel(Channel channel) { /** * Close the connecting channel with a ClosedChannelException. */ - private void close() { - close(new ClosedChannelException()); + private CompletableFuture close() { + return close(new ClosedChannelException()); } /** * Close the connecting channel with an Exception (can be {@code null}) * or forward to the established channel. */ - private void close(Throwable cause) { - synchronized (connectLock) { - if (!closed) { - if (failureCause == null) { - failureCause = cause; - } + private CompletableFuture close(Throwable cause) { + CompletableFuture future = new CompletableFuture<>(); + if (connectionShutdownFuture.compareAndSet(null, future)) { + synchronized (connectLock) { + if (!closed) { --- End diff -- this seems unnecessary, doesn't the check at L358 guarantee that the entire branch is only executed once?
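The L358 remark rests on a simple property: a `compareAndSet(null, future)` guard admits at most one caller, even when many threads race. The hypothetical `CasGuardSketch` below counts how often the guarded branch runs under contention. Whether the nested `if (!closed)` check is truly redundant in the PR depends on whether `closed` can also be set by another code path, which the diff excerpt does not show.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class CasGuardSketch {
    final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
    final AtomicInteger branchExecutions = new AtomicInteger();

    void close() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (closeFuture.compareAndSet(null, future)) {
            // Guarded branch: at most one thread ever enters, so a nested
            // "if (!closed)" check adds nothing unless 'closed' is set elsewhere.
            branchExecutions.incrementAndGet();
            future.complete(null);
        }
    }

    // Releases 'threads' callers at once and returns how often the branch ran.
    static int race(int threads) {
        CasGuardSketch guard = new CasGuardSketch();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before calling close()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                guard.close();
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return guard.branchExecutions.get();
    }
}
```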
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260870#comment-16260870 ] ASF GitHub Bot commented on FLINK-7880: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152298612 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -386,6 +430,9 @@ private PendingRequest(REQ request) { /** Reference to a failure that was reported by the channel. */ private final AtomicReference failureCause = new AtomicReference<>(); + /** Atomic shut down future. */ + private final AtomicReference> connectionShutdownFuture = new AtomicReference<>(null); --- End diff -- why does this one suddenly return a boolean?
[GitHub] flink pull request #5038: [FLINK-7880][FLINK-7975][FLINK-7974][QS] QS test i...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152298612 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -386,6 +430,9 @@ private PendingRequest(REQ request) { /** Reference to a failure that was reported by the channel. */ private final AtomicReference failureCause = new AtomicReference<>(); + /** Atomic shut down future. */ + private final AtomicReference> connectionShutdownFuture = new AtomicReference<>(null); --- End diff -- why does this one suddenly return a boolean? ---