[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...

2018-07-25 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
PR #6392 fixed this issue.


---


[GitHub] flink pull request #6231: [FLINK-9694] Potentially NPE in CompositeTypeSeria...

2018-07-25 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/6231


---


[GitHub] flink issue #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfigSnapsh...

2018-07-25 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6392
  
+1, I will close my PR #6231 about this issue


---


[GitHub] flink issue #6396: [FLINK-9806][docs] Add canonical link element to docs

2018-07-24 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6396
  
hi @patricklucas, thanks for your contribution. This PR contains two 
commits; is the "hotfix" commit related to FLINK-9806? If not, I suggest 
splitting that commit out into a separate PR.


---


[GitHub] flink issue #6401: [hotfix]fix typo for variable name dynamicProperties in F...

2018-07-24 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6401
  
hi @rileyli, thanks for your contribution, but I don't think refactoring 
the naming style is really necessary, and it is not a "typo". cc @zentol 
@tillrohrmann 


---


[GitHub] flink issue #6367: [FLINK-9850] Add a string to the print method to identify...

2018-07-24 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6367
  
cc @dawidwys and @pnowojski 


---


[GitHub] flink issue #6397: [FLINK-9916] Add FROM_BASE64 function for table/sql API

2018-07-24 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6397
  
cc @twalthr and @fhueske 


---


[GitHub] flink pull request #6404: [FLINK-9928] Add LOG2 function for table/sql API

2018-07-24 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6404

[FLINK-9928] Add LOG2 function for table/sql API

## What is the purpose of the change

*This pull request adds LOG2 function for table/sql API*

## Brief change log

  - *Add LOG2 function for table/sql API*

## Verifying this change

This change is already covered by existing tests, such as 
*ScalarFunctionsTest#testLog2*.
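For intuition, the semantics under test can be sketched outside Flink with plain Java (an illustration only; the actual function is generated by the Table API code generator, and `Log2Sketch`/`log2` are hypothetical names):

```java
public class Log2Sketch {

    // Base-2 logarithm via the change-of-base formula — the expected
    // behavior of SQL's LOG2 (NULL handling / input validation omitted).
    static double log2(double x) {
        return Math.log(x) / Math.log(2.0);
    }

    public static void main(String[] args) {
        System.out.println(log2(1.0)); // 0.0
        System.out.println(log2(8.0)); // approximately 3.0
    }
}
```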

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9928

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6404


commit 1883e0cac63a4756523cc798a9fc150c0f2c298d
Author: yanghua 
Date:   2018-07-24T11:13:39Z

[FLINK-9928] Add LOG2 function for table/sql API




---


[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...

2018-07-23 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
@pnowojski thanks for giving a solution, I will try to verify it in our 
inner Flink version.


---


[GitHub] flink pull request #6397: [FLINK-9916] Add FROM_BASE64 function for table/sq...

2018-07-23 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6397

[FLINK-9916] Add FROM_BASE64 function for table/sql API

## What is the purpose of the change

*This pull request adds FROM_BASE64 function for table/sql API*

## Brief change log

  - *Add FROM_BASE64 function for table/sql API*

## Verifying this change

This change added tests and can be verified as follows:

  - *ScalarFunctionsTest#testFromBase64*
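For context, the decode semantics being tested can be sketched with the JDK's `java.util.Base64` (an illustration, not the code added in this PR; `FromBase64Sketch` is a hypothetical name):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class FromBase64Sketch {

    // FROM_BASE64-style decoding: base64 text in, original string out.
    static String fromBase64(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(fromBase64("aGVsbG8gd29ybGQ=")); // hello world
    }
}
```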

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9916

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6397.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6397


commit 41d7f8ea77e80679ad0eb6fea08c38fe094c3514
Author: yanghua 
Date:   2018-07-23T15:38:23Z

[FLINK-9916] Add FROM_BASE64 function for table/sql API




---


[GitHub] flink issue #6390: [FLINK-9915] Add TO_BASE64 function for table/sql API

2018-07-23 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6390
  
cc @twalthr @hequn8128 @fhueske 


---


[GitHub] flink issue #6381: [FLINK-7205] [table]Add UUID supported in SQL and Tab...

2018-07-23 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6381
  
+1


---


[GitHub] flink pull request #6382: [FLINK-9907][Table API & SQL] add CRC32 support

2018-07-23 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6382#discussion_r204369826
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -2063,6 +2063,22 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       expectedSha256)
   }
 
+  @Test
+  def testCrc32(): Unit = {
+    val expectedCrc32 = "3632233996"
+    testAllApis(
+      "test".crc32(),
+      "crc32('test')",
+      "CRC32('test')",
+      expectedCrc32)
+
+    testAllApis(
+      'f33.crc32(),
+      "crc32(f33)",
+      "CRC32(f33)",
+      "null")
--- End diff --

I think adding one more test case, for example: 

```
testAllApis(
  'f33.crc32(),
  "f33.crc32()",
  "CRC32(f33)",
  "null")
```

would look better to me.
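As a side note, the expected value used in the test above can be cross-checked with the JDK's `java.util.zip.CRC32` (a sketch; 3632233996 is the unsigned CRC32 checksum of the string "test"):

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class Crc32Check {

    // Returns the unsigned 32-bit CRC32 checksum of a string as a long.
    static long crc32(String input) {
        CRC32 crc = new CRC32();
        crc.update(input.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    public static void main(String[] args) {
        System.out.println(crc32("test")); // 3632233996
    }
}
```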


---


[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...

2018-07-23 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
@pnowojski I have already said it is because the constructor: 

```
CompositeTypeSerializerConfigSnapshot(TypeSerializer... 
nestedSerializers)
```

uses [`varargs`, as noted in the JIRA 
description](https://issues.apache.org/jira/browse/FLINK-9694); in the last 
comment in this PR I just explained what the call looks like.

We added a null check and it works fine in our Flink environment. If we do 
not handle this case, then this code is useless: 

```
Preconditions.checkNotNull(nestedSerializers);
```
Why would we not check for null in a potentially nullable context? And what 
approach do you think is neither ugly nor dangerous? 




---


[GitHub] flink pull request #6390: [FLINK-9915] Add TO_BASE64 function for table/sql ...

2018-07-23 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6390

[FLINK-9915] Add TO_BASE64 function for table/sql API

## What is the purpose of the change

*This pull request adds TO_BASE64 function for table/sql API*

## Brief change log

  - *Add TO_BASE64 function for table/sql API*

## Verifying this change

This change added tests and can be verified as follows:

  - *ScalarFunctionsTest#testToBase64*
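For context, the encode semantics being tested can be sketched with the JDK's `java.util.Base64` (an illustration, not the code added in this PR; `ToBase64Sketch` is a hypothetical name):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ToBase64Sketch {

    // TO_BASE64-style encoding: string in, base64 text out.
    static String toBase64(String input) {
        return Base64.getEncoder()
            .encodeToString(input.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(toBase64("hello world")); // aGVsbG8gd29ybGQ=
    }
}
```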

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9915

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6390.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6390






---


[GitHub] flink issue #6353: [FLINK-9875][runtime] Add concurrent creation of executio...

2018-07-22 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6353
  
Agreed @StephanEwen: parallelizing just the core part of the problem means we 
would not introduce potential concurrency problems into the EG/EJV related logic.


---


[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...

2018-07-20 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6375#discussion_r203996832
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 				}
 			}
 		}
+		isBucketReady(partitionPaths);
 	}
 }
 
+	@Override
+	public boolean isBucketReady(Set<Path> bucketPathes) {
+		for (Path path : bucketPathes) {
+			try {
+				RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
+				while (files.hasNext()) {
+					LocatedFileStatus fileStatus = files.next();
+					if (fileStatus.getPath().getName().endsWith(pendingSuffix) ||
+							fileStatus.getPath().getName().endsWith(inProgressSuffix)) {
+						return false;
+					}
+				}
+				return true;
--- End diff --

I mean this return statement cannot verify that all the bucket paths are 
ready, right? Because the loop has not finished yet.
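The control-flow issue can be sketched in plain Java (hypothetical names; the point is that a success return inside the loop only proves the first path is clean, so it has to come after the loop):

```java
import java.util.List;
import java.util.Set;

public class BucketReadySketch {

    // A bucket set is ready only if *every* path is free of pending or
    // in-progress files — so the success return comes after the outer loop.
    static boolean allBucketsReady(List<Set<String>> filesPerPath) {
        for (Set<String> files : filesPerPath) {
            for (String name : files) {
                if (name.endsWith(".pending") || name.endsWith(".in-progress")) {
                    return false; // any unfinished file fails the whole check
                }
            }
            // no early "return true" here: later paths still need checking
        }
        return true; // all paths inspected and clean
    }

    public static void main(String[] args) {
        System.out.println(allBucketsReady(
            List.of(Set.of("part-0"), Set.of("part-1.pending")))); // false
        System.out.println(allBucketsReady(
            List.of(Set.of("part-0"), Set.of("part-1")))); // true
    }
}
```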


---


[GitHub] flink issue #6362: [FLINK-9888][release] Remove unsafe defaults from release...

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6362
  
+1


---


[GitHub] flink issue #6374: [FLINK-9895][tests] Ensure error logging for NettyLeakDet...

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6374
  
+1


---


[GitHub] flink issue #6370: [FLINK-9894] [runtime] Potential Data Race

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6370
  
@tison1 I think PR #6353 and #6370 are causally related; the current 
codebase may not trigger this race condition, right?


---


[GitHub] flink pull request #6372: [Flink 9353] Tests running per job standalone clus...

2018-07-19 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6372#discussion_r203692186
  
--- Diff: flink-end-to-end-tests/README.md ---
@@ -31,6 +31,12 @@ You can also run tests individually via
 $ FLINK_DIR= flink-end-to-end-tests/run-single-test.sh 
your_test.sh arg1 arg2
 ```
 
+### Kubernetes test
+
+Kubernetes test (test_kubernetes_embedded_job.sh) assumes a running 
minikube cluster. Right now we cannot
+execute it on travis. You can run it thought with `run-single-test.sh` in 
your local environment as long 
--- End diff --

does the word "thought" need to be replaced with "through"?


---


[GitHub] flink issue #6373: [FLINK-9838][logging] Don't log slot request failures on ...

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6373
  
+1


---


[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
hi @pnowojski I did not call the 
`CompositeTypeSerializerConfigSnapshot(TypeSerializer... nestedSerializers)` 
constructor explicitly; the caller is Flink itself, see 
[here](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala#L123).
 And I just fixed the NPE in this case : 

```scala
def this() = this(null) // scala
```
but that does not mean : 

```
CompositeTypeSerializerConfigSnapshot(null); // java
```

it effectively means : 

```
CompositeTypeSerializerConfigSnapshot(new TypeSerializer[] {null})
// java
```

so it passes the not-null precondition check : 

```
Preconditions.checkNotNull(nestedSerializers); // java
```
and then causes an NPE in the `for` loop 
[here](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java#L53).

I think it is a defensive check, and it works fine in our inner Flink version 
(in a previous comment, I said we customized the Table API to provide stream and 
dimension table joins).
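The array-vs-element distinction described above can be reproduced in plain Java (a sketch; `totalLength` is a hypothetical stand-in for the constructor, checking the varargs array and then dereferencing each element):

```java
import java.util.Objects;

public class VarargsNullDemo {

    // Stand-in for CompositeTypeSerializerConfigSnapshot(TypeSerializer...):
    // the array itself is checked, then each element is dereferenced.
    static int totalLength(Object... args) {
        Objects.requireNonNull(args, "args array must not be null");
        int total = 0;
        for (Object o : args) {
            total += o.toString().length(); // NPEs if an *element* is null
        }
        return total;
    }

    public static void main(String[] args) {
        // Typed null: wrapped into new Object[]{null}. The array-level check
        // passes, but the loop throws — analogous to the reported NPE.
        try {
            totalLength((Object) null);
        } catch (NullPointerException e) {
            System.out.println("element-level null slipped past the check");
        }

        // Untyped array null: the array itself is null, so the
        // precondition fires immediately.
        try {
            totalLength((Object[]) null);
        } catch (NullPointerException e) {
            System.out.println("array-level null rejected up front");
        }
    }
}
```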



---


[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6365
  
it seems `org.glassfish:javax.el:jar:3.0.1` has a non-SNAPSHOT version in 
the Maven repository, see here: 
https://mvnrepository.com/artifact/org.glassfish/javax.el/3.0.1-b10. Do you use 
an internal Maven repository hosted by your company? If yes, maybe it forbids the 
download. You can exclude this dependency from hbase and introduce it as a 
separate dependency in your pom.


---


[GitHub] flink issue #6353: [FLINK-9875] Add concurrent creation of execution job ver...

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6353
  
@tison1 there are too many commits; you can use `git rebase -i [commit-id]` 
to squash them, then use `git push -f xxx xxx` to force-update the PR.


---


[GitHub] flink issue #6367: [FLINK-9850] Add a string to the print method to identify...

2018-07-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6367
  
@hequn8128 thanks, I have added some test cases~


---


[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-18 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6365#discussion_r203439890
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
 ---
@@ -87,22 +90,22 @@ public void configure(Configuration parameters) {
 
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		table = new HTable(conf, "flinkExample");
+		Connection connection = ConnectionFactory.createConnection(conf);
--- End diff --

I think we should add a try / catch block to prevent a connection leak.


---


[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-18 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6365#discussion_r203440989
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
 ---
@@ -87,22 +90,22 @@ public void configure(Configuration parameters) {
 
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		table = new HTable(conf, "flinkExample");
+		Connection connection = ConnectionFactory.createConnection(conf);
--- End diff --

Based on the [HBase Connection 
JavaDoc](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html#close--)
 it seems the caller should invoke the `close` method to release the resource, 
so I suggest we close the connection in the UDF's `close` method.
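The suggested lifecycle can be sketched with a stand-in resource (an assumption-laden illustration: `Conn` plays the role of HBase's `Connection`, and `open`/`close` mirror the rich-function hooks where the real sink would acquire and release it):

```java
public class ConnectionLifecycleSketch {
    static int openConnections = 0;

    // Stand-in for org.apache.hadoop.hbase.client.Connection.
    static class Conn implements AutoCloseable {
        Conn() { openConnections++; }
        @Override public void close() { openConnections--; }
    }

    private Conn connection;

    // Mirrors RichFunction#open: acquire the connection once per task.
    void open() {
        connection = new Conn();
    }

    // Mirrors RichFunction#close: release it exactly once, null-safely.
    void close() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    public static void main(String[] args) {
        ConnectionLifecycleSketch sink = new ConnectionLifecycleSketch();
        sink.open();
        sink.close();
        System.out.println(openConnections); // 0 — nothing leaked
    }
}
```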


---


[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-18 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6365#discussion_r203438579
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -81,7 +85,9 @@ private HTable createTable() {
 		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
 
 		try {
-			return new HTable(hConf, getTableName());
+			Connection connection = ConnectionFactory.createConnection(hConf);
+			Table table = connection.getTable(TableName.valueOf(getTableName()));
+			return (HTable) table;
--- End diff --

I think we should release the connection when an exception happens.
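The point can be sketched in plain Java (hypothetical `Conn`/`openTable` names; the real code would close the HBase `Connection` in a catch or finally block before propagating the exception):

```java
public class LeakSafeCreate {
    static int openConnections = 0;

    // Stand-in for org.apache.hadoop.hbase.client.Connection.
    static class Conn implements AutoCloseable {
        Conn() { openConnections++; }
        @Override public void close() { openConnections--; }
    }

    // Mirrors createTable(): if getting the table fails after the connection
    // was created, release the connection before rethrowing. On success the
    // connection stays open, owned by the returned table handle.
    static Object openTable(boolean failGetTable) {
        Conn connection = new Conn();
        try {
            if (failGetTable) {
                throw new RuntimeException("getTable failed");
            }
            return new Object(); // the table handle
        } catch (RuntimeException e) {
            connection.close(); // avoid leaking the connection
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            openTable(true);
        } catch (RuntimeException expected) {
            // the connection was released despite the failure
        }
        System.out.println(openConnections); // 0
    }
}
```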


---


[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-18 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6365#discussion_r203439523
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
 ---
@@ -87,22 +90,22 @@ public void configure(Configuration parameters) {
 
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		table = new HTable(conf, "flinkExample");
+		Connection connection = ConnectionFactory.createConnection(conf);
--- End diff --

I think we should add a try / catch block to prevent a connection leak.


---


[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-18 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6365#discussion_r203439859
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
 ---
@@ -87,22 +90,22 @@ public void configure(Configuration parameters) {
 
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		table = new HTable(conf, "flinkExample");
+		Connection connection = ConnectionFactory.createConnection(conf);
--- End diff --

I think we should add a try / catch block to prevent a connection leak.


---


[GitHub] flink issue #6367: [FLINK-9850] Add a string to the print method to identify...

2018-07-18 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6367
  
@tillrohrmann and @zentol I see the Python DataStream API methods do not 
match the DataStream Java API methods (some API methods are missing). Shall we 
add those missing APIs to `PythonDataStream`? If yes, I'd like to do this.


---


[GitHub] flink pull request #6367: [FLINK-9850] Add a string to the print method to i...

2018-07-18 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6367

[FLINK-9850] Add a string to the print method to identify output for 
DataStream

## What is the purpose of the change

*This pull request adds a string to the print method to identify output for 
DataStream*


## Brief change log

  - *add print(string) / printToErr(string) to DataStream Java API*
  - *add print(string) / printToErr(string) to DataStream Scala API*
  - *add print(string) to DataStream Python API*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9850

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6367


commit 80215cd12618392ab0909a431863939d3353ca16
Author: yanghua 
Date:   2018-07-18T15:20:11Z

[FLINK-9850] Add a string to the print method to identify output for 
DataStream




---


[GitHub] flink issue #6329: [FLINK-9841] Web UI only show partial taskmanager log

2018-07-18 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6329
  
@zentol does this PR match your requirements? I hope it can be merged into 1.6, 
so that users can see the full taskmanager log. 


---


[GitHub] flink issue #6334: [FLINK-5232] Add a Thread default uncaught exception hand...

2018-07-18 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6334
  
@tillrohrmann and @zentol, any opinions?


---


[GitHub] flink issue #6358: [FLINK-9882] [runtime] A function access can be private

2018-07-18 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6358
  
+1


---


[GitHub] flink pull request #6359: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-07-18 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6359#discussion_r203265891
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/StrToDateCallGen.scala
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.DateTimeFunctions
+
+class StrToDateCallGen extends CallGenerator {
--- End diff --

if provide class doc, it would be better


---


[GitHub] flink issue #6353: [FLINK-9875] Add concurrent creation of execution job ver...

2018-07-17 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6353
  
you are right, this PR is meant to improve the performance when creating EJVs. I 
am not sure whether the existing test cases for `attachJobGraph` cover the 
exception path; if not, I suggest adding some exception tests, because this PR 
changes the way exceptions are processed.


---


[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

2018-07-17 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6297
  
@dawidwys added test cases, please review~


---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6347
  
+1 from my side; it depends on @zentol's or @tillrohrmann's opinion.


---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6347
  
it seems Flink also uses "vertices"; I agree with your opinion. 


---


[GitHub] flink issue #6347: [hotfix] typo: vertexes -> vertices

2018-07-17 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6347
  
my English is poor, but it seems "vertexes" is also an accepted plural of 
"vertex"? cc @zentol @tison1 


---


[GitHub] flink pull request #6344: [FLINK-9866] Allow passing command line arguments ...

2018-07-16 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6344#discussion_r202895544
  
--- Diff: flink-container/kubernetes/README.md ---
@@ -17,6 +17,7 @@ The files contain the following variables:
 
 - `${FLINK_IMAGE_NAME}`: Name of the image to use for the container
 - `${FLINK_JOB}`: Name of the Flink job to start (the user code jar must 
be included in the container image)
+- `${FLINK_JOB_ARGUMENS}`: Job specific command line arguments
--- End diff --

Shall we give an example or more documentation to guide how to pass the 
command line arguments? For example the expected format, like "--arg val" or 
something else? Because there are many formats, such as "--key value" and "-Dxxx=xx".


---


[GitHub] flink issue #6266: [FLINK-9682] Add setDescription to execution environment ...

2018-07-16 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6266
  
@zentol can you review this PR, so that I can start part 2 of the task 
as soon as possible? Thanks.


---


[GitHub] flink issue #6337: [FLINK-9853][Tabel API & SQL] add HEX support

2018-07-16 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6337
  
+1, from my side


---


[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...

2018-07-15 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6336
  
@ubyyj you can push an empty commit to trigger a Travis rebuild : 

```
git commit --allow-empty -m "trigger rebuild"
```

(note that `-m ""` would fail, since git aborts on an empty commit message)


---


[GitHub] flink pull request #6337: [FLINK-9853][Tabel API & SQL] add HEX support

2018-07-15 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6337#discussion_r202573057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -182,4 +184,6 @@ object ScalarFunctions {
 
 new String(data)
   }
+
+  def hex(x: String): String = Hex.encodeHexString(x.getBytes)
--- End diff --

adding a doc comment for this API would look better to me


---


[GitHub] flink pull request #6337: [FLINK-9853][Tabel API & SQL] add HEX support

2018-07-15 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6337#discussion_r202572746
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -400,6 +400,12 @@ trait ImplicitExpressionOperations {
 * numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
 */
   def bin() = Bin(expr)
+  /**
--- End diff --

please insert a new blank line


---


[GitHub] flink issue #6329: [FLINK-9841] Web UI only show partial taskmanager log

2018-07-15 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6329
  
@dawidwys  can you review this PR?


---


[GitHub] flink issue #6334: [FLINK-5232] Add a Thread default uncaught exception hand...

2018-07-14 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6334
  
hi @tillrohrmann I tried to fix this issue based on your suggestion in the 
JIRA, but there is a small question I want to consult you about.

The question is about the ActorSystem: you suggested adding the uncaught 
exception handler to the `ActorSystem`. To do this, we should extend 
`ActorSystemImpl` (the default implementation). This class's constructor has 
[many 
parameters](https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L651).
 I am not very familiar with it, so I tried to fill in the ["default" 
params](https://github.com/yanghua/flink/blob/27dec5d60d2e799aeea66013b3da904cec137408/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RobustActorSystem.scala#L33).
 I ran the test cases, and they always failed because of the fifth parameter. 

So the question is: `ActorSystemImpl` is marked as `InternalApi` and may be 
changed in the future; should we extend an actor system based on it? If 
yes, what are the correct values for these parameters?

I saw some similar customization cases, such as 
[this](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d#file-exceptioncatchingactorsystemimpl-scala-L14)
 and 
[this](https://gist.github.com/Kayrnt/9082178#file-rebootactorsystem-scala-L28).
 However, it seems their Akka versions are both lower. 

Hoping for your ideas and suggestions.


---


[GitHub] flink pull request #6334: [FLINK-5232] Add a Thread default uncaught excepti...

2018-07-14 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6334

[FLINK-5232] Add a Thread default uncaught exception handler on the 
JobManager

## What is the purpose of the change

*This pull request adds a Thread default uncaught exception handler on the 
JobManager*


## Brief change log

  - *Add a Thread default uncaught exception handler on the JobManager*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-5232

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6334.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6334


commit 27dec5d60d2e799aeea66013b3da904cec137408
Author: yanghua 
Date:   2018-07-14T11:05:20Z

[FLINK-5232] Add a Thread default uncaught exception handler on the 
JobManager




---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
 
-   c

[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
--- End diff --

Marking these fields as `private` and providing getters/setters looks better to me.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505334
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
+* @return The SSLContext object which can be used by the ssl transport 
server
+* Returns null if SSL is disabled
+* @throws Exception
+* Thrown if there is any misconfiguration
+*/
+   @Nullable
+   public static SSLContext createSSLServerContext(Configuration 
sslConfig) throws Exception {
+
--- End diff --

This empty line is unnecessary and can be removed.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
--- End diff --

Providing a constructor like `SSLProvider(String provider)` that stores the 
enum's string representation looks better than hard-coding the comparisons.
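A minimal sketch of the suggested refactoring (not the actual Flink code): the enum stores its own string representation, and `fromString` iterates the values instead of hard-coding each comparison.

```java
// Sketch of the constructor-based enum suggested above; names mirror the
// diff under review but this is not the actual Flink implementation.
enum SSLProvider {
    JDK("JDK"),
    /** OpenSSL with fallback to JDK if not available. */
    OPENSSL("OPENSSL");

    private final String name;

    SSLProvider(String name) {
        this.name = name;
    }

    static SSLProvider fromString(String value) {
        if (value == null) {
            throw new NullPointerException("value must not be null");
        }
        // Compare against each provider's stored name instead of literals.
        for (SSLProvider provider : values()) {
            if (provider.name.equalsIgnoreCase(value)) {
                return provider;
            }
        }
        throw new IllegalArgumentException("Unknown SSL provider: " + value);
    }
}
```

With this shape, adding a new provider only requires a new enum constant; `fromString` needs no change.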


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
--- End diff --

Using a singular class name looks better to me.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
 
-   c

[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
--- End diff --

The descriptions for `@param` and `@throws` do not need a line break.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505321
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 ---
@@ -160,6 +160,7 @@ private Configuration createSslConfig() throws 
Exception {
flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, 
"password");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
+// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL");
--- End diff --

If this is dead code, it can be removed.


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202381490
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -500,11 +501,11 @@ protected Configuration 
applyCommandLineOptionsToConfiguration(CommandLine comma
}
 
if (commandLine.hasOption(jmMemory.getOpt())) {
-   
effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
commandLine.getOptionValue(jmMemory.getOpt()));
+   
effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
commandLine.getOptionValue(jmMemory.getOpt()) + "m");
--- End diff --

You are right, I have updated the PR; please review, thanks.


---


[GitHub] flink issue #6329: [FLINK-9841] Web UI only show partial taskmanager log

2018-07-13 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6329
  
@zentol yes, you are right, sorry about my wording. Here we should not use 
try-with-resources, because the listener will close the file, and the 
try-with-resources close operation seems to run earlier than the completion 
listener.


---


[GitHub] flink pull request #6329: [FLINK-9841] Web UI only show partial taskmanager ...

2018-07-13 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6329

[FLINK-9841] Web UI only show partial taskmanager log

## What is the purpose of the change

*This pull request fixes a bug that caused the web UI to show only a partial 
taskmanager log*

## Brief change log

  - *Remove the redundant resource close*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9841

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6329


commit e69efdef7546bd88c5a73d303e689ea5d051b931
Author: yanghua 
Date:   2018-07-13T08:48:04Z

[FLINK-9841] Web UI only show partial taskmanager log




---


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-07-13 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5954


---


[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails

2018-07-13 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5954
  
OK, closing this PR...


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202075216
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY),
 MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

Introducing a new config key keeps `jobmanager.heap.mb` backwards compatible 
in the Flink config file (config.sh can calculate it accurately), and users 
can specify a unit for the value of the key `jobmanager.heap.size`.

So if we remove everything related to `JOB_MANAGER_HEAP_MEMORY_MB` from the 
Java and Scala code, is there any problem?


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202071052
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY),
 MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

To @GJL: we cannot use `.withDeprecatedKeys("jobmanager.heap.mb")` because 
`jobmanager.heap.size` and `jobmanager.heap.mb` have different meanings. The 
former accepts different units such as **1g**, while the latter is measured 
only in **MB**.

To @dawidwys and @GJL: `jobmanager.heap.mb` is now used only in the config 
file, where it can be calculated accurately; it is kept for backwards 
compatibility. In the project itself it is unused, every occurrence can be 
replaced with `jobmanager.heap.size`, and the old key in the code need not be 
exposed to the user, right?


---


[GitHub] flink issue #6307: [FLINK-9805][rest] Catch JsonProcessingException in RestC...

2018-07-12 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6307
  
+1


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202013852
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the 
job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+  jobManagerMetricGroup.removeJob(jobID)
+}
+
+case scala.util.Failure(_) =>
+
+  }(context.dispatcher)
+
+  case None => None
+}
 
-jobManagerMetricGroup.removeJob(jobID)
 
--- End diff --

this line can also be removed


---


[GitHub] flink issue #6266: [FLINK-9682] Add setDescription to execution environment ...

2018-07-12 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6266
  
Hi @tillrohrmann, I have split the original issue into two issues; this PR is 
for the first (backend) issue, and the second issue will depend on this PR. 
Please review.


---


[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails

2018-07-12 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5954
  
@tillrohrmann That's all right, I know you are very busy. Just a small 
question: I have reviewed PR #6202. I saw it used `Exception`, and there is a 
suggestion from Stephan on May 7: 

```
I would use Throwable in the signatures. It may always be that some Error 
is the cause (class not found, etc.)
```

So I replaced `Exception` with `Throwable` in this PR. Do you think it can be 
considered? If not, I will close this PR. 


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201997596
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -101,12 +101,12 @@ Usage:
Optional
  -D Dynamic properties
  -d,--detached   Start detached
- -jm,--jobManagerMemory Memory for JobManager Container [in 
MB]
+ -jm,--jobManagerMemory Memory for JobManager Container [with 
unit, if not, use MB]
--- End diff --

I will change this soon.


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201997464
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   final int jobManagerMemoryMB = 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY),
 MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

In FLINK-6469, in order to configure the JM's memory with a unit, I 
introduced a new key and deprecated `jobmanager.heap.mb`.

* In the Flink codebase (except shell scripts) I removed every use of 
`JOB_MANAGER_HEAP_MEMORY_MB` and `jobmanager.heap.mb`, so they will not be used.
* In the shell script (`config.sh`) the old key `jobmanager.heap.mb` is still 
supported as a fallback when the new key `jobmanager.heap.size` cannot be read.
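To make the behavior being discussed concrete, here is a simplified, hypothetical sketch of default-unit parsing. It is not Flink's `MemorySize` implementation; the class and method names are invented, and it only shows the idea that a bare number falls back to a default unit (MB here) while suffixed values carry their own unit.

```java
// Simplified illustration (NOT Flink's MemorySize): parse a memory value
// where a bare number defaults to mebibytes, matching the discussion above.
class MemoryParse {
    static long toMebiBytes(String value) {
        String v = value.trim().toLowerCase();
        if (v.endsWith("g")) {
            // gibibytes -> mebibytes
            return Long.parseLong(v.substring(0, v.length() - 1)) * 1024;
        } else if (v.endsWith("m")) {
            // already mebibytes
            return Long.parseLong(v.substring(0, v.length() - 1));
        } else {
            // bare number: assume the default unit (MB) for compatibility
            return Long.parseLong(v);
        }
    }
}
```

This is why the two keys differ in meaning: the old MB-only key is equivalent to always taking the bare-number branch, while the new key may carry any supported unit suffix.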

 


---


[GitHub] flink issue #6129: [FLINK-9503] Migrate integration tests for iterative aggr...

2018-07-12 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6129
  
@tillrohrmann this PR could be merged


---


[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...

2018-07-11 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6305#discussion_r201907959
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 ---
@@ -871,19 +892,40 @@ public IntType(int value) {
}
}
 
-   protected int numElementsPerKey() {
-   return 300;
+   private int numElementsPerKey() {
+   switch (this.stateBackendEnum) {
+   case ROCKSDB_FULLY_ASYNC:
+   case ROCKSDB_INCREMENTAL:
+   case ROCKSDB_INCREMENTAL_ZK:
+   return 3000;
+   default:
+   return 300;
--- End diff --

It seems a tab is missing here.


---


[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...

2018-07-11 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6305#discussion_r201908022
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 ---
@@ -871,19 +892,40 @@ public IntType(int value) {
}
}
 
-   protected int numElementsPerKey() {
-   return 300;
+   private int numElementsPerKey() {
+   switch (this.stateBackendEnum) {
+   case ROCKSDB_FULLY_ASYNC:
+   case ROCKSDB_INCREMENTAL:
+   case ROCKSDB_INCREMENTAL_ZK:
+   return 3000;
--- End diff --

Changing this to a constant looks better.


---


[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...

2018-07-11 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6305#discussion_r201908226
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
 ---
@@ -25,35 +25,46 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 
-import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
+import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum;
+import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 
 /**
- * This test delegates to instances of {@link 
AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
+ * This test delegates to instances of {@link 
EventTimeWindowCheckpointingITCase} that have been reconfigured
  * to use local recovery.
  *
- * TODO: This class must be refactored to properly extend {@link 
AbstractEventTimeWindowCheckpointingITCase}.
+ * TODO: This class must be refactored to properly extend {@link 
EventTimeWindowCheckpointingITCase}.
--- End diff --

Is the TODO still needed?


---


[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...

2018-07-11 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6305#discussion_r201908082
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 ---
@@ -871,19 +892,40 @@ public IntType(int value) {
}
}
 
-   protected int numElementsPerKey() {
-   return 300;
+   private int numElementsPerKey() {
+   switch (this.stateBackendEnum) {
+   case ROCKSDB_FULLY_ASYNC:
+   case ROCKSDB_INCREMENTAL:
+   case ROCKSDB_INCREMENTAL_ZK:
+   return 3000;
+   default:
+   return 300;
+   }
}
 
-   protected int windowSize() {
-   return 100;
+   private int windowSize() {
+   switch (this.stateBackendEnum) {
+   case ROCKSDB_FULLY_ASYNC:
+   case ROCKSDB_INCREMENTAL:
+   case ROCKSDB_INCREMENTAL_ZK:
+   return 1000;
--- End diff --

Changing this to a constant looks better to me.


---


[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

2018-07-11 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6301
  
@sihuazhou is right; reviewed, +1 from my side.


---


[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...

2018-07-11 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
@zentol we actually hit this exception: in our internal Flink version we 
customized flink-table and implemented a stream/dimension-table join. I think 
the default constructor is needed for deserialization.
In any case, the author of this code assumed the variable 
`nestedSerializers` itself could be null (in the current case), but that does 
not happen. The truth is: the elements inside `nestedSerializers` can be null.
We added a null check and fixed this NPE; now it works OK.


---


[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

2018-07-11 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6297
  
@zentol can you review this?


---


[GitHub] flink issue #6298: [FLINK-9784] Fix inconsistent use of 'static' in AsyncIOE...

2018-07-11 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6298
  
+1, from my side


---


[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6301#discussion_r201558827
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -41,6 +43,8 @@
 public class JDBCOutputFormat extends RichOutputFormat {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+   static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 
1000;
+   static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
--- End diff --

OK, my mistake, I misunderstood and thought you were passing this value as 
the third parameter (`period`) of `Timer#schedule`.


---


[GitHub] flink issue #6296: flink on yarn ,Duplicate upload file flink-dist*.jar

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6296
  
@linjun007 please rename the PR title


---


[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6301#discussion_r201552317
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -41,6 +43,8 @@
 public class JDBCOutputFormat extends RichOutputFormat {
	private static final long serialVersionUID = 1L;
	static final int DEFAULT_BATCH_INTERVAL = 5000;
+	static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000;
+	static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
--- End diff --

I'd like to change the constant to a value larger than 0. As of JDK 1.8, the 
`Timer#schedule` method throws an `IllegalArgumentException` when its third 
parameter, `period`, is less than or equal to 0; see here: 
https://docs.oracle.com/javase/8/docs/api/java/util/Timer.html#schedule-java.util.TimerTask-long-long-
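The JDK behavior described above can be verified with a small self-contained sketch (`TimerPeriodCheck` and `isValidPeriod` are hypothetical names introduced here, not part of the PR):

```java
import java.util.Timer;
import java.util.TimerTask;

public class TimerPeriodCheck {

    // Probes whether Timer#schedule accepts the given period: the JDK rejects
    // period <= 0 with an IllegalArgumentException, which is why the default
    // constant must be larger than 0 if it is ever passed as the period.
    static boolean isValidPeriod(long period) {
        Timer timer = new Timer(true); // daemon timer so the JVM can exit
        try {
            timer.schedule(new TimerTask() {
                @Override
                public void run() { /* no-op probe task */ }
            }, 0L, period);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        } finally {
            timer.cancel();
        }
    }

    public static void main(String[] args) {
        // A positive period such as the proposed 30-minute interval is accepted.
        if (!isValidPeriod(30 * 60 * 1000L)) throw new AssertionError();
        // A period of 0, as in the current default constant, is rejected.
        if (isValidPeriod(0L)) throw new AssertionError();
    }
}
```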


---


[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6301#discussion_r201551945
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -54,6 +58,10 @@
	private Connection dbConn;
	private PreparedStatement upload;

+	private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL;
+	private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT;
--- End diff --

renaming the variable to `idleConnectionCheckTimeout` looks better to me


---


[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6301#discussion_r201551369
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
		return this;
	}

+	public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
+		this.idleConnectionCheckInterval = idleConnectionCheckInterval;
+		return this;
+	}
+
+	public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) {
--- End diff --

please add Javadoc


---


[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6301#discussion_r201551339
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 ---
@@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
		return this;
	}

+	public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
--- End diff --

please add Javadoc


---


[GitHub] flink issue #6297: [FLINK-9777] YARN: JM and TM Memory must be specified wit...

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6297
  
@GJL please review~


---


[GitHub] flink issue #6296: Fix duplicate upload of flink-dist*.jar

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6296
  
@linjun007 thanks for your contribution, please rename the PR's title 
following this convention:

```
Name the pull request in the form "[FLINK-] [component] Title of the 
pull request", where FLINK- should be replaced by the actual issue number. 
Skip component if you are unsure about which is the best component.
```




---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-10 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6297

[FLINK-9777] YARN: JM and TM Memory must be specified with Units

## What is the purpose of the change

*This pull request specifies units for JM and TM memory in YARN mode*


## Brief change log

  - *parse the JM and TM memory values with a default unit of MB*
  - *update the related documentation*
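A minimal sketch of the default-unit parsing idea described in the change log (hypothetical helper and method names, not the actual Flink implementation):

```java
public class MemorySizeArg {

    // Hypothetical parser: a bare number is interpreted as megabytes, while an
    // explicit suffix ("m"/"mb", "g"/"gb") is honored. Returns the size in bytes.
    static long parseToBytes(String value) {
        String v = value.trim().toLowerCase();
        if (v.endsWith("gb")) return Long.parseLong(v.substring(0, v.length() - 2).trim()) << 30;
        if (v.endsWith("g"))  return Long.parseLong(v.substring(0, v.length() - 1).trim()) << 30;
        if (v.endsWith("mb")) return Long.parseLong(v.substring(0, v.length() - 2).trim()) << 20;
        if (v.endsWith("m"))  return Long.parseLong(v.substring(0, v.length() - 1).trim()) << 20;
        return Long.parseLong(v) << 20; // no unit given: default to MB
    }

    public static void main(String[] args) {
        // "1024" (no unit) and "1g" describe the same amount of memory.
        if (MemorySizeArg.parseToBytes("1024") != MemorySizeArg.parseToBytes("1g")) {
            throw new AssertionError();
        }
    }
}
```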

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9777

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6297.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6297


commit 0ba28e996e5dc01040b9dd4cc9d3d86f6cb9dacd
Author: yanghua 
Date:   2018-07-10T15:16:12Z

[FLINK-9777] YARN: JM and TM Memory must be specified with Units




---


[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6292#discussion_r201313023
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+	private Map intercepted;
+
+	/**
+	 * Returns the registered metric for the given name, or null if it was never registered.
+	 *
+	 * @param name metric name
+	 * @return registered metric for the given name, or null if it was never registered
+	 */
+	public Metric get(String name) {
+		return intercepted.get(name);
--- End diff --

if this method is invoked before `addMetric`, then `intercepted` will not be 
initialized, which can trigger an NPE
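The fix suggested above — initialize the map eagerly — can be sketched as follows (`InterceptingGroupSketch` is a reduced, hypothetical stand-in, not the actual Flink class):

```java
import java.util.HashMap;
import java.util.Map;

// With eager initialization, get() called before any addMetric() simply
// returns null instead of throwing a NullPointerException.
public class InterceptingGroupSketch {

    private final Map<String, Object> intercepted = new HashMap<>();

    public void addMetric(String name, Object metric) {
        intercepted.put(name, metric);
    }

    /** Returns the registered metric for the given name, or null if it was never registered. */
    public Object get(String name) {
        return intercepted.get(name);
    }
}
```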


---


[GitHub] flink issue #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') wrongly...

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6284
  
I found a related test failure, please recheck~


---


[GitHub] flink issue #6285: [FLINK-9768][release] Speed up binary release

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6285
  
+1


---


[GitHub] flink issue #6286: [FLINK-9754][release] Remove references to scala profiles

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6286
  
+1


---


[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
@pnowojski  can you review this?


---


[GitHub] flink issue #6290: [Flink-9691] [Kinesis Connector] Attempt to call getRecor...

2018-07-10 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6290
  
+1, from my side~


---


[GitHub] flink pull request #6291: [FLINK-9785][network] add remote address informati...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6291#discussion_r201303678
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 ---
@@ -167,7 +167,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
			tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. " +
				"This indicates that the remote task manager was lost.", remoteAddr, cause);
		} else {
-			tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
+			final SocketAddress localAddr = ctx.channel().localAddress();
+			tex = new LocalTransportException(cause.getMessage() + " (connection to '" + remoteAddr + "')", localAddr, cause);
--- End diff --

using `String.format` here looks better to me
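The suggested `String.format` style could look roughly like this (hypothetical helper; `causeMessage` and `remoteAddr` stand in for the values in the snippet above):

```java
public class TransportExceptionMessage {

    // Builds the local-transport-exception message with String.format instead
    // of string concatenation, as suggested in the review comment above.
    static String buildMessage(String causeMessage, Object remoteAddr) {
        return String.format("%s (connection to '%s')", causeMessage, remoteAddr);
    }

    public static void main(String[] args) {
        if (!buildMessage("Connection reset", "host:1234")
                .equals("Connection reset (connection to 'host:1234')")) {
            throw new AssertionError();
        }
    }
}
```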


---


[GitHub] flink pull request #6291: [FLINK-9785][network] add remote address informati...

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6291#discussion_r201304267
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -164,7 +164,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
				+ "that the remote task manager was lost.", remoteAddr, cause);
			}
			else {
-				tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
+				SocketAddress localAddr = ctx.channel().localAddress();
+				tex = new LocalTransportException(cause.getMessage() + " (connection to '" + remoteAddr + "')",
--- End diff --

same as above


---


[GitHub] flink pull request #6277: [FLINK-9511] Implement TTL config

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6277#discussion_r201264096
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
 ---
@@ -93,4 +97,82 @@ public Time getTtl() {
public TtlTimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
+
+   @Override
+   public String toString() {
+   return "StateTtlConfiguration{" +
+   "ttlUpdateType=" + ttlUpdateType +
+   ", stateVisibility=" + stateVisibility +
+   ", timeCharacteristic=" + timeCharacteristic +
+   ", ttl=" + ttl +
+   '}';
+   }
+
+   public static Builder newBuilder(Time ttl) {
+   return new Builder(ttl);
+   }
+
+   /**
+* Builder for the {@link StateTtlConfiguration}.
+*/
+   public static class Builder {
+
+   private TtlUpdateType ttlUpdateType = OnCreateAndWrite;
+   private TtlStateVisibility stateVisibility = NeverReturnExpired;
+   private TtlTimeCharacteristic timeCharacteristic = ProcessingTime;
+   private Time ttl;
+
+   public Builder(Time ttl) {
--- End diff --

@azagrebin I removed the default value for `TimeCharacteristic`; it will now 
depend on the user's choice. Do you agree with this approach?


---


[GitHub] flink pull request #6277: [FLINK-9511] Implement TTL config

2018-07-10 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6277#discussion_r201229057
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
 ---
@@ -93,4 +97,82 @@ public Time getTtl() {
public TtlTimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
+
+   @Override
+   public String toString() {
+   return "StateTtlConfiguration{" +
+   "ttlUpdateType=" + ttlUpdateType +
+   ", stateVisibility=" + stateVisibility +
+   ", timeCharacteristic=" + timeCharacteristic +
+   ", ttl=" + ttl +
+   '}';
+   }
+
+   public static Builder newBuilder(Time ttl) {
+   return new Builder(ttl);
+   }
+
+   /**
+* Builder for the {@link StateTtlConfiguration}.
+*/
+   public static class Builder {
+
+   private TtlUpdateType ttlUpdateType = OnCreateAndWrite;
+   private TtlStateVisibility stateVisibility = NeverReturnExpired;
+   private TtlTimeCharacteristic timeCharacteristic = 
ProcessingTime;
+   private Time ttl;
+
+   public Builder(Time ttl) {
--- End diff --

ok, hold on.


---
