[GitHub] flink issue #2916: [FLINK-4523] [kinesis] Allow Kinesis Consumer to start fr...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2916 Merged to master. Thank you for your contribution @tony810430 ! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2916: [FLINK-4523] [kinesis] Allow Kinesis Consumer to s...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2916
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 @StephanEwen I have used TemporaryFolder to replace creating File manually. Some tips: 1. TemporaryFolder should invoke `create()` in setup manually. 2. `shutdown()` should invoke `tmp.delete()` to delete all the dirs and files recursively. 3. File names get a random string appended. The generated files look like this in the root directory `flink/flink-runtime/target/tmp`:
```
tmp/
├── junit4955582781992164088
├── 1fde53fa-7f7c-4f55-921c-04d078601cfd
│   └── flink-dist-cache-664fc7de-51af-4fc1-86a0-3d8c76bb65df
├── 30326e4a-205f-4982-a920-e790a7f52730
│   ├── flink-dist-cache-d3eb183b-ca0c-4b23-877f-adf1540d6563
│   ├── b217df314c282e23301ba8f8351fca5c
│   └── cacheFile1e7f5410-2326-4916-8a50-986d49393a7f
├── 35b19e4e-1515-4f93-813e-d4a40b09baeb
│   └── flink-dist-cache-af593973-73a0-4996-8323-40978a703317
├── c754952f-fe8d-465f-9424-1303e734e990
│   └── flink-dist-cache-18f1984b-84ca-4db1-be2c-d2a3bc0a5fe3
├── cacheFile1e7f5410-2326-4916-8a50-986d49393a7f
├── fde2c7bc-8bb1-43ec-8e12-21510322b2b1
└── flink-dist-cache-b34f1e0b-79a7-4cf7-ab3f-7a7a93a04d73
```
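The lifecycle described above (create the folder in setup, delete it recursively in shutdown) can be sketched with plain JDK calls. This is a hedged illustration of the same pattern that JUnit's TemporaryFolder rule encapsulates, not the rule itself; the class and method names here are made up for the example:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TempDirLifecycle {
    private Path root;

    // setup: create the folder explicitly (like TemporaryFolder#create()
    // when the rule is not managed by the JUnit runner)
    public void setup() throws IOException {
        root = Files.createTempDirectory("flink-test-"); // random suffix, like junit495...
    }

    public Path getRoot() {
        return root;
    }

    // shutdown: delete the directory and all files recursively,
    // like TemporaryFolder#delete()
    public void shutdown() throws IOException {
        if (root == null) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()) // children before parents
                 .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        TempDirLifecycle tmp = new TempDirLifecycle();
        tmp.setup();
        Path f = Files.createFile(tmp.getRoot().resolve("cacheFile"));
        System.out.println(Files.exists(f));
        tmp.shutdown();
        System.out.println(Files.exists(tmp.getRoot()));
    }
}
```

The reverse-order sort is the important detail: `Files.walk` yields parents before children, so deletion has to happen in the opposite order.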
[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3035 Thank you for picking this up @StephanEwen! I've taken a look at your approach in the local branch, +1 to the approach.
[GitHub] flink issue #2941: [FLINK-3555] Web interface does not render job informatio...
Github user iampeter commented on the issue: https://github.com/apache/flink/pull/2941 I agree with @sachingoel0101 about overriding classes from `vendor.css`
[GitHub] flink pull request #3197: [FLINK-5567] [Table API & SQL]Introduce and migrat...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3197 [FLINK-5567] [Table API & SQL] Introduce and migrate current table statistics to FlinkStatistics
This PR includes two commits. The first commit is related to https://github.com/apache/flink/pull/3196; the second commit introduces and migrates current table statistics to FlinkStatistics, so please focus on the second commit. The main changes include:
1. Introduce the FlinkStatistic class, which is an implementation of Calcite's Statistic.
2. Integrate FlinkStatistic with FlinkTable.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/alibaba/flink flink-5567
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3197.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3197
commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23
Author: 槿ç
Date: 2017-01-24T06:34:01Z
Introduce structure to hold table and column level statistics
commit 56c51b0f8d7983b8593946f64ece2b4881f0d723
Author: 槿ç
Date: 2017-01-24T06:57:08Z
Introduce and migrate current table statistics to FlinkStatistics
[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3196 [FLINK-5566] [Table API & SQL] Introduce structure to hold table and column level statistics
This PR aims to introduce structures to hold table and column level statistics:
- TableStats: holds table-level statistics.
- ColumnStats: holds column-level statistics.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/alibaba/flink flink-5566
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3196.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3196
commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23
Author: 槿ç
Date: 2017-01-24T06:34:01Z
Introduce structure to hold table and column level statistics
[GitHub] flink pull request #2937: [FLINK-4303] [cep] Examples for CEP library.
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2937#discussion_r97481654
--- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/examples/CEPMonitoringExample.scala ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.scala.examples
+
+import org.apache.flink.cep.scala.CEP
+import org.apache.flink.cep.scala.examples.events.{MonitoringEvent, TemperatureAlert, TemperatureEvent, TemperatureWarning}
+import org.apache.flink.cep.scala.examples.sources.MonitoringEventSource
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object CEPMonitoringExample {
--- End diff --
It would be great if there's Javadoc explaining what the example demonstrates exactly.
[GitHub] flink pull request #2937: [FLINK-4303] [cep] Examples for CEP library.
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2937#discussion_r97481738
--- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/examples/events/MonitoringEvent.scala ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.scala.examples.events
+
+class MonitoringEvent(rackID: Int) {
--- End diff --
Same here: Class-level Javadocs will be nice.
[GitHub] flink issue #2916: [FLINK-4523] [kinesis] Allow Kinesis Consumer to start fr...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2916 Rebasing and merging this ...
[GitHub] flink issue #3118: [FLINK-5224] [table] Improve UDTF: emit rows directly ins...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3118 Hi @twalthr, I have updated this PR. I refactored `FlinkCorrelate#generateFunction` to separate the generation of `FlatMapFunction` and `TableFunctionCollector`. This fixes the problem that the generated `TableFunctionCollector` contained another `TableFunctionCollector` instance variable, and makes the logic cleaner.
[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/2974
[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/3128
[GitHub] flink issue #3089: [FLINK-5497] remove duplicated tests
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3089 Thanks! I will try to get to this in the remainder of the week. Speeding up builds is a valuable thing :-)
[GitHub] flink issue #3116: [docs] [metrics] Update metrics documentation
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3116 I think this is a good improvement over how it was before. +1 to merge for master and 1.2
[GitHub] flink issue #3125: [FLINK-5499][JobManager]Reuse the resource location of pr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3125 Thank you for opening this pull request. I think the feature is a good idea, but I would like to approach it a bit broader:
- On state restore, this should prefer the old state location, agreed.
- If no such location exists, it should still try to co-locate by input. Especially for batch execution, that is quite important.
Also, this would need some tests. I'll add some more detailed comments to the issue soon...
[GitHub] flink issue #3127: [FLINK-5481] Add type extraction from collection
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3127 I am a bit skeptical about the special-case handling of Row in the type extractor there. Why specifically support Row and not other types? To me, Row does not seem like a common enough type in collection inputs to need such special-case handling. Also, this does not detect whether the different rows have conflicting types in the columns (such as `row1[x] = int, row2[x] = String`). What do you think about simply improving how to generate a row type info (similar to what we did with `TypeHint` for other types)? Then `StreamExecutionEnvironment.fromCollection(rows, type)` should work quite well.
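The conflicting-column concern raised above can be illustrated with a small stand-alone sketch. Here `Object[]` stands in for Flink's Row and `inferColumnTypes` is a hypothetical helper, not Flink API: element-wise extraction has to compare every row's column classes and fail loudly on a mismatch.

```java
import java.util.Arrays;
import java.util.List;

public class RowTypeConflicts {

    // Returns the common column classes of all rows, or throws if some column
    // mixes types across rows (e.g. row1[x] = Integer, row2[x] = String).
    static Class<?>[] inferColumnTypes(List<Object[]> rows) {
        Class<?>[] types = new Class<?>[rows.get(0).length];
        for (Object[] row : rows) {
            for (int i = 0; i < types.length; i++) {
                Class<?> c = row[i].getClass();
                if (types[i] == null) {
                    types[i] = c;                       // first row fixes the type
                } else if (!types[i].equals(c)) {       // later rows must agree
                    throw new IllegalArgumentException(
                        "Conflicting types in column " + i + ": "
                        + types[i].getSimpleName() + " vs " + c.getSimpleName());
                }
            }
        }
        return types;
    }

    public static void main(String[] args) {
        List<Object[]> ok = Arrays.asList(
            new Object[]{1, "a"}, new Object[]{2, "b"});
        System.out.println(Arrays.toString(inferColumnTypes(ok)));

        List<Object[]> bad = Arrays.asList(
            new Object[]{1, "a"}, new Object[]{"x", "b"});
        try {
            inferColumnTypes(bad);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Passing an explicit row type to `fromCollection`, as suggested in the comment, sidesteps this whole inference problem.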
[GitHub] flink issue #3138: #Flink-5522 Storm Local Cluster can't work with powermock
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3138 @liuyuzhong7 Would be good to know if you plan to follow up on this issue. Thanks!
[GitHub] flink issue #3134: [FLINK-5450] Fix restore from legacy log message
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3134 I think this should go to master and Flink-1.2
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3190 @shijinkui I think there are already good temp file utils in JUnit itself. Do we need to change anything in the build when we fix the tests?
[GitHub] flink issue #3173: [FLINK-5577][yarn]Each time application is submitted to y...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3173 Thanks for reporting and fixing this issue!
[GitHub] flink issue #3173: [FLINK-5577][yarn]Each time application is submitted to y...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3173 I think this looks good, merging this for the master...
[GitHub] flink pull request #3193: [FLINK-5613][query] querying a non-existing key is...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3193
[GitHub] flink issue #3193: [FLINK-5613][query] querying a non-existing key is incons...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3193 Merging ...
[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r97353282
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
 		}
 	}
 
+	public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+		return new TypeExtractor().privateGetForIterable(value);
+	}
+
+	public static <X> TypeInformation<X> getForCollection(X[] value) {
--- End diff --
Add Javadoc.
[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r97355537
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
 		}
 	}
 
+	public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+		return new TypeExtractor().privateGetForIterable(value);
+	}
+
+	public static <X> TypeInformation<X> getForCollection(X[] value) {
+		return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
+		checkNotNull(value);
+
+		Iterator<X> it = value.iterator();
+		X v = checkNotNull(it.next());
+
+		if (v instanceof Row) {
+			int arity =((Row) v).getArity();
--- End diff --
Missing space.
[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r97353232
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
 		}
 	}
 
+	public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
--- End diff --
Add Javadoc.
[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r97355576
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
 		}
 	}
 
+	public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+		return new TypeExtractor().privateGetForIterable(value);
+	}
+
+	public static <X> TypeInformation<X> getForCollection(X[] value) {
+		return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
+		checkNotNull(value);
+
+		Iterator<X> it = value.iterator();
+		X v = checkNotNull(it.next());
+
+		if (v instanceof Row) {
+			int arity =((Row) v).getArity();
+			Iterable<Row> rows = (Iterable<Row>) value;
+			List<TypeInformation> typesInRow = new ArrayList<>(arity);
+			TypeInformation[] temp = new TypeInformation[arity];
+			Collections.addAll(typesInRow, temp);
+
+			for (Row r: rows) {
--- End diff --
Missing space.
[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r97354898
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
 		}
 	}
 
+	public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+		return new TypeExtractor().privateGetForIterable(value);
+	}
+
+	public static <X> TypeInformation<X> getForCollection(X[] value) {
+		return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
+		checkNotNull(value);
+
+		Iterator<X> it = value.iterator();
+		X v = checkNotNull(it.next());
--- End diff --
Calling `next` without checking can result in `NoSuchElementException`. We should check that first and throw a helpful exception.
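The guarded-access pattern being asked for might look like the following sketch; `firstOrFail` is a hypothetical helper name for illustration, not the code that was actually committed.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class FirstElement {

    // Fetch the first element with a descriptive error instead of letting
    // Iterator#next() throw a bare NoSuchElementException on an empty input.
    static <X> X firstOrFail(Iterable<X> values) {
        Iterator<X> it = values.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException(
                "Cannot extract a type from an empty collection.");
        }
        return it.next();
    }

    public static void main(String[] args) {
        System.out.println(firstOrFail(List.of("row1", "row2")));
        try {
            firstOrFail(Collections.emptyList());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the review comment is only the `hasNext()` check before `next()`: the caller gets a message naming the actual problem (an empty collection) rather than a stack trace from inside the extractor.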
[GitHub] flink pull request #3127: [FLINK-5481] Add type extraction from collection
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r97359437
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -1916,6 +1918,50 @@ else if (value instanceof Row) {
 		}
 	}
 
+	public static <X> TypeInformation<X> getForCollection(Iterable<X> value) {
+		return new TypeExtractor().privateGetForIterable(value);
+	}
+
+	public static <X> TypeInformation<X> getForCollection(X[] value) {
+		return new TypeExtractor().privateGetForIterable(Arrays.asList(value));
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private <X> TypeInformation<X> privateGetForIterable(Iterable<X> value) {
--- End diff --
Would be great if you could add some inline comments in this method, just to quickly see what your code is doing.
[GitHub] flink pull request #3183: [backport] [FLINK-5214] [FLINK-5229] Backport Stre...
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/3183
[GitHub] flink issue #3183: [backport] [FLINK-5214] [FLINK-5229] Backport StreamTask ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3183 Merged manually into `release-1.2` branch.
[GitHub] flink issue #3193: [FLINK-5527][query] querying a non-existing key is incons...
Github user uce commented on the issue: https://github.com/apache/flink/pull/3193 +1 to merge if Travis passes.
[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3178
[GitHub] flink pull request #3179: [FLINK-5229] [state] Cleanup of operator snapshots...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3179
[GitHub] flink pull request #3021: [FLINK-3617] Prevent NPE in CaseClassSerializer an...
Github user chermenin closed the pull request at: https://github.com/apache/flink/pull/3021
[GitHub] flink issue #3128: [FLINK-5464] Improve MetricDumpSerialization error handli...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3128 Merging.
[GitHub] flink issue #2974: [FLINK-5298] TM checks that log file exists
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2974 @tillrohrmann I've addressed your comments. Will merge this once Travis passes.
[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2974#discussion_r97354396
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -1074,6 +1080,79 @@ protected void run() {
 		}};
 	}
 
+	@Test
+	public void testLogNotFoundHandling() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+				getTestActor(),
+				leaderSessionID);
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM
+				ActorRef jm = system.actorOf(Props.create(
+					new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+
+				jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+				final int dataPort = NetUtils.getAvailablePort();
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+				taskManager = TestingUtils.createTaskManager(
+					system,
+					jobManager,
--- End diff --
Without a JobManager, no BlobService is started. This means the TM would fail earlier than we want it to. While trying it out I found another exception that should be wrapped though :>
[GitHub] flink issue #3127: [FLINK-5481] Add type extraction from collection
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3127 I will shepherd this PR.
[GitHub] flink issue #3195: [FLINK-5617] API stability check for Flink 1.2
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3195 Yes, I'll mark them as such while merging. Thank you for taking a look.
[GitHub] flink issue #3195: [FLINK-5617] API stability check for Flink 1.2
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3195 @rmetzger I think they should be `@PublicEvolving`, can you change it in this PR?
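For context, `@PublicEvolving` is one of the audience annotations Flink uses to mark API stability. A minimal, self-contained sketch of how such a marker annotation works — the annotation below is an illustrative stand-in, not Flink's actual `org.apache.flink.annotation.PublicEvolving`:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-in for Flink's PublicEvolving: marks an API as public for users
// but still subject to change between minor releases.
@Documented
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface PublicEvolving {}

public class StabilityDemo {

    // A method flagged as evolving rather than fully stable.
    @PublicEvolving
    public static String fold() {
        return "evolving API";
    }

    // Tools like japicmp (or a reflective check, as here) can then tell
    // which public methods carry the evolving marker.
    public static boolean isEvolving(String methodName) {
        try {
            return StabilityDemo.class.getMethod(methodName)
                    .isAnnotationPresent(PublicEvolving.class);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

This mirrors the stability-check workflow discussed in the PR: the annotation is inert at runtime but lets build tooling exclude evolving APIs from compatibility enforcement.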
[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant
[ https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834787#comment-15834787 ] ASF GitHub Bot commented on FLINK-5298: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2974#discussion_r97346342 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java --- @@ -1074,6 +1080,79 @@ protected void run() { }}; } + @Test + public void testLogNotFoundHandling() throws Exception { + + new JavaTestKit(system){{ + + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + final ActorGateway testActorGateway = new AkkaActorGateway( + getTestActor(), + leaderSessionID); + + try { + final IntermediateDataSetID resultId = new IntermediateDataSetID(); + + // Create the JM + ActorRef jm = system.actorOf(Props.create( + new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor(; + + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + final int dataPort = NetUtils.getAvailablePort(); + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist"); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, --- End diff -- maybe `ActorRef.noSender()` is enough here. 
> TaskManager crashes when TM log not existant > > > Key: FLINK-5298 > URL: https://issues.apache.org/jira/browse/FLINK-5298 > Project: Flink > Issue Type: Bug > Components: Mesos, TaskManager, Webfrontend >Affects Versions: 1.1.0, 1.2.0 >Reporter: Mischa Krüger >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.2.0 > > > {code} > java.io.FileNotFoundException: flink-taskmanager.out (No such file or > directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.(FileInputStream.java:138) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at
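The crash above happens because the log file is opened without first checking that it exists. The pattern of the fix — verifying the file up front and turning its absence into an error reply instead of an uncaught `FileNotFoundException` — can be sketched as follows (names are illustrative, not Flink's actual code):

```java
import java.io.File;

public class LogRequestDemo {

    // Instead of letting FileNotFoundException escape and take down the
    // actor, check for the file first and answer with an error message.
    public static String handleLogRequest(String logFilePath) {
        File logFile = new File(logFilePath);
        if (!logFile.exists()) {
            // In the actual fix this would be wrapped into a failure reply
            // sent back to the requester (e.g. an akka Status.Failure).
            return "ERROR: log file not found: " + logFilePath;
        }
        return "OK: serving " + logFilePath;
    }
}
```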
[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant
[ https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834789#comment-15834789 ] ASF GitHub Bot commented on FLINK-5298: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2974#discussion_r97346835 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java --- @@ -1074,6 +1080,79 @@ protected void run() { }}; } + @Test + public void testLogNotFoundHandling() throws Exception { + + new JavaTestKit(system){{ + + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + final ActorGateway testActorGateway = new AkkaActorGateway( + getTestActor(), + leaderSessionID); + + try { + final IntermediateDataSetID resultId = new IntermediateDataSetID(); + + // Create the JM + ActorRef jm = system.actorOf(Props.create( + new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor(; + + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + final int dataPort = NetUtils.getAvailablePort(); + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist"); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + config, + false, + true); + + // - + + final ActorGateway tm = taskManager; + final ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor()); + + new Within(d) { + @Override + protected void run() { + Future logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout); + + logFuture.onSuccess(new OnSuccess() { + @Override + public void onSuccess(Object result) throws Throwable { + Assert.fail(); + } + }, 
context); + logFuture.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) throws Throwable { + testActorGateway.tell(new Status.Success("success")); + } + }, context); + + Status.Success msg = expectMsgClass(Status.Success.class); + Assert.assertEquals("success", msg.status()); + } + }; + } + catch(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); --- End diff -- Better let the exception bubble up. Less code ;-) > TaskManager crashes when TM log not existant > > > Key: FLINK-5298 > URL: https://issues.apache.org/jira/browse/FLINK-5298 > Project: Flink > Issue Type: Bug > Components: Mesos, TaskManager, Webfrontend >Affects
[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant
[ https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834786#comment-15834786 ] ASF GitHub Bot commented on FLINK-5298: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2974#discussion_r97346693 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java --- @@ -1074,6 +1080,79 @@ protected void run() { }}; } + @Test + public void testLogNotFoundHandling() throws Exception { + + new JavaTestKit(system){{ + + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + final ActorGateway testActorGateway = new AkkaActorGateway( + getTestActor(), + leaderSessionID); + + try { + final IntermediateDataSetID resultId = new IntermediateDataSetID(); + + // Create the JM + ActorRef jm = system.actorOf(Props.create( + new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor(; + + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + final int dataPort = NetUtils.getAvailablePort(); + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist"); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + config, + false, + true); + + // - + + final ActorGateway tm = taskManager; + final ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor()); + + new Within(d) { + @Override + protected void run() { + Future logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout); + + logFuture.onSuccess(new OnSuccess() { + @Override + public void onSuccess(Object result) throws Throwable { + Assert.fail(); + } + }, 
context); + logFuture.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) throws Throwable { + testActorGateway.tell(new Status.Success("success")); + } + }, context); --- End diff -- Maybe `Await.result` is a bit more succinct. > TaskManager crashes when TM log not existant > > > Key: FLINK-5298 > URL: https://issues.apache.org/jira/browse/FLINK-5298 > Project: Flink > Issue Type: Bug > Components: Mesos, TaskManager, Webfrontend >Affects Versions: 1.1.0, 1.2.0 >Reporter: Mischa Krüger >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.2.0 > > > {code} > java.io.FileNotFoundException: flink-taskmanager.out (No such file or > directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.(FileInputStream.java:138) > at >
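The `Await.result` suggestion replaces the `onSuccess`/`onFailure` callback pair with one blocking call that rethrows the future's failure. The analogous pattern with the JDK's `CompletableFuture` (shown as an illustration; the test itself would use Scala's `scala.concurrent.Await`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AwaitDemo {

    // Blocks on the future and reports whether it failed with the expected
    // exception type — the succinct alternative to registering callbacks.
    public static boolean failsWith(CompletableFuture<?> future,
                                    Class<? extends Throwable> expected) {
        try {
            future.join();
            return false; // completed normally: not the failure we expected
        } catch (CompletionException e) {
            return expected.isInstance(e.getCause());
        }
    }
}
```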
[jira] [Commented] (FLINK-4917) Deprecate "CheckpointedAsynchronously" interface
[ https://issues.apache.org/jira/browse/FLINK-4917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834785#comment-15834785 ] ASF GitHub Bot commented on FLINK-4917: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3087 @mtunique I am addressing the `Checkpointed` in a quick followup to this pull request, pretty much copying the docs from `CheckpointedAsynchronously`. > Deprecate "CheckpointedAsynchronously" interface > > > Key: FLINK-4917 > URL: https://issues.apache.org/jira/browse/FLINK-4917 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Stephan Ewen > Labels: easyfix, starter > > The {{CheckpointedAsynchronously}} should be deprecated, as it is no longer > part of the new operator state abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
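Deprecating an interface such as `CheckpointedAsynchronously` usually means marking it `@Deprecated` with javadoc pointing at the replacement, while keeping it functional for backwards compatibility. A minimal sketch of the pattern (an illustrative stand-in, not Flink's source):

```java
public class DeprecationDemo {

    /**
     * Stand-in for CheckpointedAsynchronously: superseded by the new
     * operator state abstraction, kept only for backwards compatibility.
     *
     * @deprecated use the new operator state interfaces instead.
     */
    @Deprecated
    interface CheckpointedAsynchronously<T> {
        T snapshotState(long checkpointId, long timestamp) throws Exception;
    }

    // The marker is retained at runtime, so tooling (and tests) can
    // verify that the old interface is flagged as deprecated.
    public static boolean isDeprecated() {
        return CheckpointedAsynchronously.class.isAnnotationPresent(Deprecated.class);
    }
}
```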
[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2974#discussion_r97347534 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.handlers; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.router.Routed; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import scala.Option; +import scala.collection.JavaConverters; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.Future$; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +public class TaskManagerLogHandlerTest { + @Test + public void testLogFetchingFailure() throws Exception { + // = setup TaskManager = + InstanceID 
tmID = new InstanceID(); + ResourceID tmRID = new ResourceID(tmID.toString()); + TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class); + when(taskManagerGateway.getAddress()).thenReturn("/tm/address"); + + Instance taskManager = mock(Instance.class); + when(taskManager.getId()).thenReturn(tmID); + when(taskManager.getTaskManagerID()).thenReturn(tmRID); + when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway); + CompletableFuture future = new FlinkCompletableFuture<>(); + future.completeExceptionally(new IOException("failure")); + when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future); + + // = setup JobManager == + + ActorGateway jobManagerGateway = mock(ActorGateway.class); + Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers( + JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala()); + + when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class))) + .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer)); +
[jira] [Commented] (FLINK-5617) Check new public APIs in 1.2 release
[ https://issues.apache.org/jira/browse/FLINK-5617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834768#comment-15834768 ] ASF GitHub Bot commented on FLINK-5617: --- GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/3195 [FLINK-5617] API stability check for Flink 1.2 With this PR, I'm marking some of the new methods in Flink 1.2 as public evolving / internal, instead of public. Here are the reports for the covered modules: http://people.apache.org/~rmetzger/rel-1.2-japicmp/ @aljoscha and @kl0u: Are the `fold()` and `reduce()` methods intentionally in the `@Public` scope? http://people.apache.org/~rmetzger/rel-1.2-japicmp/flink-streaming-java.html You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink5617 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3195.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3195 commit 12d9d631455af2c4b6af1024a8865b534f4db9c8 Author: Robert Metzger Date: 2017-01-23T13:31:27Z [FLINK-5617] Change reference version for API stability checks to 1.1.4 commit f7323c2e72e24da3a55fd70d6d27ae33026ecc86 Author: Robert Metzger Date: 2017-01-23T15:28:48Z [FLINK-5617] Mark some methods as PublicEvolving or Internal > Check new public APIs in 1.2 release > > > Key: FLINK-5617 > URL: https://issues.apache.org/jira/browse/FLINK-5617 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.2.0 > > > Before releasing Flink 1.2.0, I would like to quickly review which new public > methods we are supporting in future releases.
[jira] [Created] (FLINK-5618) Add queryable state documentation
Ufuk Celebi created FLINK-5618: -- Summary: Add queryable state documentation Key: FLINK-5618 URL: https://issues.apache.org/jira/browse/FLINK-5618 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Ufuk Celebi Adds docs about how to use queryable state.
[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators
[ https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834774#comment-15834774 ] ASF GitHub Bot commented on FLINK-5473: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3182 Rebased. > setMaxParallelism() higher than 1 is possible on non-parallel operators > --- > > Key: FLINK-5473 > URL: https://issues.apache.org/jira/browse/FLINK-5473 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Stefan Richter > > While trying out Flink 1.2, I found out that you can set a maxParallelism > higher than 1 on a non-parallel operator. > I think we should have the same semantics as the setParallelism() method. > Also, when setting a global maxParallelism in the execution environment, it > will be set as a default value for the non-parallel operator. > When restoring a savepoint from 1.1, you have to set the maxParallelism to > the parallelism of the 1.1 job. Non-parallel operators will then also get the > maxPar set to this value, leading to an error on restore. > So currently, users restoring from 1.1 to 1.2 have to manually set the > maxParallelism to 1 for all non-parallel operators.
[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97343787 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in //- // Serialization //- + public static class MetricDumpSerializer { + private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32); /** * Serializes the given metrics and returns the resulting byte array. +* +* Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned --- End diff -- Very nice
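[Editor's note] The javadoc added in this diff says that a metric which throws while being accessed during serialization is omitted from the result instead of failing the whole dump. A minimal sketch of that behaviour, using a stand-in Gauge interface rather than Flink's real Metric types:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Simplified sketch of the "omit faulty metrics" behaviour described in
// the javadoc above. Gauge is a stand-in, not Flink's real interface.
public class MetricDumpSketch {
    public interface Gauge { String getValue(); }

    public static byte[] serialize(List<Gauge> gauges) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (Gauge g : gauges) {
            String value;
            try {
                value = g.getValue(); // user code; may throw
            } catch (Exception e) {
                continue; // omit the faulty metric from the result
            }
            out.writeUTF(value);
        }
        out.flush();
        return bytes.toByteArray();
    }
}
```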
[GitHub] flink issue #3118: [FLINK-5224] [table] Improve UDTF: emit rows directly ins...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3118 Thank you for your review, @twalthr, I will update this PR ASAP.
[jira] [Commented] (FLINK-5224) Improve UDTF: emit rows directly instead of buffering them
[ https://issues.apache.org/jira/browse/FLINK-5224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834769#comment-15834769 ] ASF GitHub Bot commented on FLINK-5224: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3118 Thank you for your review, @twalthr, I will update this PR ASAP. > Improve UDTF: emit rows directly instead of buffering them > -- > > Key: FLINK-5224 > URL: https://issues.apache.org/jira/browse/FLINK-5224 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > This needs to code generate a `Collector` and register it into the instance of > {{TableFunction}}, and emit the rows generated by the UDTF directly instead > of buffering them.
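[Editor's note] The idea in FLINK-5224 — generate a `Collector`, register it into the table function, and forward each row immediately instead of buffering — can be illustrated with stand-in types (these are not Flink's real TableFunction/Collector classes):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "emit directly instead of buffering": a Collector is
// injected into the table function and eval() forwards each produced
// row to it immediately, so no intermediate buffer is needed.
public class DirectEmitSketch {
    public interface Collector<T> { void collect(T record); }

    public static class SplitFunction {
        // in the real design this is injected by generated code
        private Collector<String> collector;

        public void setCollector(Collector<String> collector) {
            this.collector = collector;
        }

        public void eval(String input) {
            for (String token : input.split(",")) {
                collector.collect(token); // emit directly, no buffering
            }
        }
    }
}
```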
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834770#comment-15834770 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97343787 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in //- // Serialization //- + public static class MetricDumpSerializer { + private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32); /** * Serializes the given metrics and returns the resulting byte array. +* +* Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned --- End diff -- Very nice

> MetricQueryService throws NullPointerException on JobManager
> Key: FLINK-5464
> URL: https://issues.apache.org/jira/browse/FLINK-5464
> Project: Flink
> Issue Type: Bug
> Components: Webfrontend
> Affects Versions: 1.2.0
> Reporter: Robert Metzger
> Assignee: Chesnay Schepler
>
> I'm using Flink 699f4b0.
> My JobManager log contains many of these log entries:
> {code}
> 2017-01-11 19:42:05,778 WARN org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching metrics failed.
> akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/MetricQueryService#-970662317]] after [1 ms]
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>     at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>     at java.lang.Thread.run(Thread.java:745)
> 2017-01-11 19:42:07,765 WARN org.apache.flink.runtime.metrics.dump.MetricQueryService - An exception occurred while processing a message.
> java.lang.NullPointerException
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:162)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$300(MetricDumpSerialization.java:47)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:90)
>     at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
[GitHub] flink pull request #3195: [FLINK-5617] API stability check for Flink 1.2
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/3195 [FLINK-5617] API stability check for Flink 1.2 With this PR, I'm marking some of the new methods in Flink 1.2 as public evolving / internal, instead of public. Here are the reports for the covered modules: http://people.apache.org/~rmetzger/rel-1.2-japicmp/ @aljoscha and @kl0u: Are the `fold()` and `reduce()` methods intentionally in the `@Public` scope? http://people.apache.org/~rmetzger/rel-1.2-japicmp/flink-streaming-java.html You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink5617 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3195.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3195 commit 12d9d631455af2c4b6af1024a8865b534f4db9c8 Author: Robert Metzger Date: 2017-01-23T13:31:27Z [FLINK-5617] Change reference version for API stability checks to 1.1.4 commit f7323c2e72e24da3a55fd70d6d27ae33026ecc86 Author: Robert Metzger Date: 2017-01-23T15:28:48Z [FLINK-5617] Mark some methods as PublicEvolving or Internal
[GitHub] flink issue #3087: [FLINK-4917] Deprecate "CheckpointedAsynchronously" inter...
Github user mtunique commented on the issue: https://github.com/apache/flink/pull/3087 Maybe we should open an issue about `Checkpointed` java doc to talk about how to replace the APIs.
[jira] [Commented] (FLINK-4917) Deprecate "CheckpointedAsynchronously" interface
[ https://issues.apache.org/jira/browse/FLINK-4917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834763#comment-15834763 ] ASF GitHub Bot commented on FLINK-4917: --- Github user mtunique commented on the issue: https://github.com/apache/flink/pull/3087 Maybe we should open an issue about `Checkpointed` java doc to talk about how to replace the APIs. > Deprecate "CheckpointedAsynchronously" interface > > > Key: FLINK-4917 > URL: https://issues.apache.org/jira/browse/FLINK-4917 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Stephan Ewen > Labels: easyfix, starter > > The {{CheckpointedAsynchronously}} should be deprecated, as it is no longer > part of the new operator state abstraction.
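[Editor's note] Deprecating an interface like `CheckpointedAsynchronously` amounts to annotating it with `@Deprecated` and pointing users at the replacement in the javadoc, which is exactly the javadoc concern raised above. A hedged sketch — the interface body below is a stand-in, not Flink's actual definition:

```java
// Illustration of phasing out an interface: mark it @Deprecated and
// document the replacement in the javadoc. Stand-in definition only.
public class DeprecationSketch {
    /**
     * @deprecated Use the new operator state abstraction
     *             (e.g. a CheckpointedFunction-style interface) instead.
     */
    @Deprecated
    public interface CheckpointedAsynchronously<T> {
        T snapshotState(long checkpointId, long timestamp) throws Exception;
    }

    // @Deprecated has runtime retention, so tooling can detect it
    public static boolean isDeprecated(Class<?> clazz) {
        return clazz.isAnnotationPresent(Deprecated.class);
    }
}
```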
[jira] [Commented] (FLINK-5395) support locally build distribution by script create_release_files.sh
[ https://issues.apache.org/jira/browse/FLINK-5395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834762#comment-15834762 ] ASF GitHub Bot commented on FLINK-5395: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3049 +1, good to merge. I'll test it as part of the RC1 creation for 1.2.0 > support locally build distribution by script create_release_files.sh > > > Key: FLINK-5395 > URL: https://issues.apache.org/jira/browse/FLINK-5395 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: shijinkui > > create_release_files.sh only builds the official Flink release. It's hard to build a > custom local Flink release distribution. > Let create_release_files.sh support: > 1. a custom git repo url > 2. building a specific Scala and Hadoop version > 3. adding `tools/flink` to .gitignore > 4. a usage message
[GitHub] flink issue #3049: [FLINK-5395] [Build System] support locally build distrib...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3049 +1, good to merge. I'll test it as part of the RC1 creation for 1.2.0
[GitHub] flink issue #3087: [FLINK-4917] Deprecate "CheckpointedAsynchronously" inter...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3087 Thanks, much better now! Will merge this for 1.2 and master...
[jira] [Commented] (FLINK-4917) Deprecate "CheckpointedAsynchronously" interface
[ https://issues.apache.org/jira/browse/FLINK-4917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834757#comment-15834757 ] ASF GitHub Bot commented on FLINK-4917: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3087 Thanks, much better now! Will merge this for 1.2 and master... > Deprecate "CheckpointedAsynchronously" interface > > > Key: FLINK-4917 > URL: https://issues.apache.org/jira/browse/FLINK-4917 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Stephan Ewen > Labels: easyfix, starter > > The {{CheckpointedAsynchronously}} should be deprecated, as it is no longer > part of the new operator state abstraction.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3149 Sounds good! Looking forward to that!
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834751#comment-15834751 ] ASF GitHub Bot commented on FLINK-2168: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3149 Sounds good! Looking forward to that! > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}.
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834744#comment-15834744 ] ASF GitHub Bot commented on FLINK-4905: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3035 Bit of background on how the error happens:
- The test throws a `SuccessException`
- While being in the finally clause and shutting down the CuratorClient, the containing `Task` has not seen the exception.
- When the commitOffsets() call fails, this overrides the `SuccessException` as the reason why the streaming program terminated.

> Kafka test instability IllegalStateException: Client is not started
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Robert Metzger
> Assignee: Andrew Efimov
> Labels: test-stability
> Attachments: Kafka08Fetcher.png
>
> The following travis build (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt) failed because of this error
> {code}
> 08:17:11,239 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>     at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>     at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>     at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>     at org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>     at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>     at org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>     at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>     at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>     at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>     ... 5 more
> 08:17:11,241 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: Unnamed (1/3)
> {code}
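[Editor's note] The failure mode described above — a cleanup failure (the offset commit on a closed client) replacing the primary `SuccessException` — is the classic exception-masking problem. A common remedy, sketched here with stand-in types (not Flink's actual task code), is to keep the primary exception and attach cleanup failures as suppressed:

```java
// Sketch: an exception thrown during cleanup must not replace the
// primary exception; attach it as suppressed so both are visible.
public class PrimaryExceptionSketch {
    public static class SuccessException extends RuntimeException {}

    public static void runThenCleanup(Runnable body, Runnable cleanup) {
        RuntimeException primary = null;
        try {
            body.run();
        } catch (RuntimeException e) {
            primary = e;
        }
        try {
            cleanup.run(); // e.g. committing offsets on a closed client
        } catch (RuntimeException e) {
            if (primary != null) {
                primary.addSuppressed(e); // do not mask the real cause
            } else {
                primary = e;
            }
        }
        if (primary != null) {
            throw primary;
        }
    }
}
```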
[GitHub] flink pull request #3194: [FLINK-5615][query] execute the QueryableStateITCa...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3194 [FLINK-5615][query] execute the QueryableStateITCase for all three state back-ends This extends the `QueryableStateITCase` so that it is able to run with any selected state backend. Some optimisations reduce the total runtime of the test cases so that we are able to run the tests with all three currently available backends, i.e. `MemoryStateBackend`, `FsStateBackend`, and `RocksDBStateBackend`, with little extra costs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-5615 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3194.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3194 commit 9c1a247ada0015ff1b97c6017e8c1de874ba6d17 Author: Nico Kruber Date: 2017-01-17T13:26:16Z [FLINK-5613][query] querying a non-existing key is inconsistent among state backends Querying for a non-existing key for a state that has a default value set currently results in an UnknownKeyOrNamespace exception when the MemoryStateBackend or FsStateBackend is used but results in the default value if RocksDBStateBackend is set. This removes the special handling from the RocksDBStateBackend and makes it consistent with the other two back-ends, i.e. returning null which results in the mentioned UnknownKeyOrNamespace exception. commit cccbc5f30e09f5ccbc61a75fc9519dacf91c5522 Author: Nico Kruber Date: 2017-01-23T14:17:46Z [FLINK-5615][query] improve testQueryableStateWithTaskManagerFailure test duration This is based on the following points: * slow down QueryableStateITCase#TestKeyRangeSource for the rest of the program to make more progress (we do not need a full-speed source here!)
* reduce the checkpointing interval * reduce the amount of progress before starting our own evaluation * reduce the number of checkpoints to wait for before killing a TM * reduce the thread waiting time when asking how many checkpoints exist Note that by slowing down QueryableStateITCase#TestKeyRangeSource, the other tests should only be affected positively, too, since they also did not really need a full-speed source and thus have more CPU cycles for their own tasks. This, among with commit 3dd506dd65f6f9f9e8879e6bf6df0261435d5317 Author: Nico Kruber Date: 2017-01-23T14:47:40Z [FLINK-5615][query] speed up some more tests in QueryableStateITCase This is based on reducing the number of keys the source generates. We do not really need 1024 different keys for the tests - go with 256 now. commit ee6b78eb3a91f39386148e3ef1c55a0f3824843f Author: Nico Kruber Date: 2017-01-23T14:57:09Z [FLINK-5615][query] execute the QueryableStateITCase for all state back-ends
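[Editor's note] The pattern behind this PR — running one test body once per state backend — can be sketched without Flink or JUnit. The enum and the "test body" below are stand-ins; the real change parameterizes the test class over `MemoryStateBackend`, `FsStateBackend`, and `RocksDBStateBackend`:

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

// Sketch: execute the same scenario against every available backend,
// so backend-specific inconsistencies (like FLINK-5613) surface.
public class BackendParameterizationSketch {
    public enum Backend { MEMORY, FS, ROCKSDB }

    public static List<String> runAgainstAllBackends() {
        List<String> results = new ArrayList<>();
        for (Backend backend : EnumSet.allOf(Backend.class)) {
            // in the real test, this configures the cluster with the
            // selected backend and runs the queryable-state scenario
            results.add("ran with " + backend);
        }
        return results;
    }
}
```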
[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3035 Bit of background on how the error happens:
- The test throws a `SuccessException`
- While being in the finally clause and shutting down the CuratorClient, the containing `Task` has not seen the exception.
- When the commitOffsets() call fails, this overrides the `SuccessException` as the reason why the streaming program terminated.
[jira] [Commented] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834743#comment-15834743 ] Jark Wu commented on FLINK-5592: In Table API, please use
{code}
// Java Table API
tEnv.scan("rows").select("person.get('age')")
// Scala Table API
tEnv.scan("rows").select('person.get("age"))
{code}

> Wrong number of RowSerializers with nested Rows in Collection mode
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
> Issue Type: Bug
> Reporter: Anton Solovev
> Priority: Minor
>
> {code}
> @Test
> def testNestedRowTypes(): Unit = {
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env, config)
>   tEnv.registerTableSource("rows", new MockSource)
>   val table: Table = tEnv.scan("rows")
>   val nestedTable: Table = tEnv.scan("rows").select('person)
>   val collect: Seq[Row] = nestedTable.collect()
>   print(collect)
> }
>
> class MockSource extends BatchTableSource[Row] {
>   import org.apache.flink.api.java.ExecutionEnvironment
>   import org.apache.flink.api.java.DataSet
>
>   override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>     val data = List(
>       Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
>       Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
>       Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>     execEnv.fromCollection(data.asJava, getReturnType)
>   }
>
>   override def getReturnType: TypeInformation[Row] = {
>     new RowTypeInfo(
>       Array[TypeInformation[_]](
>         new RowTypeInfo(
>           Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
>           Array("name", "age"))),
>       Array("person")
>     )
>   }
> }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match serializers}}
> stacktrace
> {code}
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
> at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
> at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
> at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
> at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
> at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
> at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
> at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}
[jira] [Commented] (FLINK-5615) queryable state: execute the QueryableStateITCase for all three state back-ends
[ https://issues.apache.org/jira/browse/FLINK-5615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834739#comment-15834739 ] ASF GitHub Bot commented on FLINK-5615: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3194 [FLINK-5615][query] execute the QueryableStateITCase for all three state back-ends This extends the `QueryableStateITCase` so that it is able to run with any selected state backend. Some optimisations reduce the total runtime of the test cases so that we are able to run the tests with all three currently available backends, i.e. `MemoryStateBackend`, `FsStateBackend`, and `RocksDBStateBackend`, with little extra costs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-5615 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3194.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3194 commit 9c1a247ada0015ff1b97c6017e8c1de874ba6d17 Author: Nico Kruber Date: 2017-01-17T13:26:16Z [FLINK-5613][query] querying a non-existing key is inconsistent among state backends Querying for a non-existing key for a state that has a default value set currently results in an UnknownKeyOrNamespace exception when the MemoryStateBackend or FsStateBackend is used but results in the default value if RocksDBStateBackend is set. This removes the special handling from the RocksDBStateBackend and makes it consistent with the other two back-ends, i.e. returning null which results in the mentioned UnknownKeyOrNamespace exception.
commit cccbc5f30e09f5ccbc61a75fc9519dacf91c5522 Author: Nico Kruber Date: 2017-01-23T14:17:46Z [FLINK-5615][query] improve testQueryableStateWithTaskManagerFailure test duration This is based on the following points: * slow down QueryableStateITCase#TestKeyRangeSource for the rest of the program to make more progress (we do not need a full-speed source here!) * reduce the checkpointing interval * reduce the amount of progress before starting our own evaluation * reduce the number of checkpoints to wait for before killing a TM * reduce the thread waiting time when asking how many checkpoints exist Note that by slowing down QueryableStateITCase#TestKeyRangeSource, the other tests should only be affected positively, too, since they also did not really need a full-speed source and thus have more CPU cycles for their own tasks. This, among with commit 3dd506dd65f6f9f9e8879e6bf6df0261435d5317 Author: Nico Kruber Date: 2017-01-23T14:47:40Z [FLINK-5615][query] speed up some more tests in QueryableStateITCase This is based on reducing the number of keys the source generates. We do not really need 1024 different keys for the tests - go with 256 now. commit ee6b78eb3a91f39386148e3ef1c55a0f3824843f Author: Nico Kruber Date: 2017-01-23T14:57:09Z [FLINK-5615][query] execute the QueryableStateITCase for all state back-ends > queryable state: execute the QueryableStateITCase for all three state > back-ends > --- > > Key: FLINK-5615 > URL: https://issues.apache.org/jira/browse/FLINK-5615 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > The QueryableStateITCase currently is only tested with the MemoryStateBackend > but as has been seen in the past, some errors or inconsistent behaviour only > appeared with different state back-ends. It should thus be extended to be > tested with all three currently existing state back-ends.
[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators
[ https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834714#comment-15834714 ] ASF GitHub Bot commented on FLINK-5473: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3182#discussion_r97337263
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
@@ -57,47 +57,51 @@
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
-
-	private final SerializableObject stateMonitor = new SerializableObject();
+
+	public static final int VALUE_NOT_SET = -1;
+
+	private final Object stateMonitor = new Object();
 	private final ExecutionGraph graph;
 	private final JobVertex jobVertex;
 	private final ExecutionVertex[] taskVertices;
-	private IntermediateResult[] producedDataSets;
+	private final IntermediateResult[] producedDataSets;
 	private final List inputs;
 	private final int parallelism;
-	private final int maxParallelism;
-	private final boolean[] finishedSubtasks;
-
-	private volatile int numSubtasksInFinalState;
-
 	private final SlotSharingGroup slotSharingGroup;
-
 	private final CoLocationGroup coLocationGroup;
-
 	private final InputSplit[] inputSplits;
+	private final int maxParallelismConfigured;
+
+	private int maxParallelismDerived;
+
+	private volatile int numSubtasksInFinalState;
+
 	/**
 	 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 	 * serialize the same information multiple times in order to create the
 	 * TaskDeploymentDescriptors.
 	 */
-	private final SerializedValue serializedTaskInformation;
+	private SerializedValue serializedTaskInformation;
 	private InputSplitAssigner splitAssigner;
 	public ExecutionJobVertex(
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
-		Time timeout) throws JobException, IOException {
+		Time timeout) throws JobException {
--- End diff --
You are right, but I kept the indentation to avoid formatting changes.

> setMaxParallelism() higher than 1 is possible on non-parallel operators
> Key: FLINK-5473
> URL: https://issues.apache.org/jira/browse/FLINK-5473
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0
> Reporter: Robert Metzger
> Assignee: Stefan Richter
>
> While trying out Flink 1.2, I found out that you can set a maxParallelism higher than 1 on a non-parallel operator.
> I think we should have the same semantics as the setParallelism() method.
> Also, when setting a global maxParallelism in the execution environment, it will be set as a default value for the non-parallel operator.
> When restoring a savepoint from 1.1, you have to set the maxParallelism to the parallelism of the 1.1 job. Non-parallel operators will then also get the maxPar set to this value, leading to an error on restore.
> So currently, users restoring from 1.1 to 1.2 have to manually set the maxParallelism to 1 for all non-parallel operators.
[GitHub] flink pull request #3193: [FLINK-5527][query] querying a non-existing key is...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3193 [FLINK-5527][query] querying a non-existing key is inconsistent among state backends Querying for a non-existing key for a state that has a default value set currently results in an UnknownKeyOrNamespace exception when the MemoryStateBackend or FsStateBackend is used but results in the default value if RocksDBStateBackend is set. This removes the special handling from the RocksDBStateBackend and makes it consistent with the other two back-ends, i.e. returning null which results in the mentioned UnknownKeyOrNamespace exception. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-5613 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3193.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3193 commit a767d4ec9e88c39d3902fc633b331fb64163 Author: Nico Kruber Date: 2017-01-17T13:26:16Z [FLINK-5527][query] querying a non-existing key is inconsistent among state backends Querying for a non-existing key for a state that has a default value set currently results in an UnknownKeyOrNamespace exception when the MemoryStateBackend or FsStateBackend is used but results in the default value if RocksDBStateBackend is set. This removes the special handling from the RocksDBStateBackend and makes it consistent with the other two back-ends, i.e. returning null which results in the mentioned UnknownKeyOrNamespace exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
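The behavioral difference the PR description walks through can be sketched outside Flink. The following is a hedged, standalone model (class and method names are invented, not Flink's API): `lookupWithDefault` mimics the old RocksDBStateBackend path that falls back to the state's default value, while `lookupConsistent` mimics the unified behavior of returning null, which the query layer then turns into the mentioned UnknownKeyOrNamespace exception.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two lookup behaviors, not Flink's actual
// state-backend code.
public class MissingKeyLookup {
    private final Map<String, String> state = new HashMap<>();
    private final String defaultValue;

    public MissingKeyLookup(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    public void put(String key, String value) {
        state.put(key, value);
    }

    // Old RocksDB-style behavior: fall back to the state's default value.
    public String lookupWithDefault(String key) {
        String v = state.get(key);
        return v != null ? v : defaultValue;
    }

    // Unified behavior after the PR: return null for a missing key so the
    // caller can signal "unknown key" consistently across all backends.
    public String lookupConsistent(String key) {
        return state.get(key);
    }
}
```

With this model, a query for a missing key either silently yields the default (old RocksDB path) or yields null (unified path), which is exactly the inconsistency the PR removes.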
[GitHub] flink issue #3128: [FLINK-5464] Improve MetricDumpSerialization error handli...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3128 @uce I've addressed your comments. ---
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834724#comment-15834724 ] ASF GitHub Bot commented on FLINK-5464: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3128 @uce I've addressed your comments. > MetricQueryService throws NullPointerException on JobManager > > > Key: FLINK-5464 > URL: https://issues.apache.org/jira/browse/FLINK-5464 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > I'm using Flink 699f4b0. > My JobManager log contains many of these log entries: > {code} > 2017-01-11 19:42:05,778 WARN > org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching > metrics failed. > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/MetricQueryService#-970662317]] after [1 ms] > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > at > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474) > at > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425) > at > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429) > at > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381) > at java.lang.Thread.run(Thread.java:745) > 2017-01-11 19:42:07,765 WARN > org.apache.flink.runtime.metrics.dump.MetricQueryService - An exception > occurred while processing a message. 
> java.lang.NullPointerException > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:162) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$300(MetricDumpSerialization.java:47) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:90) > at > org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:467) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
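The stack trace above points at MetricDumpSerialization.serializeGauge dereferencing a null gauge value. A common hardening, sketched here with hypothetical names (this is not Flink's actual serialization code), is to check the gauge's value before serializing and skip the metric instead of throwing:

```java
import java.util.function.Supplier;

// Illustrative sketch only: a defensive serialization step that skips gauges
// whose value is null, the kind of guard that would prevent the
// NullPointerException in the stack trace above.
public class SafeGaugeDump {
    // Returns the serialized form, or null if the gauge cannot be dumped.
    public static String serializeGauge(String name, Supplier<Object> gauge) {
        Object value = gauge.get();
        if (value == null) {
            // Skip instead of calling value.toString() and throwing an NPE.
            return null;
        }
        return name + "=" + value.toString();
    }
}
```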
[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators
[ https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834718#comment-15834718 ] ASF GitHub Bot commented on FLINK-5473: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3182 Thanks for the review, @tillrohrmann! I followed all of your suggestions, except for the indentation formatting. > setMaxParallelism() higher than 1 is possible on non-parallel operators > --- > > Key: FLINK-5473 > URL: https://issues.apache.org/jira/browse/FLINK-5473 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Stefan Richter > > While trying out Flink 1.2, I found out that you can set a maxParallelism > higher than 1 on a non-parallel operator. > I think we should have the same semantics as the setParallelism() method. > Also, when setting a global maxParallelism in the execution environment, it > will be set as a default value for the non-parallel operator. > When restoring a savepoint from 1.1, you have to set the maxParallelism to > the parallelism of the 1.1 job. Non-parallel operators will then also get the > maxPar set to this value, leading to an error on restore. > So currently, users restoring from 1.1 to 1.2 have to manually set the > maxParallelism to 1 for all non-parallel operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3182 Thanks for the review, @tillrohrmann! I followed all of your suggestions, except for the indentation formatting. ---
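The semantics the issue asks for (mirroring setParallelism) can be modeled in a few lines. This is a hedged standalone sketch, not Flink's ExecutionJobVertex code; the -1 sentinel echoes the VALUE_NOT_SET constant visible in the diff above:

```java
// Hypothetical model of maxParallelism validation for non-parallel operators.
public class OperatorSettings {
    private final boolean nonParallel;
    private int maxParallelism = -1; // -1 == not set, cf. VALUE_NOT_SET

    public OperatorSettings(boolean nonParallel) {
        this.nonParallel = nonParallel;
    }

    public void setMaxParallelism(int maxParallelism) {
        if (maxParallelism < 1) {
            throw new IllegalArgumentException("maxParallelism must be at least 1");
        }
        // The fix discussed in this thread: a non-parallel operator may only
        // ever have maxParallelism 1, matching setParallelism semantics.
        if (nonParallel && maxParallelism > 1) {
            throw new IllegalArgumentException(
                "A non-parallel operator can only have a maxParallelism of 1");
        }
        this.maxParallelism = maxParallelism;
    }

    public int getMaxParallelism() {
        return maxParallelism;
    }
}
```

Under this model, a global default maxParallelism from the environment would simply be rejected (or clamped) for non-parallel operators instead of silently applied, which is what breaks 1.1-to-1.2 savepoint restores in the issue description.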
[jira] [Commented] (FLINK-5527) QueryableState: requesting a non-existing key in MemoryStateBackend or FsStateBackend does not return the default value
[ https://issues.apache.org/jira/browse/FLINK-5527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834715#comment-15834715 ] ASF GitHub Bot commented on FLINK-5527: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3193 [FLINK-5527][query] querying a non-existing key is inconsistent among state backends
> QueryableState: requesting a non-existing key in MemoryStateBackend or > FsStateBackend does not return the default value > --- > > Key: FLINK-5527 > URL: https://issues.apache.org/jira/browse/FLINK-5527 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Querying for a non-existing key for a state that has a default value set > currently results in an UnknownKeyOrNamespace exception when the > MemoryStateBackend or FsStateBackend is used. It should return the default > value instead just like the RocksDBStateBackend. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3025: [FLINK-5365] Mesos AppMaster/TaskManager should ob...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3025 ---
[jira] [Resolved] (FLINK-5365) Mesos AppMaster/TaskManager should obey sigterm
[ https://issues.apache.org/jira/browse/FLINK-5365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-5365. -- Resolution: Fixed Fix Version/s: 1.3.0 1.2.0 1.3.0: Fixed via 5d0d279dc5d82fb63bc6cbd7c8fac2324959a516 1.2.0: Fixed via e0a784197de6c6ff55d0d4e10d3316240706 > Mesos AppMaster/TaskManager should obey sigterm > --- > > Key: FLINK-5365 > URL: https://issues.apache.org/jira/browse/FLINK-5365 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Minor > Fix For: 1.2.0, 1.3.0 > > > The AppMaster and TaskManager are ignoring the sigterm sent by > Marathon/Mesos. The reason is simply that the shell scripts used to start > them don't pass the signal to java. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
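The root cause stated in FLINK-5365 is that the launcher shell scripts did not pass SIGTERM on to the java process; once the JVM itself receives the signal (for example because the script execs java directly), it runs its registered shutdown hooks. The standalone example below only demonstrates the hook mechanism and is not Flink's actual shutdown code:

```java
// Demonstrates JVM shutdown hooks: they run on normal exit and on SIGTERM,
// but not on SIGKILL or Runtime.halt(). A process that never receives the
// signal (because a wrapper script swallowed it) never runs them.
public class ShutdownDemo {
    static String cleanupMessage() {
        return "cleanup: releasing resources";
    }

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(
            new Thread(() -> System.out.println(cleanupMessage())));
        System.out.println("running");
        // When main returns (or the JVM gets SIGTERM), the hook fires.
    }
}
```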
[jira] [Updated] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev updated FLINK-5592: - Priority: Minor (was: Major) > Wrong number of RowSerializers with nested Rows in Collection mode > -- > > Key: FLINK-5592 > URL: https://issues.apache.org/jira/browse/FLINK-5592 > Project: Flink > Issue Type: Bug >Reporter: Anton Solovev >Priority: Minor > > {code} > @Test > def testNestedRowTypes(): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > tEnv.registerTableSource("rows", new MockSource) > val table: Table = tEnv.scan("rows") > val nestedTable: Table = tEnv.scan("rows").select('person) > val collect: Seq[Row] = nestedTable.collect() > print(collect) > } > class MockSource extends BatchTableSource[Row] { > import org.apache.flink.api.java.ExecutionEnvironment > import org.apache.flink.api.java.DataSet > override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { > val data = List( > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub"))) > execEnv.fromCollection(data.asJava, getReturnType) > } > override def getReturnType: TypeInformation[Row] = { > new RowTypeInfo( > Array[TypeInformation[_]]( > new RowTypeInfo( > Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO), > Array("name", "age"))), > Array("person") > ) > } > } > {code} > throws {{java.lang.RuntimeException: Row arity of from does not match > serializers}} > stacktrace > {code} > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36) > at > org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234) > at > 
org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114) > at > org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834697#comment-15834697 ] Anton Solovev edited comment on FLINK-5592 at 1/23/17 3:07 PM: --- [ Hi [~jark], thank you for helping me. I want exactly a row of a number of rows. You are right, it's problem of my code {code} override def getReturnType: TypeInformation[Row] = { new RowTypeInfo( Array[TypeInformation[_]]( new RowTypeInfo( Array[TypeInformation[_]]( BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Array("name", "age")), new RowTypeInfo( Array[TypeInformation[_]]( BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Array("more_info", "and_so_on"))), Array("person", "additional") ) } {code} {{getReturnType}} does that thing I want, but how we can get nested field by table api? via table.scan("rows").select("person.name") it doesn't work was (Author: tonycox): [ Hi [~jark], thank you for helping me. I want exactly a row of a number of rows. 
You are right, it's problem of my code {code} override def getReturnType: TypeInformation[Row] = { new RowTypeInfo( Array[TypeInformation[_]]( new RowTypeInfo( Array[TypeInformation[_]]( BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Array("name", "age")), new RowTypeInfo( Array[TypeInformation[_]]( BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Array("more_info", "and_so_on"))), Array("person", "additional") ) } {code} does that thing I want
[jira] [Reopened] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev reopened FLINK-5592: --
[GitHub] flink issue #3025: [FLINK-5365] Mesos AppMaster/TaskManager should obey sigt...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3025 Thanks for your contribution @EronWright. Really good fix :-) Merging this PR. ---
[jira] [Closed] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev closed FLINK-5592. Resolution: Not A Problem
[jira] [Commented] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834697#comment-15834697 ]

Anton Solovev commented on FLINK-5592:
--------------------------------------

Hi [~jark], thank you for helping me. I want exactly a row of a number of rows. You are right, it's a problem in my code;
{code}
override def getReturnType: TypeInformation[Row] = {
  new RowTypeInfo(
    Array[TypeInformation[_]](
      new RowTypeInfo(
        Array[TypeInformation[_]](
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO),
        Array("name", "age")),
      new RowTypeInfo(
        Array[TypeInformation[_]](
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO),
        Array("more_info", "and_so_on"))),
    Array("person", "additional"))
}
{code}
does the thing I want.

> Wrong number of RowSerializers with nested Rows in Collection mode
> ------------------------------------------------------------------
>
>                 Key: FLINK-5592
>                 URL: https://issues.apache.org/jira/browse/FLINK-5592
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Anton Solovev
>
> {code}
> @Test
> def testNestedRowTypes(): Unit = {
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env, config)
>   tEnv.registerTableSource("rows", new MockSource)
>   val table: Table = tEnv.scan("rows")
>   val nestedTable: Table = tEnv.scan("rows").select('person)
>   val collect: Seq[Row] = nestedTable.collect()
>   print(collect)
> }
>
> class MockSource extends BatchTableSource[Row] {
>   import org.apache.flink.api.java.ExecutionEnvironment
>   import org.apache.flink.api.java.DataSet
>
>   override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
>     val data = List(
>       Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
>       Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
>       Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
>     execEnv.fromCollection(data.asJava, getReturnType)
>   }
>
>   override def getReturnType: TypeInformation[Row] = {
>     new RowTypeInfo(
>       Array[TypeInformation[_]](
>         new RowTypeInfo(
>           Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
>           Array("name", "age"))),
>       Array("person"))
>   }
> }
> {code}
>
> throws {{java.lang.RuntimeException: Row arity of from does not match serializers}}
>
> stacktrace
> {code}
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
> at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
> at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
> at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
> at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
> at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
> at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
> at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
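The {{Row arity of from does not match serializers}} failure above comes down to a serializer that was built with one number of field serializers being handed a row with a different field count. A minimal plain-Java sketch of that arity check (these are illustrative stand-in classes, not Flink's actual Row/RowSerializer):

```java
// Stand-in for a row with a fixed set of fields.
final class Row {
    final Object[] fields;

    Row(Object... fields) {
        this.fields = fields;
    }
}

// Stand-in for a serializer created for a fixed number of field serializers.
final class FixedArityRowSerializer {
    private final int arity;

    FixedArityRowSerializer(int arity) {
        this.arity = arity;
    }

    Row copy(Row from) {
        // Mirrors the check behind the reported error message.
        if (from.fields.length != arity) {
            throw new RuntimeException("Row arity of from does not match serializers");
        }
        return new Row(from.fields.clone());
    }
}

public class ArityDemo {
    /** True when a serializer of the given arity can copy a row with rowArity fields. */
    public static boolean copies(int serializerArity, int rowArity) {
        try {
            new FixedArityRowSerializer(serializerArity).copy(new Row(new Object[rowArity]));
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(copies(2, 2)); // serializer and row agree
        System.out.println(copies(2, 1)); // mismatch, as in the bug report
    }
}
```

In the report, the serializer side was derived from the declared `RowTypeInfo` while the data side came from the actual nested rows, so any disagreement between the two trips exactly this kind of check.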
[jira] [Issue Comment Deleted] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anton Solovev updated FLINK-5592:
---------------------------------
    Comment: was deleted

(was: Hi [~jark], thank you for helping me. I want exactly a row of a number of rows, this case fails even if
{code}
override def getReturnType: TypeInformation[Row] = {
  new RowTypeInfo(
    Array[TypeInformation[_]](
      new RowTypeInfo(
        Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
        Array("name", "age")),
      new RowTypeInfo(
        Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
        Array("more_info", "and_so_on"))),
    Array("person", "additional"))
}
{code})

> Wrong number of RowSerializers with nested Rows in Collection mode
> ------------------------------------------------------------------
>
>                 Key: FLINK-5592
>                 URL: https://issues.apache.org/jira/browse/FLINK-5592
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Anton Solovev
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834668#comment-15834668 ]

ASF GitHub Bot commented on FLINK-4905:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3035

    I would like to pick this fix up. The exception has still occurred a few times for me in the past, and I prefer the above outlined solution, because it adds less locking on cancellation/shutdown, meaning there are fewer implications on deadlocks or long stalls on cancellation/shutdown.

> Kafka test instability IllegalStateException: Client is not started
> -------------------------------------------------------------------
>
>                 Key: FLINK-4905
>                 URL: https://issues.apache.org/jira/browse/FLINK-4905
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Andrew Efimov
>              Labels: test-stability
>         Attachments: Kafka08Fetcher.png
>
> The following travis build
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)
> failed because of this error:
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
> 	at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
> 	at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
> 	at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
> 	at org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
> 	at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
> 	at org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
> 	at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
> 	at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
> 	at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
> 	at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
> 	at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
> 	... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: Unnamed (1/3)
> {code}
[jira] [Assigned] (FLINK-5078) Introduce annotations for classes copied from Calcite
[ https://issues.apache.org/jira/browse/FLINK-5078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther reassigned FLINK-5078:
-----------------------------------
    Assignee: Timo Walther

> Introduce annotations for classes copied from Calcite
> -----------------------------------------------------
>
>                 Key: FLINK-5078
>                 URL: https://issues.apache.org/jira/browse/FLINK-5078
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> We have already copied several classes from Calcite because of missing
> features or bugs. In order to track those classes, update them when bumping
> up the version, or check whether they became obsolete, it might be useful to
> introduce a special annotation to mark those classes. Maybe with an
> additional standardized comment format noting which lines have been modified.
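A marker annotation along the lines proposed in the issue could look roughly like this. This is a sketch only: the annotation name, fields, and retention are illustrative assumptions, not a decided design (retention is RUNTIME here purely so the demo can inspect it reflectively; a real marker would likely use CLASS or SOURCE retention).

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker for classes copied from Calcite.
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // RUNTIME only so the demo below can read it
@interface CopiedFromCalcite {
    /** Calcite version the class was copied from. */
    String version();

    /** Free-form note on which lines/members were modified. */
    String modifications() default "";
}

// Example use on a hypothetical copied class.
@CopiedFromCalcite(version = "1.11.0", modifications = "made constructor public")
class SomeCopiedRule {
}

public class AnnotationDemo {
    /** Returns the recorded Calcite version, or null if the class is not marked. */
    public static String copiedVersion(Class<?> clazz) {
        CopiedFromCalcite marker = clazz.getAnnotation(CopiedFromCalcite.class);
        return marker == null ? null : marker.version();
    }

    public static void main(String[] args) {
        System.out.println(copiedVersion(SomeCopiedRule.class));
    }
}
```

With such a marker in place, a version bump becomes a search for `@CopiedFromCalcite` usages whose `version` lags the new dependency.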
[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3035 I would like to pick this fix up. The exception has still occurred a few times for me in the past, and I prefer the above outlined solution, because it adds less locking on cancellation/shutdown, meaning there are fewer implications on deadlocks or long stalls on cancellation/shutdown. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834660#comment-15834660 ]

ASF GitHub Bot commented on FLINK-4905:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3035

    I think we can take a very simple approach here. Many other parts of the code follow the approach to tolerate exceptions thrown during cancellation, or during asynchronous calls on closed operators.

      - The client may be closed during shutting down of the Kafka08Fetcher, before even the surrounding KafkaConsumerBase knows that it is closing.
      - The KafkaConsumerBase tries to commit offsets, sees the exception, and re-throws it.
      - We can make the Kafka08Fetcher catch exceptions when committing offsets, and only re-throw them if the fetcher is still running.

    That should do the trick.

    BTW: There are plans to make the streaming API sources appear to be single threaded, to avoid that sources have to plan for such situations.

> Kafka test instability IllegalStateException: Client is not started
> -------------------------------------------------------------------
>
>                 Key: FLINK-4905
>                 URL: https://issues.apache.org/jira/browse/FLINK-4905
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Andrew Efimov
>              Labels: test-stability
>         Attachments: Kafka08Fetcher.png
[GitHub] flink issue #3035: [ FLINK-4905] Kafka test instability IllegalStateExceptio...
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3035

    I think we can take a very simple approach here. Many other parts of the code follow the approach to tolerate exceptions thrown during cancellation, or during asynchronous calls on closed operators.

      - The client may be closed during shutting down of the Kafka08Fetcher, before even the surrounding KafkaConsumerBase knows that it is closing.
      - The KafkaConsumerBase tries to commit offsets, sees the exception, and re-throws it.
      - We can make the Kafka08Fetcher catch exceptions when committing offsets, and only re-throw them if the fetcher is still running.

    That should do the trick.

    BTW: There are plans to make the streaming API sources appear to be single threaded, to avoid that sources have to plan for such situations.
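The shutdown-tolerant commit sketched in the bullets above can be written down as a small pattern. The names here are illustrative, not the actual Kafka08Fetcher API: exceptions from the commit action are only re-thrown while the fetcher is still running; during shutdown the client may legitimately already be closed, so the error is tolerated.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class TolerantFetcher {
    private final AtomicBoolean running = new AtomicBoolean(true);

    public void shutdown() {
        running.set(false);
    }

    /** Runs the commit action, re-throwing failures only while still running. */
    public void commitOffsets(Runnable commitAction) {
        try {
            commitAction.run(); // e.g. writing offsets to ZooKeeper
        } catch (RuntimeException e) {
            if (running.get()) {
                // still running: surface the failure to the checkpoint callback
                throw e;
            }
            // shutting down: "Client is not started" and similar are expected; swallow
        }
    }
}
```

The appeal of this pattern, as the comment notes, is that it needs no extra lock shared between the commit path and the cancellation path, so shutdown cannot stall waiting for an in-flight commit.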
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834655#comment-15834655 ]

ASF GitHub Bot commented on FLINK-5464:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97327675

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
     	 *
     	 * @param data serialized metrics
     	 * @return A list containing the deserialized metrics.
    -	 * @throws IOException
     	 */
    -	public List deserialize(byte[] data) throws IOException {
    -		ByteArrayInputStream bais = new ByteArrayInputStream(data);
    -		DataInputStream dis = new DataInputStream(bais);
    -		int numCounters = dis.readInt();
    -		int numGauges = dis.readInt();
    -		int numHistograms = dis.readInt();
    -		int numMeters = dis.readInt();
    +	public List deserialize(MetricDumpSerialization.MetricSerializationResult data) {
    +		DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
    
    -		List metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
    +		List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
    
    -		for (int x = 0; x < numCounters; x++) {
    -			metrics.add(deserializeCounter(dis));
    +		for (int x = 0; x < data.numCounters; x++) {
    +			try {
    +				metrics.add(deserializeCounter(in));
    +			} catch (Exception e) {
    +				LOG.warn("Failed to deserialize counter.", e);
    +			}
     		}
    
    -		for (int x = 0; x < numGauges; x++) {
    -			metrics.add(deserializeGauge(dis));
    +		for (int x = 0; x < data.numGauges; x++) {
    +			try {
    +				metrics.add(deserializeGauge(in));
    +			} catch (Exception e) {
    +				LOG.warn("Failed to deserialize counter.", e);
    +			}
     		}
    
    -		for (int x = 0; x < numHistograms; x++) {
    -			metrics.add(deserializeHistogram(dis));
    +		for (int x = 0; x < data.numHistograms; x++) {
    +			try {
    +				metrics.add(deserializeHistogram(in));
    +			} catch (Exception e) {
    +				LOG.warn("Failed to deserialize counter.", e);
    +			}
     		}
    
    -		for (int x = 0; x < numMeters; x++) {
    -			metrics.add(deserializeMeter(dis));
    +		for (int x = 0; x < data.numMeters; x++) {
    +			try {
    +				metrics.add(deserializeMeter(in));
    +			} catch (Exception e) {
    +				LOG.warn("Failed to deserialize counter.", e);
    +			}
     		}
    -
    -		return metrics;
     	}
     }
    
    -	private static String deserializeString(DataInputStream dis) throws IOException {
    -		int stringLength = dis.readInt();
    -		byte[] bytes = new byte[stringLength];
    -		dis.readFully(bytes);
    -		return new String(bytes);
    -	}
    
    -	private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
    +	private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
     		QueryScopeInfo scope = deserializeMetricInfo(dis);
    -		String name = deserializeString(dis);
    -		return new MetricDump.CounterDump(scope, name, dis.readLong());
    +		String name = dis.readUTF();
    +		long count = dis.readLong();
    +		return new MetricDump.CounterDump(scope, name, count);
    --- End diff --
    
    Personally, for short methods I think it's overkill. I would do it for methods like `deserializeHistogram` though.

> MetricQueryService throws NullPointerException on JobManager
[GitHub] flink pull request #3128: [FLINK-5464] Improve MetricDumpSerialization error...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3128#discussion_r97327675

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
    --- End diff --

    Personally, for short methods I think it's overkill. I would do it for methods like `deserializeHistogram` though.
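The change under review wraps each per-metric read in its own try/catch, so one corrupt entry is logged and skipped instead of aborting the whole dump. A self-contained sketch of that idea (the string-based entries and parser are illustrative stand-ins, not the PR's actual DataInputView code):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LenientDeserializer {
    /**
     * Parses every entry it can; a corrupt entry is reported and skipped,
     * mirroring the per-metric try/catch in the diff.
     */
    public static List<Integer> deserializeAll(Iterator<String> entries) {
        List<Integer> metrics = new ArrayList<>();
        while (entries.hasNext()) {
            String entry = entries.next();
            try {
                metrics.add(Integer.parseInt(entry)); // stand-in for deserializeCounter(in) etc.
            } catch (NumberFormatException e) {
                // the real code logs LOG.warn("Failed to deserialize ...", e) here
                System.err.println("Failed to deserialize entry: " + entry);
            }
        }
        return metrics;
    }
}
```

One caveat this sketch sidesteps by using discrete entries: when all metrics are read sequentially from a single binary stream, a partially failed read can leave the cursor misaligned for every entry that follows, so a real implementation has to think about how to re-synchronize after a failure.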