[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6231 PR #6392 fixed this issue. ---
[GitHub] flink pull request #6231: [FLINK-9694] Potentially NPE in CompositeTypeSeria...
Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/6231 ---
[GitHub] flink issue #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfigSnapsh...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6392 +1, I will close my PR #6231 about this issue ---
[GitHub] flink issue #6396: [FLINK-9806][docs] Add canonical link element to docs
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6396 hi @patricklucas thanks for your contribution. This PR contains two commits; is the "hotfix" commit related to FLINK-9806? If not, I suggest splitting it out into a separate PR. ---
[GitHub] flink issue #6401: [hotfix]fix typo for variable name dynamicProperties in F...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6401 hi @rileyli Thanks for your contribution, but I think refactoring the naming style alone is not really necessary, and it is not a "typo". cc @zentol @tillrohrmann ---
[GitHub] flink issue #6367: [FLINK-9850] Add a string to the print method to identify...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6367 cc @dawidwys and @pnowojski ---
[GitHub] flink issue #6397: [FLINK-9916] Add FROM_BASE64 function for table/sql API
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6397 cc @twalthr and @fhueske ---
[GitHub] flink pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/6404 [FLINK-9928] Add LOG2 function for table/sql API ## What is the purpose of the change *This pull request adds LOG2 function for table/sql API* ## Brief change log - *Add LOG2 function for table/sql API* ## Verifying this change This change is already covered by existing tests, such as *ScalarFunctionsTest#testLog2*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9928 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6404.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6404 commit 1883e0cac63a4756523cc798a9fc150c0f2c298d Author: yanghua Date: 2018-07-24T11:13:39Z [FLINK-9928] Add LOG2 function for table/sql API ---
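As a side note, the LOG2 semantics added in this PR can be sketched independently of Flink via the change-of-base identity; this is a generic illustration, not the code in the PR:

```java
// Generic sketch of LOG2 semantics: log base 2 computed from natural logarithms.
// Illustration only; Flink's actual implementation in this PR may differ.
public class Log2Demo {
    static double log2(double x) {
        return Math.log(x) / Math.log(2.0);
    }

    public static void main(String[] args) {
        System.out.println(log2(8.0));    // ~3.0
        System.out.println(log2(1024.0)); // ~10.0
    }
}
```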
[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6231 @pnowojski thanks for giving a solution, I will try to verify it in our inner Flink version. ---
[GitHub] flink pull request #6397: [FLINK-9916] Add FROM_BASE64 function for table/sq...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/6397 [FLINK-9916] Add FROM_BASE64 function for table/sql API ## What is the purpose of the change *This pull request adds FROM_BASE64 function for table/sql API* ## Brief change log - *Add FROM_BASE64 function for table/sql API* ## Verifying this change This change added tests and can be verified as follows: - *ScalarFunctionsTest#testFromBase64* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9916 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6397.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6397 commit 41d7f8ea77e80679ad0eb6fea08c38fe094c3514 Author: yanghua Date: 2018-07-23T15:38:23Z [FLINK-9916] Add FROM_BASE64 function for table/sql API ---
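The FROM_BASE64 semantics can be sketched with the JDK's built-in decoder; this is a generic illustration, and the PR's actual null handling and charset choices may differ:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Generic sketch of FROM_BASE64 semantics using the JDK decoder.
public class FromBase64Demo {
    static String fromBase64(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(fromBase64("aGVsbG8gd29ybGQ=")); // hello world
    }
}
```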
[GitHub] flink issue #6390: [FLINK-9915] Add TO_BASE64 function for table/sql API
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6390 cc @twalthr @hequn8128 @fhueske ---
[GitHub] flink issue #6381: [FLINK-7205] [table]Add UUID supported in SQL and Tab...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6381 +1 ---
[GitHub] flink pull request #6382: [FLINK-9907][Table API & SQL] add CRC32 support
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6382#discussion_r204369826 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -2063,6 +2063,22 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { expectedSha256) } + @Test + def testCrc32(): Unit = { +val expectedCrc32 = "3632233996" +testAllApis( + "test".crc32(), + "crc32('test')", + "CRC32('test')", + expectedCrc32) + +testAllApis( + 'f33.crc32(), + "crc32(f33)", + "CRC32(f33)", + "null") --- End diff -- I think adding one more test case, for example: ``` testAllApis( 'f33.crc32(), "f33.crc32()", "CRC32(f33)", "null") ``` would look better to me. ---
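The expected value `3632233996` in the diff above can be cross-checked with the JDK's own `java.util.zip.CRC32`, independently of the PR's implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Cross-check of the expectedCrc32 value in the test diff using java.util.zip.CRC32.
public class Crc32Check {
    static long crc32(String input) {
        CRC32 crc = new CRC32();
        crc.update(input.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // unsigned 32-bit checksum as a long
    }

    public static void main(String[] args) {
        System.out.println(crc32("test")); // 3632233996
    }
}
```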
[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6231 @pnowojski I have already said that it is because the constructor ``` CompositeTypeSerializerConfigSnapshot(TypeSerializer... nestedSerializers) ``` uses [`varargs`, as noted in the JIRA description](https://issues.apache.org/jira/browse/FLINK-9694); in my last comment in this PR I was just explaining what that looks like. We added a null check and it works fine in our Flink environment. If we do not handle this case, then this code is useless: ``` Preconditions.checkNotNull(nestedSerializers); ``` Why would we not check for null in a potentially nullable context? And what approach do you think is not ugly and dangerous? ---
[GitHub] flink pull request #6390: [FLINK-9915] Add TO_BASE64 function for table/sql ...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/6390 [FLINK-9915] Add TO_BASE64 function for table/sql API ## What is the purpose of the change *This pull request adds TO_BASE64 function for table/sql API* ## Brief change log - *Add TO_BASE64 function for table/sql API* ## Verifying this change This change added tests and can be verified as follows: - *ScalarFunctionsTest#testToBase64* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9915 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6390.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6390 ---
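The TO_BASE64 semantics can likewise be sketched with the JDK's built-in encoder; a generic illustration, not the code in the PR:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Generic sketch of TO_BASE64 semantics using the JDK encoder.
public class ToBase64Demo {
    static String toBase64(String plain) {
        return Base64.getEncoder().encodeToString(plain.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(toBase64("hello world")); // aGVsbG8gd29ybGQ=
    }
}
```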
[GitHub] flink issue #6353: [FLINK-9875][runtime] Add concurrent creation of executio...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6353 I agree, @StephanEwen: parallelizing only the core problem means we would not introduce potential concurrency problems into the EG/EJV related logic. ---
[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6375#discussion_r203996832 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } } } + isBucketReady(partitionPaths); } } + @Override + public boolean isBucketReady(Set bucketPathes) { + for (Path path : bucketPathes) { + try { + RemoteIterator files = fs.listFiles(path, false); + while (files.hasNext()) { + LocatedFileStatus fileStatus = files.next(); + if (fileStatus.getPath().getName().endsWith(pendingSuffix) || + fileStatus.getPath().getName().endsWith(inProgressSuffix)) { + return false; + } + } + return true; --- End diff -- I mean this return statement cannot verify that all the bucket paths are ready, right? Because the outer loop has not finished yet. ---
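The fix this review comment implies can be sketched with plain strings instead of the file system (the method and shapes below are stand-ins, not BucketingSink's actual API): only return `true` after every path has been checked, never from inside the loop over the first path.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the readiness check: each bucket path is a list of file names,
// and the whole set is ready only if NO path contains a pending/in-progress file.
public class BucketReadyDemo {
    static boolean allBucketsReady(List<List<String>> fileNamesPerPath,
                                   String pendingSuffix, String inProgressSuffix) {
        for (List<String> files : fileNamesPerPath) {
            for (String name : files) {
                if (name.endsWith(pendingSuffix) || name.endsWith(inProgressSuffix)) {
                    return false;
                }
            }
            // deliberately NOT returning true here: later paths are still unchecked
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<String>> paths = Arrays.asList(
                Arrays.asList("part-0"),          // the first path looks ready
                Arrays.asList("part-1.pending")); // but the second one is not
        System.out.println(allBucketsReady(paths, ".pending", ".in-progress")); // false
    }
}
```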
[GitHub] flink issue #6362: [FLINK-9888][release] Remove unsafe defaults from release...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6362 +1 ---
[GitHub] flink issue #6374: [FLINK-9895][tests] Ensure error logging for NettyLeakDet...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6374 +1 ---
[GitHub] flink issue #6370: [FLINK-9894] [runtime] Potential Data Race
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6370 @tison1 I think PR #6353 and #6370 have a causal relationship; the current codebase may not trigger this race condition, right? ---
[GitHub] flink pull request #6372: [Flink 9353] Tests running per job standalone clus...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6372#discussion_r203692186 --- Diff: flink-end-to-end-tests/README.md --- @@ -31,6 +31,12 @@ You can also run tests individually via $ FLINK_DIR= flink-end-to-end-tests/run-single-test.sh your_test.sh arg1 arg2 ``` +### Kubernetes test + +Kubernetes test (test_kubernetes_embedded_job.sh) assumes a running minikube cluster. Right now we cannot +execute it on travis. You can run it thought with `run-single-test.sh` in your local environment as long --- End diff -- does the word "thought" need to be replaced with "through"? ---
[GitHub] flink issue #6373: [FLINK-9838][logging] Don't log slot request failures on ...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6373 +1 ---
[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6231 hi @pnowojski I did not call the `CompositeTypeSerializerConfigSnapshot(TypeSerializer... nestedSerializers)` constructor explicitly; the caller is Flink itself, see [here](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala#L123). I just fixed the NPE in this case: ```scala def this() = this(null) // scala ``` which does not mean: ``` CompositeTypeSerializerConfigSnapshot(null); // java ``` but rather seems to mean: ``` CompositeTypeSerializerConfigSnapshot(new TypeSerializer[] {null}) // java ``` so it bypasses the not-null precondition check: ``` Preconditions.checkNotNull(nestedSerializers); // java ``` and then causes an NPE in the `for` loop [here](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java#L53). I think my fix is a defensive check, and it works fine in our internal Flink version (in a previous comment, I said we customized the Table API to provide stream and dimension table joins). ---
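The varargs behavior described in this thread can be reproduced with a small self-contained sketch (the class and method names are stand-ins for the Flink types): the null check on the array itself passes, and the NPE only surfaces when iterating over the null element.

```java
import java.util.Objects;

// Stand-in for the varargs constructor: a call with a single casted null produces an
// array CONTAINING null, so checkNotNull(array) passes and the NPE only surfaces
// inside the loop over the elements.
public class VarargsNullDemo {
    static int count(Object... nestedSerializers) {
        Objects.requireNonNull(nestedSerializers); // passes: the array itself is non-null
        int n = 0;
        for (Object o : nestedSerializers) {
            o.getClass(); // throws NullPointerException when the element is null
            n++;
        }
        return n;
    }

    static boolean npeSurfacesInLoop() {
        try {
            count((Object) null); // equivalent to count(new Object[] { null })
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(npeSurfacesInLoop()); // true
    }
}
```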
[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6365 It seems `org.glassfish:javax.el:jar 3.0.1` has a non-SNAPSHOT version in the Maven repository, see here: https://mvnrepository.com/artifact/org.glassfish/javax.el/3.0.1-b10. Do you use an internal Maven repository hosted by your company? If yes, maybe it forbids you from downloading it. You can exclude this dependency from HBase and introduce a separate dependency for it in your pom. ---
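The suggested exclusion could look roughly like this in the pom. This is a hypothetical sketch: the exact HBase artifact and versions depend on the PR's dependency tree.

```xml
<!-- Hypothetical sketch: exclude the transitive org.glassfish:javax.el artifact from the
     HBase dependency and pin an explicit, downloadable version instead. -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.glassfish</groupId>
    <artifactId>javax.el</artifactId>
    <version>3.0.1-b10</version>
</dependency>
```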
[GitHub] flink issue #6353: [FLINK-9875] Add concurrent creation of execution job ver...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6353 @tison1 there are too many commits; you can use `git rebase -i [commit-id]` to squash them, then use `git push -f xxx xxx` to force-update the PR. ---
[GitHub] flink issue #6367: [FLINK-9850] Add a string to the print method to identify...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6367 @hequn8128 thanks, I have added some test cases~ ---
[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6365#discussion_r203439890 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java --- @@ -87,22 +90,22 @@ public void configure(Configuration parameters) { @Override public void open(int taskNumber, int numTasks) throws IOException { - table = new HTable(conf, "flinkExample"); + Connection connection = ConnectionFactory.createConnection(conf); --- End diff -- I think we should add a try/catch block to protect against a connection leak. ---
[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6365#discussion_r203440989 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java --- @@ -87,22 +90,22 @@ public void configure(Configuration parameters) { @Override public void open(int taskNumber, int numTasks) throws IOException { - table = new HTable(conf, "flinkExample"); + Connection connection = ConnectionFactory.createConnection(conf); --- End diff -- Based on the [HBase Connection JavaDoc](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html#close--) it seems the caller should invoke the `close` method to release the resource, so I suggest closing the connection in the UDF's `close` method. ---
[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6365#discussion_r203438579 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java --- @@ -81,7 +85,9 @@ private HTable createTable() { org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create(); try { - return new HTable(hConf, getTableName()); + Connection connection = ConnectionFactory.createConnection(hConf); + Table table = connection.getTable(TableName.valueOf(getTableName())); + return (HTable) table; --- End diff -- I think we should release the connection when an exception happens. ---
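The pattern these review comments ask for can be sketched with a stand-in resource (the `Conn` type below is hypothetical, mimicking an HBase `Connection`; the point is the try/catch shape, not the HBase API): if a later step throws, close the connection before rethrowing instead of leaking it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in (hypothetical) resource mimicking an HBase Connection.
public class ConnectionLeakDemo {
    static class Conn implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override public void close() { closed.set(true); }
    }

    static Conn lastCreated; // visible so the demo can assert on the leaked/closed state

    // Mirrors the suggested shape for createTable(): if acquiring the table fails,
    // release the connection instead of leaking it.
    static Conn openTable(boolean getTableFails) {
        Conn connection = new Conn();
        lastCreated = connection;
        try {
            if (getTableFails) {
                throw new IllegalStateException("getTable failed");
            }
            return connection;
        } catch (RuntimeException e) {
            connection.close(); // release on exception, as the comment suggests
            throw e;
        }
    }

    static boolean failureClosesConnection() {
        try {
            openTable(true);
            return false;
        } catch (IllegalStateException expected) {
            return lastCreated.closed.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureClosesConnection()); // true: no leak on failure
    }
}
```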
[GitHub] flink issue #6367: [FLINK-9850] Add a string to the print method to identify...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6367 @tillrohrmann and @zentol I see the Python DataStream API does not match the Java DataStream API (some API methods are missing). Shall we add those missing methods to `PythonDataStream`? If yes, I'd like to do this. ---
[GitHub] flink pull request #6367: [FLINK-9850] Add a string to the print method to i...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/6367 [FLINK-9850] Add a string to the print method to identify output for DataStream ## What is the purpose of the change *This pull request adds a string to the print method to identify output for DataStream* ## Brief change log - *add print(string) / printToErr(string) to DataStream Java API* - *add print(string) / printToErr(string) to DataStream Scala API* - *add print(string) to DataStream Python API* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9850 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6367.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6367 commit 80215cd12618392ab0909a431863939d3353ca16 Author: yanghua Date: 2018-07-18T15:20:11Z [FLINK-9850] Add a string to the print method to identify output for DataStream ---
[GitHub] flink issue #6329: [FLINK-9841] Web UI only show partial taskmanager log
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6329 @zentol does this PR match your requirements? I hope it can be merged into 1.6 so that users can see the full taskmanager log. ---
[GitHub] flink issue #6334: [FLINK-5232] Add a Thread default uncaught exception hand...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6334 @tillrohrmann and @zentol opinion? ---
[GitHub] flink issue #6358: [FLINK-9882] [runtime] A function access can be private
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6358 +1 ---
[GitHub] flink pull request #6359: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6359#discussion_r203265891 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/StrToDateCallGen.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen.calls + +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo +import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression} +import org.apache.flink.table.runtime.functions.DateTimeFunctions + +class StrToDateCallGen extends CallGenerator { --- End diff -- Providing a class doc comment would be better. ---
[GitHub] flink issue #6353: [FLINK-9875] Add concurrent creation of execution job ver...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6353 You are right; this PR is meant to improve the performance when creating EJVs. I am not sure whether the existing test cases for `attachJobGraph` cover the exception path; if not, I suggest adding some exception tests, because this PR changes the way exceptions are processed. ---
[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6297 @dawidwys added a test case, please review~ ---
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6347 +1 from my side; it depends on @zentol's or @tillrohrmann's opinion. ---
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6347 it seems Flink also uses "vertices"; I agree with your opinion. ---
[GitHub] flink issue #6347: [hotfix] typo: vertexes -> vertices
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6347 my English is poor, but isn't "vertexes" also an accepted plural of "vertex"? cc @zentol @tison1 ---
[GitHub] flink pull request #6344: [FLINK-9866] Allow passing command line arguments ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6344#discussion_r202895544 --- Diff: flink-container/kubernetes/README.md --- @@ -17,6 +17,7 @@ The files contain the following variables: - `${FLINK_IMAGE_NAME}`: Name of the image to use for the container - `${FLINK_JOB}`: Name of the Flink job to start (the user code jar must be included in the container image) +- `${FLINK_JOB_ARGUMENS}`: Job specific command line arguments --- End diff -- Shall we give an example or add more documentation on how to pass the command line arguments? For example the expected format, like "--arg val" or something else? There are many possible formats, such as "--key value" or "-Dxxx=xx". ---
[GitHub] flink issue #6266: [FLINK-9682] Add setDescription to execution environment ...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6266 @zentol can you review this PR so that I can start part 2 of the task as soon as possible? Thanks. ---
[GitHub] flink issue #6337: [FLINK-9853][Tabel API & SQL] add HEX support
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6337 +1 from my side ---
[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6336 @ubyyj you can use this command to trigger a Travis rebuild: ``` git commit --allow-empty -m "trigger CI" ``` followed by a push (note that an empty `-m ""` message would be rejected by git without `--allow-empty-message`). ---
[GitHub] flink pull request #6337: [FLINK-9853][Tabel API & SQL] add HEX support
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6337#discussion_r202573057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -182,4 +184,6 @@ object ScalarFunctions { new String(data) } + + def hex(x: String): String = Hex.encodeHexString(x.getBytes) --- End diff -- Adding a doc comment for this API would look better to me. ---
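For reference, the semantics of the `hex` function in the diff above (hex-encoding a string's bytes, as Commons Codec's `Hex.encodeHexString` does, in lowercase) can be sketched with the JDK alone:

```java
import java.nio.charset.StandardCharsets;

// JDK-only sketch of hex-encoding a string's bytes, matching the lowercase output of
// Commons Codec's Hex.encodeHexString used in the diff.
public class HexDemo {
    static String hex(String input) {
        StringBuilder sb = new StringBuilder();
        for (byte b : input.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex("test")); // 74657374
    }
}
```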
[GitHub] flink pull request #6337: [FLINK-9853][Tabel API & SQL] add HEX support
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6337#discussion_r202572746 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -400,6 +400,12 @@ trait ImplicitExpressionOperations { * numeric is null. E.g. "4" leads to "100", "12" leads to "1100". */ def bin() = Bin(expr) + /** --- End diff -- please insert a new blank line ---
[GitHub] flink issue #6329: [FLINK-9841] Web UI only show partial taskmanager log
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6329 @dawidwys can you review this PR? ---
[GitHub] flink issue #6334: [FLINK-5232] Add a Thread default uncaught exception hand...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6334 hi @tillrohrmann I tried to fix this issue based on your suggestion in the JIRA, but there is a small question I want to consult you about. The question is about the ActorSystem: you suggested adding the uncaught exception handler to the `ActorSystem`. To do this, we should extend `ActorSystemImpl` (the default implementation). This class's constructor has [many parameters](https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L651) and I am not very familiar with it, so I tried filling in the ["default" params](https://github.com/yanghua/flink/blob/27dec5d60d2e799aeea66013b3da904cec137408/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RobustActorSystem.scala#L33). When I ran the test cases, they always failed because of the fifth parameter. So the question is: `ActorSystemImpl` is marked as `InternalApi` and may change in the future, so shall we base a custom actor system on it? If yes, what are the correct values for these parameters? I saw some similar customization cases, such as [this](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d#file-exceptioncatchingactorsystemimpl-scala-L14) and [this](https://gist.github.com/Kayrnt/9082178#file-rebootactorsystem-scala-L28); however, their Akka versions are both older. I hope for your ideas and suggestions. ---
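Leaving the Akka internals aside, the JVM-level building block behind FLINK-5232 can be sketched as follows; the handler is installed per-thread here for a self-contained demo, while `Thread.setDefaultUncaughtExceptionHandler(...)` would make it the JVM-wide default. The handler body is a placeholder for whatever the JobManager should actually do.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of an uncaught exception handler catching an exception that would
// otherwise silently kill the worker thread.
public class UncaughtHandlerDemo {
    static String captureUncaught() {
        AtomicReference<String> message = new AtomicReference<>("none");
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("boom");
        });
        worker.setUncaughtExceptionHandler((t, e) -> message.set(e.getMessage()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return message.get();
    }

    public static void main(String[] args) {
        System.out.println(captureUncaught()); // boom
    }
}
```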
[GitHub] flink pull request #6334: [FLINK-5232] Add a Thread default uncaught excepti...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/6334 [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager ## What is the purpose of the change *This pull request adds a Thread default uncaught exception handler on the JobManager* ## Brief change log - *Add a Thread default uncaught exception handler on the JobManager* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-5232 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6334.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6334 commit 27dec5d60d2e799aeea66013b3da904cec137408 Author: yanghua Date: 2018-07-14T11:05:20Z [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); - c
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; --- End diff -- marking these fields as `private` and providing getters/setters looks better to me ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505334 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration +* @return The SSLContext object which can be used by the ssl transport server +* Returns null if SSL is disabled +* @throws Exception +* Thrown if there is any misconfiguration +*/ + @Nullable + public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception { + --- End diff -- this empty line is unnecessary and can be removed ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506868 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { --- End diff -- providing a constructor like `SSLProvider(String provider)` to carry the enum's string representation looks better than hard-coding the strings. ---
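The refactoring suggested in the review comment above could look roughly like the following. This is a hypothetical sketch (names follow the quoted diff, not necessarily the final Flink code): the enum carries its own string representation via a constructor, so `fromString` needs no hard-coded branches.

```java
// Hypothetical sketch of the reviewer's suggestion, not actual Flink code.
enum SSLProvider {
    JDK("JDK"),
    OPENSSL("OPENSSL");

    // the string representation is stored on the constant itself
    private final String name;

    SSLProvider(String name) {
        this.name = name;
    }

    static SSLProvider fromString(String value) {
        // iterate the constants instead of hard-coding string literals
        for (SSLProvider provider : values()) {
            if (provider.name.equalsIgnoreCase(value)) {
                return provider;
            }
        }
        throw new IllegalArgumentException("Unknown SSL provider: " + value);
    }
}
```

Adding a new provider then only requires a new enum constant; the lookup logic stays untouched.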
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { --- End diff -- using a singular class name looks better to me ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); - c
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration --- End diff -- the descriptions of `@param` and `@throws` do not need a line break ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505321 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java --- @@ -160,6 +160,7 @@ private Configuration createSslConfig() throws Exception { flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); +// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL"); --- End diff -- if this is dead code, it can be removed ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r202381490 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -500,11 +501,11 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma } if (commandLine.hasOption(jmMemory.getOpt())) { - effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt())); + effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()) + "m"); --- End diff -- you are right, I have updated the PR; please review, thanks. ---
[GitHub] flink issue #6329: [FLINK-9841] Web UI only show partial taskmanager log
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6329 @zentol yes, you are right; sorry for my unclear wording. Here we should not use try-with-resources, because the completion listener will close the file, and the try-with-resources block would close it earlier, before the completion listener has finished transferring the data. ---
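The hazard described above can be shown with a minimal, deterministic illustration (not Flink or Netty code): when ownership of a resource is handed off to a completion listener that runs later, wrapping the hand-off in try-with-resources closes the resource as soon as the block exits, before the listener ever touches it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the early-close bug; names are invented.
class EarlyCloseDemo {
    static class FakeFile implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override
        public void close() { closed.set(true); }
    }

    /** Returns true if the deferred "listener" observed an already-closed file. */
    static boolean listenerSawClosedFile() {
        FakeFile file = new FakeFile();
        Runnable deferredListener;
        try (FakeFile f = file) {
            // hand the file to a listener that will run later, asynchronously;
            // in the real code the listener itself closes the file when done
            deferredListener = () -> { /* would transfer f's contents here */ };
        } // <-- try-with-resources closes the file here, before the listener runs
        deferredListener.run();
        return file.closed.get(); // the transfer would read a truncated, closed file
    }
}
```

This is why ownership-transferring APIs must not be combined with try-with-resources: exactly one party may be responsible for closing.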
[GitHub] flink pull request #6329: [FLINK-9841] Web UI only show partial taskmanager ...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/6329 [FLINK-9841] Web UI only show partial taskmanager log ## What is the purpose of the change *This pull request fixes a bug that caused the web UI to show only a partial taskmanager log* ## Brief change log - *Remove the redundant resource close* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9841 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6329.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6329 commit e69efdef7546bd88c5a73d303e689ea5d051b931 Author: yanghua Date: 2018-07-13T08:48:04Z [FLINK-9841] Web UI only show partial taskmanager log ---
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/5954 ---
[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5954 OK, closing this PR... ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r202075216 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat } // JobManager Memory - final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes(); + final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); --- End diff -- the new config key was introduced to keep `jobmanager.heap.mb` backwards compatible in the Flink config file (config.sh can calculate it accurately), and the user can specify a unit for the value of the key `jobmanager.heap.size`. So if we remove everything related to `JOB_MANAGER_HEAP_MEMORY_MB` in the Java and Scala code, is there any problem? ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r202071052 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat } // JobManager Memory - final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes(); + final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); --- End diff -- to @GJL: we cannot use `.withDeprecatedKeys("jobmanager.heap.mb")` because `jobmanager.heap.size` and `jobmanager.heap.mb` have different meanings. The former accepts different units such as **1g**, while the latter is always measured in **MB**. to @dawidwys and @GJL: `jobmanager.heap.mb` is now only used in the config file, where it can be calculated accurately; it exists purely for backwards compatibility. In the project itself it is unused, since every usage can be replaced with `jobmanager.heap.size`, so could we avoid exposing the old key in the code to the user? ---
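The semantic difference discussed above can be made concrete with a small sketch. This is a hypothetical stand-in for Flink's `MemorySize` parsing, not its actual implementation: the new-style key accepts an explicit unit such as "1g", while a bare number falls back to a default unit (MB, matching the legacy `jobmanager.heap.mb` semantics).

```java
// Hypothetical mini-parser illustrating unit-aware memory configuration.
class MemoryParseDemo {
    /** Parses values like "1g", "512m", or a bare "1024" (default MB) into mebibytes. */
    static int toMebiBytes(String value) {
        String v = value.trim().toLowerCase();
        if (v.endsWith("g")) {
            // gigabytes -> mebibytes
            return Integer.parseInt(v.substring(0, v.length() - 1)) * 1024;
        } else if (v.endsWith("m")) {
            // already in mebibytes
            return Integer.parseInt(v.substring(0, v.length() - 1));
        } else {
            // bare number: interpreted with the default unit (MB), the
            // backwards-compatible behaviour for the old *.mb keys
            return Integer.parseInt(v);
        }
    }
}
```

Under this reading, `jobmanager.heap.size: 1g` and the legacy `jobmanager.heap.mb: 1024` resolve to the same value, which is why the two keys cannot simply be aliased via `withDeprecatedKeys`.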
[GitHub] flink issue #6307: [FLINK-9805][rest] Catch JsonProcessingException in RestC...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6307 +1 ---
[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202013852 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) + jobManagerMetricGroup.removeJob(jobID) +} + +case scala.util.Failure(_) => + + }(context.dispatcher) + + case None => None +} -jobManagerMetricGroup.removeJob(jobID) --- End diff -- this line can also be removed ---
[GitHub] flink issue #6266: [FLINK-9682] Add setDescription to execution environment ...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6266 hi @tillrohrmann I have split the original issue into two issues; this PR is for the first (backend) issue, and the second issue will depend on this PR. Please review~ ---
[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5954 @tillrohrmann That's all right, I know you are very busy. Just a small question: I have reviewed PR #6202 and saw that it used `Exception`, while there was a suggestion from Stephan on May 7: ``` I would use Throwable in the signatures. It may always be that some Error is the cause (class not found, etc.) ``` So I replaced `Exception` with `Throwable` in this PR; do you think that can be considered? If not, I will close this PR. ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r201997596 --- Diff: docs/ops/deployment/yarn_setup.md --- @@ -101,12 +101,12 @@ Usage: Optional -D Dynamic properties -d,--detached Start detached - -jm,--jobManagerMemory Memory for JobManager Container [in MB] + -jm,--jobManagerMemory Memory for JobManager Container [with unit, if not, use MB] --- End diff -- change this soon ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r201997464 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat } // JobManager Memory - final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes(); + final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes(); --- End diff -- in FLINK-6469, in order to configure the JM's memory with a unit, I introduced a new key and deprecated `jobmanager.heap.mb`. * in the Flink codebase (except shell scripts) I have removed all the places that used `JOB_MANAGER_HEAP_MEMORY_MB` and `jobmanager.heap.mb`, so it will not be used. * in the shell script (`config.sh`) the old key `jobmanager.heap.mb` is still supported as a fallback when the new key `jobmanager.heap.size` cannot be read. ---
[GitHub] flink issue #6129: [FLINK-9503] Migrate integration tests for iterative aggr...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6129 @tillrohrmann this PR could be merged ---
[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6305#discussion_r201907959 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java --- @@ -871,19 +892,40 @@ public IntType(int value) { } } - protected int numElementsPerKey() { - return 300; + private int numElementsPerKey() { + switch (this.stateBackendEnum) { + case ROCKSDB_FULLY_ASYNC: + case ROCKSDB_INCREMENTAL: + case ROCKSDB_INCREMENTAL_ZK: + return 3000; + default: + return 300; --- End diff -- it seems a tab is missing here ---
[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6305#discussion_r201908022 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java --- @@ -871,19 +892,40 @@ public IntType(int value) { } } - protected int numElementsPerKey() { - return 300; + private int numElementsPerKey() { + switch (this.stateBackendEnum) { + case ROCKSDB_FULLY_ASYNC: + case ROCKSDB_INCREMENTAL: + case ROCKSDB_INCREMENTAL_ZK: + return 3000; --- End diff -- changing this to a constant looks better~ ---
[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6305#discussion_r201908226 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java --- @@ -25,35 +25,46 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; -import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum; +import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum; +import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC; +import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC; +import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; /** - * This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured + * This test delegates to instances of {@link EventTimeWindowCheckpointingITCase} that have been reconfigured * to use local recovery. * - * TODO: This class must be refactored to properly extend {@link AbstractEventTimeWindowCheckpointingITCase}. + * TODO: This class must be refactored to properly extend {@link EventTimeWindowCheckpointingITCase}. --- End diff -- is the TODO still needed? ---
[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6305#discussion_r201908082 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java --- @@ -871,19 +892,40 @@ public IntType(int value) { } } - protected int numElementsPerKey() { - return 300; + private int numElementsPerKey() { + switch (this.stateBackendEnum) { + case ROCKSDB_FULLY_ASYNC: + case ROCKSDB_INCREMENTAL: + case ROCKSDB_INCREMENTAL_ZK: + return 3000; + default: + return 300; + } } - protected int windowSize() { - return 100; + private int windowSize() { + switch (this.stateBackendEnum) { + case ROCKSDB_FULLY_ASYNC: + case ROCKSDB_INCREMENTAL: + case ROCKSDB_INCREMENTAL_ZK: + return 1000; --- End diff -- changing this to a constant looks better to me ---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6301 @sihuazhou is right; I have reviewed, +1 from my side ---
[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6231 @zentol we really hit this exception: in our internal Flink version, we customized flink-table and implemented stream and dimension-table joins. I think the default constructor is needed for deserialization. In any case, the author of this code assumed that the variable `nestedSerializers` itself could be null (in the current case), but that never happens. The truth is: the elements in `nestedSerializers` can be null. We added a null check and fixed this NPE; now it works OK. ---
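The distinction made above (the array is never null, but its elements can be) determines where the guard must go. A minimal sketch of the described fix, with illustrative names rather than Flink's actual serializer code:

```java
import java.util.Arrays;

// Hypothetical sketch of the NPE fix described above; not the real Flink code.
class NullElementGuardDemo {
    /** Counts usable serializers, skipping null elements instead of crashing. */
    static int countUsableSerializers(Object[] nestedSerializers) {
        // The broken assumption was that only "nestedSerializers == null"
        // needed checking; the actual hazard is null ELEMENTS in the array.
        return (int) Arrays.stream(nestedSerializers)
                .filter(s -> s != null) // per-element null check avoids the NPE
                .count();
    }
}
```

A top-level `nestedSerializers != null` check would pass here and still crash later on the first dereference of a null element, which is exactly the failure mode reported in FLINK-9694.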
[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6297 @zentol can you review this? ---
[GitHub] flink issue #6298: [FLINK-9784] Fix inconsistent use of 'static' in AsyncIOE...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6298 +1, from my side ---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6301#discussion_r201558827 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -41,6 +43,8 @@ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; + static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000; + static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0; --- End diff -- OK, my mistake: I thought you were passing this value as the third parameter (`period`) of `Timer#schedule` ---
[GitHub] flink issue #6296: flink on yarn ,Duplicate upload file flink-dist*.jar
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6296 @linjun007 please rename the PR title ---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6301#discussion_r201552317 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -41,6 +43,8 @@ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; + static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000; + static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0; --- End diff -- I'd like to change the constant to a value larger than 0: in JDK 1.8, the `Timer#schedule` method throws an `IllegalArgumentException` when its third parameter `period` is less than or equal to 0; see here: https://docs.oracle.com/javase/8/docs/api/java/util/Timer.html#schedule-java.util.TimerTask-long-long- ---
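The JDK behavior referenced above is easy to demonstrate: `Timer#schedule(task, delay, period)` rejects a non-positive `period` with an `IllegalArgumentException`, so a default constant of 0 cannot be passed through to that overload unchanged.

```java
import java.util.Timer;
import java.util.TimerTask;

// Demonstrates the java.util.Timer restriction on the repeating-schedule period.
class TimerPeriodDemo {
    /** Returns true if Timer#schedule rejects the given period. */
    static boolean rejectsPeriod(long period) {
        Timer timer = new Timer(true); // daemon timer so the JVM can exit
        TimerTask task = new TimerTask() {
            @Override
            public void run() { /* no-op */ }
        };
        try {
            timer.schedule(task, 0L, period);
            return false; // accepted: task is scheduled
        } catch (IllegalArgumentException e) {
            return true; // rejected: period must be positive
        } finally {
            timer.cancel(); // clean up either way
        }
    }
}
```

This is why the reviewer asks for a default larger than 0: the value is only safe as a `schedule` argument if it is strictly positive.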
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6301#discussion_r201551945 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -54,6 +58,10 @@ private Connection dbConn; private PreparedStatement upload; + private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; + private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; --- End diff -- renaming the variable to `idleConnectionCheckTimeout` looks better to me ---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6301#discussion_r201551369 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java --- @@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) { return this; } + public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) { + this.idleConnectionCheckInterval = idleConnectionCheckInterval; + return this; + } + + public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) { --- End diff -- please add java doc ---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6301#discussion_r201551339 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java --- @@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) { return this; } + public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) { --- End diff -- please add java doc ---
[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6297 @GJL please review~ ---
[GitHub] flink issue #6296: Fix duplicate upload of flink-dist*.jar
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6296 @linjun007 thanks for your contribution, please rename the PR's title based on this description : ``` Name the pull request in the form "[FLINK-] [component] Title of the pull request", where FLINK- should be replaced by the actual issue number. Skip component if you are unsure about which is the best component. ``` ---
[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/6297 [FLINK-9777] YARN: JM and TM Memory must be specified with Units ## What is the purpose of the change *This pull request specify unit for JM and TM memory on YARN mode* ## Brief change log - *parse the jm and tm with default MB unit* - *change related document* ## Verifying this change This change is already covered by existing tests*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9777 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6297.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6297 commit 0ba28e996e5dc01040b9dd4cc9d3d86f6cb9dacd Author: yanghua Date: 2018-07-10T15:16:12Z [FLINK-9777] YARN: JM and TM Memory must be specified with Units ---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6292#discussion_r201313023 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; + +import java.util.HashMap; +import java.util.Map; + +/** + * A {@link TaskMetricGroup} that exposes all registered metrics. + */ +public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup { + + private Map intercepted; + + /** +* Returns the registered metric for the given name, or null if it was never registered. +* +* @param name metric name +* @return registered metric for the given name, or null if it was never registered +*/ + public Metric get(String name) { + return intercepted.get(name); --- End diff -- if this method is invoked before `addMetric`, then `intercepted` will not have been initialized yet, so this can trigger an NPE ---
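The NPE flagged in this review can be sidestepped by initializing the map eagerly at the declaration site. A minimal standalone sketch of that fix (heavily simplified: the real class extends Flink's `UnregisteredTaskMetricGroup` and stores `Metric` values, while here `Object` stands in so the example is self-contained):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the reviewed class: because `intercepted` is created
// when the field is declared, calling get() before any addMetric() safely
// returns null instead of throwing a NullPointerException.
public class InterceptingGroupSketch {

    private final Map<String, Object> intercepted = new HashMap<>();

    public void addMetric(String name, Object metric) {
        intercepted.put(name, metric);
    }

    /** Returns the registered metric for the given name, or null if it was never registered. */
    public Object get(String name) {
        return intercepted.get(name);
    }

    public static void main(String[] args) {
        InterceptingGroupSketch group = new InterceptingGroupSketch();
        // Safe even though no metric has been added yet:
        System.out.println(group.get("numRecordsIn")); // prints null, no NPE
        group.addMetric("numRecordsIn", 42);
        System.out.println(group.get("numRecordsIn")); // prints 42
    }
}
```

Making the field `final` also documents that the map is always present, which is the invariant the reviewed `get` method implicitly relies on.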
[GitHub] flink issue #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') wrongly...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6284 I found a related test failure, please recheck. ---
[GitHub] flink issue #6285: [FLINK-9768][release] Speed up binary release
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6285 +1 ---
[GitHub] flink issue #6286: [FLINK-9754][release] Remove references to scala profiles
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6286 +1 ---
[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6231 @pnowojski can you review this? ---
[GitHub] flink issue #6290: [Flink-9691] [Kinesis Connector] Attempt to call getRecor...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6290 +1 from my side ---
[GitHub] flink pull request #6291: [FLINK-9785][network] add remote address informati...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6291#discussion_r201303678 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java --- @@ -167,7 +167,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. " + "This indicates that the remote task manager was lost.", remoteAddr, cause); } else { - tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause); + final SocketAddress localAddr = ctx.channel().localAddress(); + tex = new LocalTransportException(cause.getMessage() + " (connection to '" + remoteAddr + "')", localAddr, cause); --- End diff -- using `String.format` here looks better to me ---
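The `String.format` variant suggested in this review might look like the fragment below. The variable names mirror the diff, but the values are made up so the example runs on its own; this is an illustration of the suggestion, not the PR's code.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;

// Illustrative only: building the exception message with String.format keeps
// the whole template readable in one piece instead of concatenated fragments.
public class FormatSketch {

    static String buildMessage(String causeMessage, SocketAddress remoteAddr) {
        return String.format("%s (connection to '%s')", causeMessage, remoteAddr);
    }

    public static void main(String[] args) {
        SocketAddress remoteAddr = new InetSocketAddress("10.0.0.1", 6121);
        System.out.println(buildMessage("Connection reset by peer", remoteAddr));
    }
}
```

The trade-off is minor: `String.format` is marginally slower than concatenation, but on an exception path that cost is irrelevant and readability wins.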
[GitHub] flink pull request #6291: [FLINK-9785][network] add remote address informati...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6291#discussion_r201304267 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -164,7 +164,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E + "that the remote task manager was lost.", remoteAddr, cause); } else { - tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause); + SocketAddress localAddr = ctx.channel().localAddress(); + tex = new LocalTransportException(cause.getMessage() + " (connection to '" + remoteAddr + "')", --- End diff -- the same ---
[GitHub] flink pull request #6277: [FLINK-9511] Implement TTL config
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6277#discussion_r201264096 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java --- @@ -93,4 +97,82 @@ public Time getTtl() { public TtlTimeCharacteristic getTimeCharacteristic() { return timeCharacteristic; } + + @Override + public String toString() { + return "StateTtlConfiguration{" + + "ttlUpdateType=" + ttlUpdateType + + ", stateVisibility=" + stateVisibility + + ", timeCharacteristic=" + timeCharacteristic + + ", ttl=" + ttl + + '}'; + } + + public static Builder newBuilder(Time ttl) { + return new Builder(ttl); + } + + /** +* Builder for the {@link StateTtlConfiguration}. +*/ + public static class Builder { + + private TtlUpdateType ttlUpdateType = OnCreateAndWrite; + private TtlStateVisibility stateVisibility = NeverReturnExpired; + private TtlTimeCharacteristic timeCharacteristic = ProcessingTime; + private Time ttl; + + public Builder(Time ttl) { --- End diff -- @azagrebin I removed the default value for TimeCharacteristic; it will now depend on the user's explicit choice. Do you agree with this approach? ---
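Removing the default means the builder has to enforce an explicit choice, typically by failing fast in `build()`. A hypothetical sketch of that pattern (the enum and class names are illustrative stand-ins, not Flink's actual `StateTtlConfiguration` API):

```java
// Hypothetical builder sketch: timeCharacteristic has no default, so build()
// throws unless the user makes an explicit choice.
public class TtlConfigSketch {

    enum TimeCharacteristic { PROCESSING_TIME, EVENT_TIME }

    final long ttlMillis;
    final TimeCharacteristic timeCharacteristic;

    private TtlConfigSketch(long ttlMillis, TimeCharacteristic timeCharacteristic) {
        this.ttlMillis = ttlMillis;
        this.timeCharacteristic = timeCharacteristic;
    }

    static Builder newBuilder(long ttlMillis) {
        return new Builder(ttlMillis);
    }

    static class Builder {
        private final long ttlMillis;
        private TimeCharacteristic timeCharacteristic; // intentionally no default

        Builder(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        Builder setTimeCharacteristic(TimeCharacteristic tc) {
            this.timeCharacteristic = tc;
            return this;
        }

        TtlConfigSketch build() {
            if (timeCharacteristic == null) {
                throw new IllegalStateException("time characteristic must be set explicitly");
            }
            return new TtlConfigSketch(ttlMillis, timeCharacteristic);
        }
    }

    public static void main(String[] args) {
        TtlConfigSketch ok = TtlConfigSketch.newBuilder(60_000)
            .setTimeCharacteristic(TimeCharacteristic.PROCESSING_TIME)
            .build();
        System.out.println(ok.timeCharacteristic); // prints PROCESSING_TIME

        try {
            TtlConfigSketch.newBuilder(60_000).build(); // forgot the choice
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints the failure reason
        }
    }
}
```

The fail-fast check surfaces a forgotten setting at configuration time rather than as surprising TTL behavior at runtime, which is the upside of dropping the silent default.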
[GitHub] flink pull request #6277: [FLINK-9511] Implement TTL config
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6277#discussion_r201229057 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java --- @@ -93,4 +97,82 @@ public Time getTtl() { public TtlTimeCharacteristic getTimeCharacteristic() { return timeCharacteristic; } + + @Override + public String toString() { + return "StateTtlConfiguration{" + + "ttlUpdateType=" + ttlUpdateType + + ", stateVisibility=" + stateVisibility + + ", timeCharacteristic=" + timeCharacteristic + + ", ttl=" + ttl + + '}'; + } + + public static Builder newBuilder(Time ttl) { + return new Builder(ttl); + } + + /** +* Builder for the {@link StateTtlConfiguration}. +*/ + public static class Builder { + + private TtlUpdateType ttlUpdateType = OnCreateAndWrite; + private TtlStateVisibility stateVisibility = NeverReturnExpired; + private TtlTimeCharacteristic timeCharacteristic = ProcessingTime; + private Time ttl; + + public Builder(Time ttl) { --- End diff -- ok, hold on. ---