[GitHub] [flink] JingsongLi commented on issue #10415: [FLINK-15049][table-planner-blink] Compile error when hash join with …

2019-12-04 Thread GitBox
JingsongLi commented on issue #10415: [FLINK-15049][table-planner-blink] 
Compile error when hash join with …
URL: https://github.com/apache/flink/pull/10415#issuecomment-561533551
 
 
   Thanks @docete for your fix, LGTM once Travis passes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shuttie commented on a change in pull request #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString

2019-12-04 Thread GitBox
shuttie commented on a change in pull request #10358: [FLINK-14346] 
[serialization] faster implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#discussion_r353607833
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/types/StringValue.java
 ##
 @@ -759,56 +761,142 @@ public static String readString(DataInput in) throws IOException {
}
len |= curr << shift;
}
-   
+
// subtract one for the null length
len -= 1;
-   
-   final char[] data = new char[len];
 
-   for (int i = 0; i < len; i++) {
-   int c = in.readUnsignedByte();
-   if (c < HIGH_BIT) {
-   data[i] = (char) c;
-   } else {
+   /* as we have no idea about byte-length of the serialized string, we cannot fully
+* read it into memory buffer. But we can do it in an optimistic way:
+* 1. In a happy case when the string is an us-ascii one, then byte_len == char_len
+* 2. If we spot at least one character with code >= 127, then we reallocate the buffer
+* to accommodate for the next characters.
+*/
+
+   // happily assume that the string is an 7 bit us-ascii one
+   byte[] buf = new byte[len];
+   in.readFully(buf);
+
+   final char[] data = new char[len];
+   int charPosition = 0;
+   int bufSize = len;
+   int bytePosition = 0;
+
+   while (charPosition < len) {
+   // there is at least `char count - char position` bytes left in case if all the
+   // remaining characters are 7 bit.
+   int remainingBytesEstimation = len - charPosition;
 
 Review comment:
   A nice catch. This variable is only used in the buffer refill operation, so 
it's reasonable not to compute it for every character.
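
   For illustration, here is a minimal standalone sketch of the optimistic decode path discussed in this thread (the method name and the give-up fallback are hypothetical; the real PR refills the buffer and keeps decoding):

```java
import java.io.DataInput;
import java.io.IOException;

final class OptimisticAsciiRead {

	/** Decodes {@code len} characters, optimistically assuming 7-bit US-ASCII input. */
	static String readAsciiOptimistic(DataInput in, int len) throws IOException {
		// optimistic assumption: byte length == char length, so a single readFully suffices
		byte[] buf = new byte[len];
		in.readFully(buf);

		char[] data = new char[len];
		for (int i = 0; i < len; i++) {
			int c = buf[i] & 0xFF;
			if (c >= 0x80) {
				// a multi-byte character: a real implementation would refill the buffer
				// (using the remaining-bytes estimation) and continue decoding from here
				throw new IOException("non-ASCII input is not handled in this sketch");
			}
			data[i] = (char) c;
		}
		return new String(data, 0, len);
	}
}
```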


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #10363: [FLINK-14104][build] Upgrade to flink-shaded 9.0

2019-12-04 Thread GitBox
StephanEwen commented on issue #10363: [FLINK-14104][build] Upgrade to 
flink-shaded 9.0
URL: https://github.com/apache/flink/pull/10363#issuecomment-561535237
 
 
   Do we need to adjust any NOTICE files due to the changed dependency versions 
we get, or is this automatically merged from flink-shaded?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14572) BlobsCleanupITCase failed on Travis

2019-12-04 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987660#comment-16987660
 ] 

Gary Yao commented on FLINK-14572:
--

Unfortunately I don't have any either.

> BlobsCleanupITCase failed on Travis
> ---
>
> Key: FLINK-14572
> URL: https://issues.apache.org/jira/browse/FLINK-14572
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Assignee: Yun Gao
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> {noformat}
> java.lang.AssertionError: 
> Expected: is 
>  but: was 
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanup(BlobsCleanupITCase.java:220)
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanupFinishedJob(BlobsCleanupITCase.java:133)
> {noformat}
> https://api.travis-ci.com/v3/job/250445874/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] AHeise commented on a change in pull request #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString

2019-12-04 Thread GitBox
AHeise commented on a change in pull request #10358: [FLINK-14346] 
[serialization] faster implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#discussion_r353626201
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/types/StringSerializationTest.java
 ##
 @@ -56,6 +56,27 @@ public void testNonNullValues() {
fail("Exception in test: " + e.getMessage());
}
}
+
+   @Test
+   public void testUnicodeValues() {
+   try {
 
 Review comment:
   Ah then leave it and we will eventually clean up on our end.
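
   For reference, a self-contained round-trip check of the kind this new test presumably performs; the class name and sample strings are illustrative, not the PR's actual test code:

```java
import org.apache.flink.types.StringValue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class UnicodeRoundTripSketch {
	public static void main(String[] args) throws Exception {
		String[] samples = {"plain ascii", "ümlaut", "日本語", "mixed ascii + ßøµ"};
		for (String value : samples) {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			StringValue.writeString(value, new DataOutputStream(bytes));
			String copy = StringValue.readString(
					new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
			if (!value.equals(copy)) {
				throw new AssertionError("round trip changed the value: " + copy);
			}
		}
	}
}
```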


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-15016) Remove unused dependency

2019-12-04 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-15016:
-
Fix Version/s: 1.10.0

> Remove unused dependency
> 
>
> Key: FLINK-15016
> URL: https://issues.apache.org/jira/browse/FLINK-15016
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: César Soto Valero
>Assignee: César Soto Valero
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Dependency *commons-io:commons-io* is declared in module *flink-core*. 
> However, this dependency is not used and, therefore, should be removed to 
> make the pom clearer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10330: [FLINK-14189][runtime] Extend TaskExecutor to support dynamic slot allocation

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10330: [FLINK-14189][runtime] Extend 
TaskExecutor to support dynamic slot allocation
URL: https://github.com/apache/flink/pull/10330#issuecomment-558970102
 
 
   
   ## CI report:
   
   * 38f2c1c450122cae9aa99258c19c6d07e998 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138360699)
   * 14ecf374fe3f6587e6eee29a39bf15190fac269c : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138375450)
   * 761359a2f2509a06483e422e5e592b66e2e5661a : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138401670)
   * 88630bcdfd9c05fd352a422a88e59f26fba4dc7c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138455828)
   * 726fc5733c8ac35bdb69becbae13911d8e91dd07 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138990518)
   * c54d7b94b9ca46e6b8dc2a072919ab0b1deed022 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…

2019-12-04 Thread GitBox
flinkbot commented on issue #10418: [FLINK-15050][table-planner-blink] 
DataFormatConverters should suppor…
URL: https://github.com/apache/flink/pull/10418#issuecomment-561557852
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 6951e77dff9c9c4fb1945d79ef4253e5ea2c0f0f (Wed Dec 04 
09:40:28 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-15050).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] Add missing test harnesses for broadcast functions

2019-12-04 Thread GitBox
aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] 
Add missing test harnesses for broadcast functions
URL: https://github.com/apache/flink/pull/10286#discussion_r353637844
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/BroadcastOperatorTestHarness.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
+
+/**
+ * A test harness for testing a {@link CoBroadcastWithNonKeyedOperator}.
 
 Review comment:
   I would relax this constraint and maybe say `A test harness for testing a 
{@link TwoInputStreamOperator} in a broadcast context.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] Add missing test harnesses for broadcast functions

2019-12-04 Thread GitBox
aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] 
Add missing test harnesses for broadcast functions
URL: https://github.com/apache/flink/pull/10286#discussion_r353638159
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
+
+
+/**
+ * A test harness for testing a {@link CoBroadcastWithKeyedOperator}.
 
 Review comment:
   Same as for the other operator.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10418: 
[FLINK-15050][table-planner-blink] DataFormatConverters should suppor…
URL: https://github.com/apache/flink/pull/10418#discussion_r353640867
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 ##
 @@ -281,6 +288,28 @@ public static DataFormatConverter getConverterForDataType(DataType originDataTyp
return ps;
}
 
+   private static int getDateTimePrecision(LogicalType logicalType) {
+   if (logicalType instanceof TimestampType) {
+   TimestampType dt = (TimestampType) logicalType;
+   return dt.getPrecision();
+   } else {
+   TypeInformation typeInfo = ((LegacyTypeInformationType) logicalType).getTypeInformation();
+   if (typeInfo instanceof LegacyTimestampTypeInfo) {
+   LegacyTimestampTypeInfo dt = (LegacyTimestampTypeInfo) typeInfo;
+   return dt.getPrecision();
+   } else if (typeInfo instanceof LegacyLocalDateTimeTypeInfo) {
+   LegacyLocalDateTimeTypeInfo dt = (LegacyLocalDateTimeTypeInfo) typeInfo;
+   return dt.getPrecision();
+   } else if (typeInfo instanceof SqlTimeTypeInfo) {
+   return 3;
 
 Review comment:
   Let's remove `SqlTimeTypeInfo` and `LocalTimeTypeInfo`; precision 3 is a 
temporary solution and this branch should not be reached.
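
   For illustration, a sketch of the method with those branches removed (assumed shape only; the thrown exception is illustrative, while the types and getters are the ones quoted in the diff above):

```java
private static int getDateTimePrecision(LogicalType logicalType) {
	if (logicalType instanceof TimestampType) {
		return ((TimestampType) logicalType).getPrecision();
	}
	TypeInformation<?> typeInfo = ((LegacyTypeInformationType<?>) logicalType).getTypeInformation();
	if (typeInfo instanceof LegacyTimestampTypeInfo) {
		return ((LegacyTimestampTypeInfo) typeInfo).getPrecision();
	} else if (typeInfo instanceof LegacyLocalDateTimeTypeInfo) {
		return ((LegacyLocalDateTimeTypeInfo) typeInfo).getPrecision();
	} else {
		// no temporary precision-3 fallback: unexpected types fail fast instead
		throw new UnsupportedOperationException("Unsupported type: " + typeInfo);
	}
}
```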


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-15047) YarnDistributedCacheITCase is unstable

2019-12-04 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987684#comment-16987684
 ] 

Gary Yao edited comment on FLINK-15047 at 12/4/19 9:57 AM:
---

-Running this test locally failed 10 out of 10 times. I am promoting the 
priority to Blocker.- 

Edit: I forgot to recompile Flink so I am not sure if it fails 10/10 times.


was (Author: gjy):
Running this test locally failed 10 out of 10 times. I am promoting the 
priority to Blocker.

> YarnDistributedCacheITCase is unstable
> --
>
> Key: FLINK-15047
> URL: https://issues.apache.org/jira/browse/FLINK-15047
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Zili Chen
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> See also https://api.travis-ci.com/v3/job/262854881/log.txt
> cc [~ZhenqiuHuang]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for equality to NaN

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for 
equality to NaN
URL: https://github.com/apache/flink/pull/10333#issuecomment-559012637
 
 
   
   ## CI report:
   
   * c5bd9898fbbeb3d668051475409b52bfb1e2 : UNKNOWN
   * 44f202263cbb26473b06ea19f293bf0b6afce8e6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138375519)
   * e0ea169fab74ca07617a056e1c60861938ad6f0b : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139122432)
   * 38bef99d2f337b556c87141e0240ddd7075fd980 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjuwangg commented on issue #10419: [FLINK-15026][SQL-CLIENT]Support create/drop/alter database in sql client

2019-12-04 Thread GitBox
zjuwangg commented on issue #10419: [FLINK-15026][SQL-CLIENT]Support 
create/drop/alter database in sql client
URL: https://github.com/apache/flink/pull/10419#issuecomment-561570095
 
 
   @danny0405 @xuefuz @bowenli86 @KurtYoung, please have a review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjuwangg opened a new pull request #10419: [FLINK-15026][SQL-CLIENT]Support create/drop/alter database in sql client

2019-12-04 Thread GitBox
zjuwangg opened a new pull request #10419: [FLINK-15026][SQL-CLIENT]Support 
create/drop/alter database in sql client
URL: https://github.com/apache/flink/pull/10419
 
 
   ## What is the purpose of the change
   
   *Since we already support create/drop/alter database operations in 
TableEnvironment, it's natural and easy to hook up such operations in the SQL 
client too. This PR aims to bridge that gap.*
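
   For illustration, a minimal sketch of the TableEnvironment-level calls such statements end up in (the database name and DDL strings below are made-up examples, not taken from this PR's tests):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DatabaseDdlSketch {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
		// the SQL client hands database DDL statements like these to the TableEnvironment
		tEnv.sqlUpdate("CREATE DATABASE db1 COMMENT 'test database'");
		tEnv.sqlUpdate("ALTER DATABASE db1 SET ('k1' = 'v1')");
		tEnv.sqlUpdate("DROP DATABASE db1");
	}
}
```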
   
   
   ## Brief change log
   
 - 
[0433794](https://github.com/apache/flink/commit/043379493823c0ac38c05d355471079d6808d16a)
 Support create/drop/alter database in sql client
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added unit tests for SqlParserCommand in 
SqlCommandParserTest#testCommands*
 - *Added test that validates create database in  
LocalExecutorITCase#testCreateDatabase*
 - *Added test that validates drop database in 
LocalExecutorITCase#testDropDatabase*
 - *Added test that validates alter database in 
LocalExecutorITCase#alterDatabase*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: ( no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize 
all mailbox actions
URL: https://github.com/apache/flink/pull/10345#issuecomment-559421343
 
 
   
   ## CI report:
   
   * 5206399001512006f4b3d7663e7b5be8ea02a4a2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138541983)
   * cb4089dec82717cdc6ad6e78b2dbf4d0b03e57d4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138580753)
   * 18e1d269e688e6f39fd02cf409316776b24e8601 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138606098)
   * 8924109bc101b39c6057d44aa14224cc12215b7c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138625034)
   * 1922068e5dd92138fb2cd37c225ae0e5c6a5284f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138628834)
   * b1bd81bf936adb02f353374ebe9fd9c861cd2fe4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138801818)
   * 7499db309b268acd96ec423d629bfa633c3f150d : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/138824944)
   * a6cb2444a7532c4a5eadcab5da395de3a90f9474 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138828305)
   * da403e52ec9b8727f3002c14b67453722e06bf2b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138854348)
   * 0401e51b6babb79ceff3898c23d8d9136d584861 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138861032)
   * 8f6083ad4e48706912edb85bce1d116b37bc6d67 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138864136)
   * f0797b08be8f53f68da3016f77092b38bf612ee5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138947296)
   * e3bde3bb95123b295ff2b60139b3f774b9deb62a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138967214)
   * 5ab5c44b979077edc8d5a1b8c295814ec6ebb6eb : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139122458)
   * e09e524aca2fd0976ab296de8d0d012f08f53294 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139168737)
   * 7b1a700a18d027f3d618233ce639bcb9ec08396e : UNKNOWN
   * 590f7e632a00a49fb5727d06537ee89a613049d7 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139218625)
   * 8e1406ed035a978468818c1923d3dbc0a21a49ca : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139227312)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster 
implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#issuecomment-559765475
 
 
   
   ## CI report:
   
   * b106e8b0327fb78ebac1894a8f8fa51718b3bbba : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138698185)
   * 5b428f7f59c8b2ec3c2751f642de5b343914580b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139147374)
   * 622362f4f1a1240aa8ea5470cdc9364cf136f75c : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
StephanEwen commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353679918
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = kvStateInformation;
 
this.writeOptions = new WriteOptions().setDisableWAL(true);
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 
 Review comment:
   I think the annotation is only really evaluated in IntelliJ by the 
inspections. It does not prevent anything during runtime, so this extra check 
does not hurt, in my opinion.
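
   For illustration, a small standalone sketch of that point: an annotation such as `@Nonnegative` is only seen by static inspections, while the `checkArgument` call is what rejects bad values at runtime (the class and field names below are made up):

```java
import javax.annotation.Nonnegative;

import static org.apache.flink.util.Preconditions.checkArgument;

final class WriteBatchSettings {

	private final long writeBatchSize;

	WriteBatchSettings(@Nonnegative long writeBatchSize) {
		// the annotation above is only evaluated by IDE/static inspections;
		// this runtime precondition is the check that actually fails fast
		checkArgument(writeBatchSize >= 0, "Write batch size must not be negative.");
		this.writeBatchSize = writeBatchSize;
	}

	long getWriteBatchSize() {
		return writeBatchSize;
	}
}
```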


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10394: [FLINK-14663]Distinguish catalogColumnStats' unknown value and real values

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10394: [FLINK-14663]Distinguish 
catalogColumnStats' unknown  value and real values
URL: https://github.com/apache/flink/pull/10394#issuecomment-561095403
 
 
   
   ## CI report:
   
   * 68e77528300eadcbeaa62372d676b01e268c39a8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139116845)
   * 060aaebb68bab8fcd7054c782a3cd3c2b2b89ec2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139259032)
   * 4293b717dcc110d76748c4d5fff2022ec0ff01f2 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-04 Thread GitBox
rkhachatryan commented on issue #10345: [FLINK-12484][runtime] synchronize all 
mailbox actions
URL: https://github.com/apache/flink/pull/10345#issuecomment-561605260
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize 
all mailbox actions
URL: https://github.com/apache/flink/pull/10345#issuecomment-559421343
 
 
   
   ## CI report:
   
   * 5206399001512006f4b3d7663e7b5be8ea02a4a2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138541983)
   * cb4089dec82717cdc6ad6e78b2dbf4d0b03e57d4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138580753)
   * 18e1d269e688e6f39fd02cf409316776b24e8601 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138606098)
   * 8924109bc101b39c6057d44aa14224cc12215b7c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138625034)
   * 1922068e5dd92138fb2cd37c225ae0e5c6a5284f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138628834)
   * b1bd81bf936adb02f353374ebe9fd9c861cd2fe4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138801818)
   * 7499db309b268acd96ec423d629bfa633c3f150d : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/138824944)
   * a6cb2444a7532c4a5eadcab5da395de3a90f9474 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138828305)
   * da403e52ec9b8727f3002c14b67453722e06bf2b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138854348)
   * 0401e51b6babb79ceff3898c23d8d9136d584861 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138861032)
   * 8f6083ad4e48706912edb85bce1d116b37bc6d67 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138864136)
   * f0797b08be8f53f68da3016f77092b38bf612ee5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138947296)
   * e3bde3bb95123b295ff2b60139b3f774b9deb62a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138967214)
   * 5ab5c44b979077edc8d5a1b8c295814ec6ebb6eb : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139122458)
   * e09e524aca2fd0976ab296de8d0d012f08f53294 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139168737)
   * 7b1a700a18d027f3d618233ce639bcb9ec08396e : UNKNOWN
   * 590f7e632a00a49fb5727d06537ee89a613049d7 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139218625)
   * 8e1406ed035a978468818c1923d3dbc0a21a49ca : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139227312)
   * 6cfe76bac6ba0318f73484ff8fcbe63d21e1c393 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster 
implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#issuecomment-559765475
 
 
   
   ## CI report:
   
   * b106e8b0327fb78ebac1894a8f8fa51718b3bbba : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138698185)
   * 5b428f7f59c8b2ec3c2751f642de5b343914580b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139147374)
   * 622362f4f1a1240aa8ea5470cdc9364cf136f75c : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139302203)
   * 4396fe5a2ca35cf55502e72ab70c4ac8f1eecc29 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10359: [FLINK-14813][metrics] Provide `isBackPressured` Task metric

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10359: [FLINK-14813][metrics] Provide 
`isBackPressured` Task metric
URL: https://github.com/apache/flink/pull/10359#issuecomment-559773427
 
 
   
   ## CI report:
   
   * b05689f09ee9e5eaa9ceacc223a9027b467143a6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138701358)
   * 7257acd25d0922ca098d64560d518c4810042085 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139263373)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.

2019-12-04 Thread GitBox
zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] 
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r353594121
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressor;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Compressor for {@link Buffer}.
+ */
+public class BufferCompressor {
+
+   /** The backing block compressor for data compression. */
+   private final BlockCompressor blockCompressor;
+
+   /** The intermediate heap buffer for the compressed data. */
+   private final byte[] heapBuffer;
+
+   public BufferCompressor(int bufferSize, String factoryName) {
+   checkArgument(bufferSize > 0);
+   // the size of this intermediate heap buffer will be gotten from the
+   // plugin configuration in the future, and currently, double size of
+   // the input buffer is enough for lz4-java compression library.
+   this.heapBuffer = new byte[2 * bufferSize];
+   this.blockCompressor = BlockCompressionFactory.createBlockCompressionFactory(factoryName).getCompressor();
+   }
+
+   /**
+* Compresses the given {@link Buffer} using {@link BlockCompressor}. The compressed data will be stored in the
+* internal heap buffer of this {@link BufferCompressor} and returned to the caller. The caller must guarantee
+* that the returned {@link Buffer} is freed when calling the method next time.
+*
+* Notes that the compression will always start from offset 0 to the size of the input {@link Buffer}.
+*/
+   public Buffer compressToInternalBuffer(Buffer buffer) {
+   int compressedLen;
+   if ((compressedLen = compress(buffer)) == 0) {
+   return buffer;
+   }
+
+   try {
+   if (compressedLen >= buffer.getSize()) {
+   return buffer;
+   }
+
+   // warp the internal heap buffer as Buffer
+   MemorySegment memorySegment = MemorySegmentFactory.wrap(heapBuffer);
+   NetworkBuffer compressedBuffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
+   compressedBuffer.setSize(compressedLen);
+   compressedBuffer.setCompressed(true);
+
+   return compressedBuffer;
+   } catch (Throwable throwable) {
+   return buffer;
+   }
+   }
+
+   /**
+* The difference between this method and {@link #compressToInternalBuffer(Buffer)} is that this method will
+* copy the compressed data back to the input {@link Buffer} starting from offset 0.
+*
+* The caller must guarantee that the input {@link Buffer} is writable and there's enough space left.
+*/
+   public Buffer compressInPlace(Buffer buffer) {
 
 Review comment:
   The same comments I mentioned for above `compressToInternalBuffer`. Merge 
condition and remove `catch` clause.
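
   For illustration, the shape that suggestion points at, with one merged fall-back condition and no `catch` (a sketch only; the names are taken from the quoted diff, not from the final code):

```java
public Buffer compressToInternalBuffer(Buffer buffer) {
	int compressedLen = compress(buffer);
	// fall back to the original buffer if compression failed (0) or did not shrink the data
	if (compressedLen == 0 || compressedLen >= buffer.getSize()) {
		return buffer;
	}

	// wrap the internal heap buffer as a Buffer
	MemorySegment memorySegment = MemorySegmentFactory.wrap(heapBuffer);
	NetworkBuffer compressedBuffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
	compressedBuffer.setSize(compressedLen);
	compressedBuffer.setCompressed(true);
	return compressedBuffer;
}
```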


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.

2019-12-04 Thread GitBox
zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] 
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r353594121
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressor;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Compressor for {@link Buffer}.
+ */
+public class BufferCompressor {
+
+   /** The backing block compressor for data compression. */
+   private final BlockCompressor blockCompressor;
+
+   /** The intermediate heap buffer for the compressed data. */
+   private final byte[] heapBuffer;
+
+   public BufferCompressor(int bufferSize, String factoryName) {
+   checkArgument(bufferSize > 0);
+   // the size of this intermediate heap buffer will be gotten from the
+   // plugin configuration in the future, and currently, double size of
+   // the input buffer is enough for lz4-java compression library.
+   this.heapBuffer = new byte[2 * bufferSize];
+   this.blockCompressor = BlockCompressionFactory.createBlockCompressionFactory(factoryName).getCompressor();
+   }
+
+   /**
+* Compresses the given {@link Buffer} using {@link BlockCompressor}. The compressed data will be stored in the
+* internal heap buffer of this {@link BufferCompressor} and returned to the caller. The caller must guarantee
+* that the returned {@link Buffer} is freed when calling the method next time.
+*
+* Notes that the compression will always start from offset 0 to the size of the input {@link Buffer}.
+*/
+   public Buffer compressToInternalBuffer(Buffer buffer) {
+   int compressedLen;
+   if ((compressedLen = compress(buffer)) == 0) {
+   return buffer;
+   }
+
+   try {
+   if (compressedLen >= buffer.getSize()) {
+   return buffer;
+   }
+
+   // warp the internal heap buffer as Buffer
+   MemorySegment memorySegment = MemorySegmentFactory.wrap(heapBuffer);
+   NetworkBuffer compressedBuffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
+   compressedBuffer.setSize(compressedLen);
+   compressedBuffer.setCompressed(true);
+
+   return compressedBuffer;
+   } catch (Throwable throwable) {
+   return buffer;
+   }
+   }
+
+   /**
+* The difference between this method and {@link #compressToInternalBuffer(Buffer)} is that this method will
+* copy the compressed data back to the input {@link Buffer} starting from offset 0.
+*
+* The caller must guarantee that the input {@link Buffer} is writable and there's enough space left.
+*/
+   public Buffer compressInPlace(Buffer buffer) {
 
 Review comment:
   The same comments I mentioned for above `compressToInternalBuffer`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor 
derive and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115
 
 
   
   ## CI report:
   
   * 25f9e4b87846e5a736aa329c834f82962e1f50c4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135875925)
   * c4b4f4d5c88a1a5009325a6260cf2d91ed69ca96 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135885120)
   * 2d5269d2498d96550682d113d61382b7a9ac9721 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135902960)
   * 5ee8701f76b9e6f2dcb451eb988371bea3b0a38d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/136486584)
   * 2c52d7157f5e1b25dfaa00fe50cf7b04e7d6a97e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/136497720)
   * 2d734eeff7480adc2ea1f3695f31ba5a169f3a05 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/136501966)
   * 4edae43ff7eaf0357f5e8604b02b88749c8d153f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/136522752)
   * 927a11838172fe792636923e9378677f92a48b73 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138028348)
   * a73de7a3fc63fe2d2a9bd12e03efb45bfcbf9ca8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138455883)
   * d06a271e355a36fd316f35d98e2905df8829273a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138768094)
   * 34c5662c256c22dbb3b770a3203090b19615d338 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138984403)
   * fb7c23c89663e70842341264ac35d3160ee11e6f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139087272)
   * b083c68650acd6b097dc7049276df080196db26f : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139278832)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14572) BlobsCleanupITCase failed on Travis

2019-12-04 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987647#comment-16987647
 ] 

Zhu Zhu commented on FLINK-14572:
-

Unfortunately, the detailed logs downloaded are removed already from my local 
machine. :(

> BlobsCleanupITCase failed on Travis
> ---
>
> Key: FLINK-14572
> URL: https://issues.apache.org/jira/browse/FLINK-14572
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Assignee: Yun Gao
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> {noformat}
> java.lang.AssertionError: 
> Expected: is 
>  but: was 
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanup(BlobsCleanupITCase.java:220)
>   at 
> org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanupFinishedJob(BlobsCleanupITCase.java:133)
> {noformat}
> https://api.travis-ci.com/v3/job/250445874/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to execution environments

2019-12-04 Thread GitBox
TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to 
execution environments
URL: https://github.com/apache/flink/pull/10408#issuecomment-561541292
 
 
   @aljoscha it would make sense. One concern is about the close actions. For 
some implementations of JobClient, specifically anonymous subclasses of 
ClusterClientJobClientAdapter, we close some internal resources such as the 
MiniCluster or remove a shutdown hook. That should not be the case for the 
duplicated one. The duplicated one should only close, for our specific 
implementation, the rest client & ha services & execution service held by 
ClusterClient.
   
   I have an idea for doing so, let me push a follow-up commit and give you an 
impression.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode

2019-12-04 Thread GitBox
GJL commented on a change in pull request #10406: [FLINK-15045][runtime] Only 
log RestartStrategy in legacy scheduling mode
URL: https://github.com/apache/flink/pull/10406#discussion_r353620416
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
 ##
 @@ -56,20 +56,20 @@ private RestartBackoffTimeStrategyFactoryLoader() {
 *
 * @param jobRestartStrategyConfiguration restart configuration given 
within the job graph
 * @param clusterConfiguration cluster(server-side) configuration
-* @param isCheckpointingEnabled if checkpointing is enabled for the job
+* @param checkpointingEnabled if checkpointing is enabled for the job
 * @return new version restart strategy factory
 */
public static RestartBackoffTimeStrategy.Factory 
createRestartBackoffTimeStrategyFactory(
final RestartStrategies.RestartStrategyConfiguration 
jobRestartStrategyConfiguration,
final Configuration clusterConfiguration,
-   final boolean isCheckpointingEnabled) {
+   final boolean checkpointingEnabled) {
 
 Review comment:
   For consistency I would prefer having the same naming conventions for 
boolean setters and static methods. However, since there is no rule in the 
Flink coding style guidelines and we are already using both styles [1][2], I'd 
also be fine with dropping this commit.
   
   [1] 
https://github.com/apache/flink/blob/86c232437f74576fdee3baad11c58f77714269fc/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L112
   
   [2] 
https://github.com/apache/flink/blob/86c232437f74576fdee3baad11c58f77714269fc/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java#L206


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on issue #10402: [FLINK-15034] Bump FRocksDB version for memory control

2019-12-04 Thread GitBox
Myasuka commented on issue #10402: [FLINK-15034] Bump FRocksDB version for 
memory control
URL: https://github.com/apache/flink/pull/10402#issuecomment-561550622
 
 
   @StephanEwen thanks for your examination, already updated 
`flink-dist/src/main/resources/META-INF/NOTICE`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys opened a new pull request #10417: [FLINK-14910][datastream] Checking for auto generated uids only for PhysicalStreamTransformations

2019-12-04 Thread GitBox
dawidwys opened a new pull request #10417: [FLINK-14910][datastream] Checking 
for auto generated uids only for PhysicalStreamTransformations
URL: https://github.com/apache/flink/pull/10417
 
 
   ## What is the purpose of the change
   
   This PR enables the check for auto generated uids only for 
`PhysicalStreamTransformations`, as those are the only `StreamTransformations` 
that will produce a `StreamOperator` that can have a state.
   
   This solves the problem that it is not possible to assign the uid for some 
of the non-physical transformations (e.g `keyBy`, `split`).
   
   ## Verifying this change
   
   Added:
   
`org.apache.flink.streaming.graph.StreamingJobGraphGeneratorNodeHashTest#testDisablingAutoUidsWorksWithKeyBy`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12283) In Table API, allow non-static inner class as UDF

2019-12-04 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-12283:
-
Summary: In Table API, allow non-static inner class as UDF  (was: Allow 
non-static inner class as UDF)

> In Table API, allow non-static inner class as UDF
> -
>
> Key: FLINK-12283
> URL: https://issues.apache.org/jira/browse/FLINK-12283
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>
> See details here 
> [https://lists.apache.org/thread.html/9ecec89ba1225dbd6b3ea2466a910ad9685a42a4672b449f6ee13565@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zentol commented on issue #10363: [FLINK-14104][build] Upgrade to flink-shaded 9.0

2019-12-04 Thread GitBox
zentol commented on issue #10363: [FLINK-14104][build] Upgrade to flink-shaded 
9.0
URL: https://github.com/apache/flink/pull/10363#issuecomment-561550679
 
 
   This happens automatically. We don't list flink dependencies in our notice 
files (that's why flink-dist NOTICE remains unchanged) and the licensing for 
the binary is generated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-15047) YarnDistributedCacheITCase is unstable

2019-12-04 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-15047:
-
Labels: test-stability  (was: )

> YarnDistributedCacheITCase is unstable
> --
>
> Key: FLINK-15047
> URL: https://issues.apache.org/jira/browse/FLINK-15047
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Zili Chen
>Priority: Major
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> See also https://api.travis-ci.com/v3/job/262854881/log.txt
> cc [~ZhenqiuHuang]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14910) disableAutoGeneratedUIDs fails on keyBy

2019-12-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14910:
---
Labels: pull-request-available  (was: )

> disableAutoGeneratedUIDs fails on keyBy
> ---
>
> Key: FLINK-14910
> URL: https://issues.apache.org/jira/browse/FLINK-14910
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.0
>Reporter: William Cheng
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.9.2
>
>
> There doesn't seem to be a way to add a UID to the Partition operator created 
> by KeyBy, causing `disableAutoGeneratedUIDs` to fail.
>  
> Here's a simple test case that will reproduce the issue:
> {noformat}
> @Test
> public void testFailedUID() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().disableAutoGeneratedUIDs();
>     DataStream<String> data = env.fromCollection(Arrays.asList("1", "2", "3")).uid("source-uid");
>     data.keyBy(i -> i)
>         .map(i -> i).uid("map-uid");
>     env.execute();
> }{noformat}
> {noformat}
> testFailedUID(twitch.creatoranalytics.sessions.StreamingJobTest)  Time elapsed: 0.008 sec  <<< ERROR!
> java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Partition
> {noformat}
>  
> This passes if the keyBy is removed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode

2019-12-04 Thread GitBox
zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] 
Only log RestartStrategy in legacy scheduling mode
URL: https://github.com/apache/flink/pull/10406#discussion_r353638280
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 ##
 @@ -49,6 +50,8 @@
  */
 public class DefaultSchedulerFactory implements SchedulerNGFactory {
 
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSchedulerFactory.class);
+
@Override
public SchedulerNG createInstance(
final Logger log,
 
 Review comment:
   Should we use this logger?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10399: [FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType in blink planner

2019-12-04 Thread GitBox
docete commented on a change in pull request #10399: 
[FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType 
in blink planner
URL: https://github.com/apache/flink/pull/10399#discussion_r353638560
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##
 @@ -386,10 +386,20 @@ object GenerateUtils {
 generateNonNullLiteral(literalType, fieldTerm, literalType)
 
   case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-val millis = 
unixTimestampToLocalDateTime(literalValue.asInstanceOf[Long])
+val fieldTerm = newName("timestampWithLocalZone")
+val millis = unixTimestampToLocalDateTime(
+  literalValue.asInstanceOf[TimestampString].getMillisSinceEpoch)
 .atZone(ctx.tableConfig.getLocalTimeZone)
 .toInstant.toEpochMilli
-generateNonNullLiteral(literalType, millis + "L", literalValue)
+val nanoOfMillis = SqlDateTimeUtils.getNanoOfMillisSinceEpoch(
 
 Review comment:
   the previous `Instant`-based conversion only keeps the millisecond part 
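   For illustration only — a minimal, self-contained sketch (not the planner's generated code; the class name and example value are made up) of splitting a nanosecond-precision instant into the epoch-millisecond part plus the nano-of-millisecond remainder, which is the information a higher-precision timestamp literal needs to carry:
   ```java
import java.time.Instant;

public class TimestampLiteralParts {

    public static void main(String[] args) {
        // An instant with sub-millisecond precision.
        Instant instant = Instant.parse("2019-12-04T10:15:30.123456789Z");

        // Millisecond part: all that survives if the literal is reduced to epoch millis.
        long millisSinceEpoch = instant.toEpochMilli();        // 1575454530123

        // Nanosecond remainder within the current millisecond: the part that
        // would otherwise be lost.
        int nanoOfMillisecond = instant.getNano() % 1_000_000; // 456789

        System.out.println(millisSinceEpoch + " ms, +" + nanoOfMillisecond + " ns");
    }
}
   ```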


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #10399: [FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType in blink planner

2019-12-04 Thread GitBox
docete commented on a change in pull request #10399: 
[FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType 
in blink planner
URL: https://github.com/apache/flink/pull/10399#discussion_r353642653
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
 ##
 @@ -66,7 +67,18 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) 
extends CallGenerato
   boxedTypeTermForType(returnType)
 }
 val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
-val evalResult = 
s"$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")})"
+val evalResult =
+  if (returnType.getTypeRoot == 
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+  && (resultClass == classOf[Long] || resultClass == classOf[JLong])) {
+// Convert Long to SqlTimestamp if the UDX's returnType is
 
 Review comment:
   will add a Long <-> Instant converter to support this. 
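   For reference, a minimal sketch of what such a Long <-> Instant conversion could look like, assuming the long value represents epoch milliseconds (the class and method names are hypothetical, not the actual converter added to the table runtime):
   ```java
import java.time.Instant;

public final class LongInstantConverter {

    private LongInstantConverter() {
    }

    /** Internal long value (epoch milliseconds) to an external Instant. */
    public static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    /** External Instant to an internal long value (epoch milliseconds).
     *  Sub-millisecond precision is truncated, which is exactly the
     *  limitation being discussed above. */
    public static long toLong(Instant instant) {
        return instant.toEpochMilli();
    }
}
   ```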


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-15046) Add guideline on how to report security issues

2019-12-04 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-15046:
-
Description: As discussed in the 
[ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951]
 , there should be a guideline on how to report security issues in Flink 
website.

> Add guideline on how to report security issues
> --
>
> Key: FLINK-15046
> URL: https://issues.apache.org/jira/browse/FLINK-15046
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in the 
> [ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951]
>  , there should be a guideline on how to report security issues in Flink 
> website.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15026) Support database DDLs in SQL CLI

2019-12-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15026:
---
Labels: pull-request-available  (was: )

> Support database DDLs in SQL CLI
> 
>
> Key: FLINK-15026
> URL: https://issues.apache.org/jira/browse/FLINK-15026
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Affects Versions: 1.9.1
>Reporter: Danny Chen
>Assignee: Terry Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>
> Support DDL as following:
> {code:sql}
> CREATE DATABASE [ IF NOT EXISTS ] [ catalogName.] dataBaseName [ COMMENT 
> database_comment ] [WITH ( name=value [, name=value]*)]
> DROP DATABASE [ IF EXISTS ] [ catalogName.] dataBaseName [ (RESTRICT|CASCADE)]
> ALTER DATABASE [ catalogName.] dataBaseName SET ( name=value [, name=value]*)
> USE [ catalogName.] dataBaseName
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-15046) Add guideline on how to report security issues

2019-12-04 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-15046:


Assignee: Dian Fu

> Add guideline on how to report security issues
> --
>
> Key: FLINK-15046
> URL: https://issues.apache.org/jira/browse/FLINK-15046
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in the 
> [ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951]
>  , there should be a guideline on how to report security issues in Flink 
> website.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15046) Add guideline on how to report security issues

2019-12-04 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-15046:
-
Environment: (was: As discussed in the 
[ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951]
 , there should be a guideline on how to report security issues in Flink 
website.)

> Add guideline on how to report security issues
> --
>
> Key: FLINK-15046
> URL: https://issues.apache.org/jira/browse/FLINK-15046
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString

2019-12-04 Thread GitBox
StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster 
implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#issuecomment-561580556
 
 
   This is nice work, thanks a lot :-)
   
   Could you add a test case that ensures the encoding is still the same? Maybe 
copy the old String read/write logic and compare it with the new one for some 
random Strings?
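   A possible shape for such a test — only a sketch, assuming JUnit and Flink's `DataOutputSerializer`/`DataInputDeserializer`; comparing byte-for-byte against the old logic would additionally require copying the pre-change implementation into the test (e.g. as a hypothetical `legacyWriteString` helper, omitted here):
   ```java
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.types.StringValue;

import org.junit.Test;

import java.io.IOException;
import java.util.Random;

import static org.junit.Assert.assertEquals;

public class StringValueEncodingCompatibilityTest {

    @Test
    public void randomStringsRoundTrip() throws IOException {
        Random rnd = new Random(42);
        for (int i = 0; i < 10_000; i++) {
            String original = randomString(rnd, rnd.nextInt(64));

            DataOutputSerializer out = new DataOutputSerializer(128);
            StringValue.writeString(original, out);

            DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
            assertEquals(original, StringValue.readString(in));
        }
    }

    private static String randomString(Random rnd, int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            // mix ASCII and non-ASCII chars to exercise both encoding paths
            char c = (char) (rnd.nextBoolean() ? rnd.nextInt(128) : rnd.nextInt(Character.MAX_VALUE));
            sb.append(c);
        }
        return sb.toString();
    }
}
   ```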


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shuttie commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString

2019-12-04 Thread GitBox
shuttie commented on issue #10358: [FLINK-14346] [serialization] faster 
implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#issuecomment-561581049
 
 
   @AHeise thanks for all the ideas, I've updated the PR with all the proposals 
applied. 
   
   As for the `writeString` fallback code, I've found a better way of dealing with 
short strings, one that does not require a separate code path. If you stare long 
enough at the jmh perfasm listing for short strings, you may notice that most of the 
time (compared with the original implementation) is spent on the initial buffer-size 
computation. In the original unbuffered code there is no reason to compute it, as 
there is no buffer. But in this PR we need to scan the string twice: once to compute 
the buffer size, and once to write the characters into the buffer.
   
   The main idea of this PR is to leverage CPU-level parallelism, helping the CPU 
process multiple characters at once. But the problem with short strings is that there 
is nothing to parallelize, so the double-scanning overhead starts to kill the 
performance.
   
   The proposed fix is to over-allocate the buffer for short strings, skipping the 
exact buffer-size computation (a rough sketch of the idea follows below). I've found 
the tipping point for this approach to lie somewhere between 6 and 8 characters:
   * for strings < 6 chars it's faster to over-allocate,
   * for strings of 6-8 chars it's the same as the exact computation,
   * for strings > 8 chars it can be slower, but insignificantly; in theory it may 
produce some GC pressure.
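   A rough sketch of that buffer-sizing strategy (simplified and with made-up names — the real PR code also reuses buffers and interleaves the length prefix; it assumes the StringValue-style var-length encoding of at most 3 bytes per char):
   ```java
final class WriteBufferSizing {

    // Made-up threshold; the measured tipping point lies somewhere between 6 and 8 chars.
    private static final int SHORT_STRING_MAX_LENGTH = 6;

    static byte[] allocateBuffer(CharSequence cs) {
        int len = cs.length();
        if (len <= SHORT_STRING_MAX_LENGTH) {
            // Short string: skip the exact-size scan and over-allocate for the
            // worst case (at most 3 bytes per char, plus a few bytes for the
            // var-length length prefix).
            return new byte[3 * len + 5];
        }
        // Long string: a second pass to compute the exact size pays off,
        // because the encoding loop dominates the total cost anyway.
        int size = 5; // worst-case length prefix
        for (int i = 0; i < len; i++) {
            char c = cs.charAt(i);
            size += c < 0x80 ? 1 : (c < 0x4000 ? 2 : 3);
        }
        return new byte[size];
    }
}
   ```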
   
   The current round of benchmarks:
   ```
   [info] Benchmark                                         (length)  (stringType)  Mode  Cnt    Score   Error  Units
   [info] StringDeserializerBenchmark.deserializeDefault           1         ascii  avgt   50   45.618 ± 0.339  ns/op
   [info] StringDeserializerBenchmark.deserializeDefault           2         ascii  avgt   50   61.348 ± 0.579  ns/op
   [info] StringDeserializerBenchmark.deserializeDefault           4         ascii  avgt   50   88.067 ± 1.058  ns/op
   [info] StringDeserializerBenchmark.deserializeDefault           8         ascii  avgt   50  142.902 ± 1.121  ns/op
   [info] StringDeserializerBenchmark.deserializeDefault          16         ascii  avgt   50  249.181 ± 1.920  ns/op
   [info] StringDeserializerBenchmark.deserializeDefault          32         ascii  avgt   50  466.382 ± 1.502  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved          1         ascii  avgt   50   49.916 ± 0.132  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved          2         ascii  avgt   50   50.278 ± 0.064  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved          4         ascii  avgt   50   50.365 ± 0.129  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved          8         ascii  avgt   50   52.463 ± 0.301  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved         16         ascii  avgt   50   55.711 ± 0.597  ns/op
   [info] StringDeserializerBenchmark.deserializeImproved         32         ascii  avgt   50   65.342 ± 0.555  ns/op
   [info] StringSerializerBenchmark.serializeDefault               1         ascii  avgt   50   31.076 ± 0.192  ns/op
   [info] StringSerializerBenchmark.serializeDefault               2         ascii  avgt   50   31.770 ± 1.811  ns/op
   [info] StringSerializerBenchmark.serializeDefault               4         ascii  avgt   50   39.251 ± 0.189  ns/op
   [info] StringSerializerBenchmark.serializeDefault               8         ascii  avgt   50   57.736 ± 0.253  ns/op
   [info] StringSerializerBenchmark.serializeDefault              16         ascii  avgt   50   94.964 ± 0.514  ns/op
   [info] StringSerializerBenchmark.serializeDefault              32         ascii  avgt   50  168.754 ± 1.416  ns/op
   [info] StringSerializerBenchmark.serializeImproved              1         ascii  avgt   50   30.145 ± 0.156  ns/op
   [info] StringSerializerBenchmark.serializeImproved              2         ascii  avgt   50   30.873 ± 0.274  ns/op
   [info] StringSerializerBenchmark.serializeImproved              4         ascii  avgt   50   31.993 ± 0.276  ns/op
   [info] StringSerializerBenchmark.serializeImproved              8         ascii  avgt   50   46.220 ± 0.211  ns/op
   [info] StringSerializerBenchmark.serializeImproved             16         ascii  avgt   50   50.856 ± 0.826  ns/op
   [info] StringSerializerBenchmark.serializeImproved             32         ascii  avgt   50   63.221 ± 1.130  ns/op
   ```
   So for large strings the new implementation is much faster, and for short 
strings it does not regress (it is even slightly faster).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10417: [FLINK-14910][datastream] Checking for auto generated uids only for PhysicalStreamTransformations

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10417: [FLINK-14910][datastream] Checking 
for auto generated uids only for PhysicalStreamTransformations
URL: https://github.com/apache/flink/pull/10417#issuecomment-561567788
 
 
   
   ## CI report:
   
   * 64b24f882e32caebd6489f890ca477305acf31cc : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139296355)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10418: [FLINK-15050][table-planner-blink] 
DataFormatConverters should suppor…
URL: https://github.com/apache/flink/pull/10418#issuecomment-561567846
 
 
   
   ## CI report:
   
   * 6951e77dff9c9c4fb1945d79ef4253e5ea2c0f0f : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139296371)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13662) FlinkKinesisProducerTest.testBackpressure failed on Travis

2019-12-04 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987742#comment-16987742
 ] 

Till Rohrmann commented on FLINK-13662:
---

Do you wanna give it a try and see whether it fixes the problem [~fmthoma]?

> FlinkKinesisProducerTest.testBackpressure failed on Travis
> --
>
> Key: FLINK-13662
> URL: https://issues.apache.org/jira/browse/FLINK-13662
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{FlinkKinesisProducerTest.testBackpressure}} failed on Travis with
> {code}
> 14:45:50.489 [ERROR] Failures: 
> 14:45:50.489 [ERROR]   FlinkKinesisProducerTest.testBackpressure:298 Flush 
> triggered before reaching queue limit
> {code}
> https://api.travis-ci.org/v3/job/569262823/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zentol commented on a change in pull request #10384: [FLINK-13373][docs] Remove Tutorials and Examples sections from Getting Started

2019-12-04 Thread GitBox
zentol commented on a change in pull request #10384: [FLINK-13373][docs] Remove 
Tutorials and Examples sections from Getting Started
URL: https://github.com/apache/flink/pull/10384#discussion_r353675794
 
 

 ##
 File path: docs/ops/deployment/local.md
 ##
 @@ -0,0 +1,178 @@
+---
+title: "Local Cluster"
+nav-title: 'Local Cluster'
+nav-parent_id: deployment
+nav-pos: 1
+---
+
+
+Get a local Flink cluster up and running in a few simple steps.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Setup: Download and Start Flink
+
+Flink runs on __Linux, Mac OS X, and Windows__.
+To be able to run Flink, the only requirement is to have a working __Java 
8.x__ installation.
+
+You can check the correct installation of Java by issuing the following 
command:
+
+{% highlight bash %}
+java -version
+{% endhighlight %}
+
+If you have Java 8, the output will look something like this:
+
+{% highlight bash %}
+java version "1.8.0_111"
+Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
+Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
+{% endhighlight %}
+
+{% if site.is_stable %}
+
+
+1. Download a binary from the [downloads 
page](https://flink.apache.org/downloads.html). You can pick
+   any Scala variant you like. For certain features you may also have to 
download one of the pre-bundled Hadoop jars
+   and place them into the `/lib` directory.
+2. Go to the download directory.
+3. Unpack the downloaded archive.
+
+{% highlight bash %}
+$ cd ~/Downloads# Go to download directory
+$ tar xzf flink-*.tgz   # Unpack the downloaded archive
+$ cd flink-{{site.version}}
+{% endhighlight %}
+
+
+
+For MacOS X users, Flink can be installed through [Homebrew](https://brew.sh/).
+
+{% highlight bash %}
+$ brew install apache-flink
+...
+$ flink --version
+Version: 1.2.0, Commit ID: 1c659cf
+{% endhighlight %}
+
+
+
+
+{% else %}
+### Download and Compile
+Clone the source code from one of our 
[repositories](https://flink.apache.org/community.html#source-code), e.g.:
+
+{% highlight bash %}
+$ git clone https://github.com/apache/flink.git
+$ cd flink
+$ mvn clean package -DskipTests # this will take up to 10 minutes
 
 Review comment:
   Yeah, it's more like 25 minutes I think. Depending on the environment, the build of 
the WebUI may also slow things down significantly, or fail outright. It may be 
useful to document `-Pskip-webui-build` for these cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-15047) YarnDistributedCacheITCase is unstable

2019-12-04 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-15047:
-

Assignee: Andrey Zagrebin

> YarnDistributedCacheITCase is unstable
> --
>
> Key: FLINK-15047
> URL: https://issues.apache.org/jira/browse/FLINK-15047
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Zili Chen
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> See also https://api.travis-ci.com/v3/job/262854881/log.txt
> cc [~ZhenqiuHuang]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10406: [FLINK-15045][runtime] Only log 
RestartStrategy in legacy scheduling mode
URL: https://github.com/apache/flink/pull/10406#issuecomment-561380440
 
 
   
   ## CI report:
   
   * 0effd7f6b294b7f2da91c7681aa6e8ae51390efc : UNKNOWN
   * 4fbefcc29212ef9c62258aa73ebd7b9f2a97 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139227336)
   * f1a71d26b6d4f370a58f48e246f59077ec83221c : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139290722)
   * 3be557618cc3c803ce23659fd38ae6644b9a3b7d : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139302249)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] jinglining commented on issue #10359: [FLINK-14813][metrics] Provide `isBackPressured` Task metric

2019-12-04 Thread GitBox
jinglining commented on issue #10359: [FLINK-14813][metrics] Provide 
`isBackPressured` Task metric
URL: https://github.com/apache/flink/pull/10359#issuecomment-561604330
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #10325: [FLINK-14512][table] Introduce listPartitionsByFilter to Catalog

2019-12-04 Thread GitBox
JingsongLi commented on issue #10325: [FLINK-14512][table] Introduce 
listPartitionsByFilter to Catalog
URL: https://github.com/apache/flink/pull/10325#issuecomment-561618185
 
 
   > Thanks @JingsongLi for the update. It seems the failed test is related?
   
   Yes, we should modify python too.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-15052) sql client doesn't clear previous job graph

2019-12-04 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-15052:
---
Fix Version/s: 1.10.0

> sql client doesn't clear previous job graph 
> 
>
> Key: FLINK-15052
> URL: https://issues.apache.org/jira/browse/FLINK-15052
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Kurt Young
>Assignee: Danny Chen
>Priority: Blocker
> Fix For: 1.10.0
>
>
> when executing multiple commands from the sql client, the later job graph will 
> include all job graphs that were already executed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to execution environments

2019-12-04 Thread GitBox
TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to 
execution environments
URL: https://github.com/apache/flink/pull/10408#issuecomment-561525250
 
 
   > Thanks @TisonKun I am afraid I could not test it as in Zeppelin I use 
`ScalaShellRemoteEnvironment` which is not affected by this PR.
   
   Got it.
   
   @kl0u what is the progress of using the new `Execution` in 
`ScalaShellRemoteEnvironment`? I don't see a dedicated JIRA ticket so far. To 
satisfy Zeppelin's requirement we might need to write in-place twisting code if 
we cannot get `ScalaShellRemoteEnvironment` refactored in time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.

2019-12-04 Thread GitBox
zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] 
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r353595553
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressor;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Compressor for {@link Buffer}.
+ */
+public class BufferCompressor {
+
+   /** The backing block compressor for data compression. */
+   private final BlockCompressor blockCompressor;
+
+   /** The intermediate heap buffer for the compressed data. */
+   private final byte[] heapBuffer;
+
+   public BufferCompressor(int bufferSize, String factoryName) {
+   checkArgument(bufferSize > 0);
+   // the size of this intermediate heap buffer will be gotten 
from the
+   // plugin configuration in the future, and currently, double 
size of
+   // the input buffer is enough for lz4-java compression library.
+   this.heapBuffer = new byte[2 * bufferSize];
+   this.blockCompressor = 
BlockCompressionFactory.createBlockCompressionFactory(factoryName).getCompressor();
+   }
+
+   /**
+* Compresses the given {@link Buffer} using {@link BlockCompressor}. 
The compressed data will be stored in the
+* internal heap buffer of this {@link BufferCompressor} and returned 
to the caller. The caller must guarantee
+* that the returned {@link Buffer} is freed when calling the method 
next time.
+*
+* Note that the compression will always start from offset 0 to the 
size of the input {@link Buffer}.
+*/
+   public Buffer compressToInternalBuffer(Buffer buffer) {
+   int compressedLen;
+   if ((compressedLen = compress(buffer)) == 0) {
+   return buffer;
+   }
+
+   try {
+   if (compressedLen >= buffer.getSize()) {
+   return buffer;
+   }
+
+   // wrap the internal heap buffer as Buffer
+   MemorySegment memorySegment = 
MemorySegmentFactory.wrap(heapBuffer);
+   NetworkBuffer compressedBuffer = new 
NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
+   compressedBuffer.setSize(compressedLen);
+   compressedBuffer.setCompressed(true);
+
+   return compressedBuffer;
+   } catch (Throwable throwable) {
+   return buffer;
+   }
+   }
+
+   /**
+* The difference between this method and {@link 
#compressToInternalBuffer(Buffer)} is that this method will
+* copy the compressed data back to the input {@link Buffer} starting 
from offset 0.
+*
+* The caller must guarantee that the input {@link Buffer} is 
writable and there's enough space left.
+*/
+   public Buffer compressInPlace(Buffer buffer) {
+   int compressedLen;
+   if ((compressedLen = compress(buffer)) == 0) {
+   return buffer;
+   }
+
+   try {
+   if (compressedLen >= buffer.getSize()) {
+   return buffer;
+   }
+
+   // copy the compressed data back
+   MemorySegment segment = buffer.getMemorySegment();
+   segment.put(buffer.getMemorySegmentOffset(), 
heapBuffer, 0, compressedLen);
+   Buffer compressedBuffer = buffer.readOnlySlice(0, 
compressedLen);
+   

[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353019073
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -79,6 +80,101 @@ def __repr__(self):
 return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
 
 
+class ArrayCoderImpl(StreamCoderImpl):
+
+def __init__(self, elem_coder):
+self._elem_coder = elem_coder
+
+def encode_to_stream(self, value, out_stream, nested):
+out_stream.write_bigendian_int32(len(value))
+for elem in value:
+if elem is None:
+out_stream.write_byte(False)
+else:
+out_stream.write_byte(True)
+self._elem_coder.encode_to_stream(elem, out_stream, nested)
+
+def decode_from_stream(self, in_stream, nested):
+size = in_stream.read_bigendian_int32()
+elements = [self._elem_coder.decode_from_stream(in_stream, nested)
+if not not in_stream.read_byte() else None for _ in 
range(size)]
+return elements
+
+def __repr__(self):
+return 'ArrayCoderImpl[%s]' % str(self._elem_coder)
+
+
+class MapCoderImpl(StreamCoderImpl):
+
+def __init__(self, key_coder, value_coder):
+self._key_coder = key_coder
+self._value_coder = value_coder
+
+def encode_to_stream(self, map_value, out_stream, nested):
+out_stream.write_bigendian_int32(len(map_value))
+for key in map_value:
+self._key_coder.encode_to_stream(key, out_stream, nested)
+value = map_value[key]
+if value is None:
+out_stream.write_byte(True)
+else:
+out_stream.write_byte(False)
+self._value_coder.encode_to_stream(map_value[key], out_stream, 
nested)
+
+def decode_from_stream(self, in_stream, nested):
+size = in_stream.read_bigendian_int32()
+map_value = {}
+for _ in range(size):
+key = self._key_coder.decode_from_stream(in_stream, nested)
+is_null = not not in_stream.read_byte()
+if is_null:
 
 Review comment:
   use in_stream.read_byte() directly or bool(in_stream.read_byte())?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353016856
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -79,6 +80,101 @@ def __repr__(self):
 return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
 
 
+class ArrayCoderImpl(StreamCoderImpl):
+
+def __init__(self, elem_coder):
+self._elem_coder = elem_coder
+
+def encode_to_stream(self, value, out_stream, nested):
+out_stream.write_bigendian_int32(len(value))
+for elem in value:
+if elem is None:
+out_stream.write_byte(False)
+else:
+out_stream.write_byte(True)
+self._elem_coder.encode_to_stream(elem, out_stream, nested)
+
+def decode_from_stream(self, in_stream, nested):
+size = in_stream.read_bigendian_int32()
+elements = [self._elem_coder.decode_from_stream(in_stream, nested)
+if not not in_stream.read_byte() else None for _ in 
range(size)]
 
 Review comment:
   remove "not not"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353047775
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -140,6 +236,23 @@ def decode_from_stream(self, in_stream, nested):
 return in_stream.read_bigendian_double()
 
 
+class DecimalCoderImpl(StreamCoderImpl):
+
+def __init__(self, precision, scale):
+decimal.getcontext().prec = precision
 
 Review comment:
   Maybe we should hold an individual context object here, replace the current 
context at the beginning of encode/decode, and restore the user's context at the end 
of encode/decode?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353592042
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/BinaryMapSerializer.java
 ##
 @@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * A {@link TypeSerializer} for {@link BinaryMap}. It should be noted that the 
header will not be encoded.
+ * Currently Python doesn't support BinaryMap natively, so we can't use 
BaseArraySerializer in blink directly.
+ */
+@Internal
+public class BinaryMapSerializer extends BaseMapSerializer {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353590279
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/BinaryArraySerializer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.dataformat.BaseArray;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * A {@link TypeSerializer} for {@link BinaryArray}. It should be noted that 
the header will not be encoded.
+ * Currently Python doesn't support BinaryArray natively, so we can't use 
BaseArraySerializer in blink directly.
+ */
+@Internal
+public class BinaryArraySerializer extends BaseArraySerializer {
 
 Review comment:
   The name "BinaryArraySerializer" is not accurate, maybe 
"PythonBaseArraySerializer" is better?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353093687
 
 

 ##
 File path: flink-python/pyflink/fn_execution/tests/coders_test_common.py
 ##
 @@ -73,13 +74,34 @@ def test_binary_coder(self):
 
 def test_char_coder(self):
 coder = CharCoder()
-self.check_coder(coder, 'flink')
+self.check_coder(coder, 'flink', '')
 
 def test_date_coder(self):
 import datetime
 coder = DateCoder()
 self.check_coder(coder, datetime.date(2019, 9, 10))
 
+def test_array_coder(self):
+element_coder = BigIntCoder()
+coder = ArrayCoder(element_coder)
+self.check_coder(coder, [1, 2, 3, None])
+
+def test_map_coder(self):
+key_coder = CharCoder()
+value_coder = BigIntCoder()
+coder = MapCoder(key_coder, value_coder)
+self.check_coder(coder, {'flink': 1, 'pyflink': 2, 'coder': None})
+
+def test_multiset_coder(self):
+element_coder = CharCoder()
+coder = MultisetCoder(element_coder)
+self.check_coder(coder, ['flink', 'flink', 'pyflink'])
+
+def test_decimal_coder(self):
+from decimal import Decimal
+coder = DecimalCoder()
 
 Review comment:
   How about testing with different precisions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353088625
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coders.py
 ##
 @@ -68,6 +70,98 @@ def __hash__(self):
 return hash(self._field_coders)
 
 
+class CollectionCoder(FastCoder):
+"""
+Base coder for collection.
+"""
+def __init__(self, elem_coder):
+self._elem_coder = elem_coder
+
+def _create_impl(self):
+return self._impl_coder()(self._elem_coder.get_impl())
+
+def _impl_coder(self):
+raise NotImplementedError
+
+def is_deterministic(self):
+return self._elem_coder.is_deterministic()
+
+def to_type_hint(self):
+return []
+
+def __eq__(self, other):
+return (self.__class__ == other.__class__
+and self._elem_coder == other._elem_coder)
+
+def __repr__(self):
+return '%s[%s]' % (self.__class__.__name__, str(self._elem_coder))
+
+def __ne__(self, other):
+return not self == other
+
+def __hash__(self):
+return hash(self._elem_coder)
+
+
+class ArrayCoder(CollectionCoder):
+"""
+Coder for Array.
+"""
+
+def __init__(self, elem_coder):
+self._elem_coder = elem_coder
+super(ArrayCoder, self).__init__(elem_coder)
+
+def _impl_coder(self):
 
 Review comment:
   How about overriding _create_impl directly?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353585748
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/BinaryArraySerializer.java
 ##
 @@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.dataformat.BaseArray;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * A {@link TypeSerializer} for {@link BinaryArray}. It should be noted that 
the header will not be encoded.
+ * Currently Python doesn't support BinaryArray natively, so we can't use 
BaseArraySerializer in blink directly.
+ */
+@Internal
+public class BinaryArraySerializer extends BaseArraySerializer {
 
 Review comment:
   If we extend BaseArraySerializer, it seems the type parameter "K" is 
unnecessary?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353024028
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -79,6 +80,101 @@ def __repr__(self):
 return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
 
 
+class ArrayCoderImpl(StreamCoderImpl):
+
+def __init__(self, elem_coder):
+self._elem_coder = elem_coder
+
+def encode_to_stream(self, value, out_stream, nested):
+out_stream.write_bigendian_int32(len(value))
+for elem in value:
+if elem is None:
+out_stream.write_byte(False)
+else:
+out_stream.write_byte(True)
+self._elem_coder.encode_to_stream(elem, out_stream, nested)
+
+def decode_from_stream(self, in_stream, nested):
+size = in_stream.read_bigendian_int32()
+elements = [self._elem_coder.decode_from_stream(in_stream, nested)
+if not not in_stream.read_byte() else None for _ in 
range(size)]
+return elements
+
+def __repr__(self):
+return 'ArrayCoderImpl[%s]' % str(self._elem_coder)
+
+
+class MapCoderImpl(StreamCoderImpl):
+
+def __init__(self, key_coder, value_coder):
+self._key_coder = key_coder
+self._value_coder = value_coder
+
+def encode_to_stream(self, map_value, out_stream, nested):
+out_stream.write_bigendian_int32(len(map_value))
+for key in map_value:
+self._key_coder.encode_to_stream(key, out_stream, nested)
+value = map_value[key]
+if value is None:
+out_stream.write_byte(True)
+else:
+out_stream.write_byte(False)
+self._value_coder.encode_to_stream(map_value[key], out_stream, 
nested)
+
+def decode_from_stream(self, in_stream, nested):
+size = in_stream.read_bigendian_int32()
+map_value = {}
+for _ in range(size):
+key = self._key_coder.decode_from_stream(in_stream, nested)
+is_null = not not in_stream.read_byte()
+if is_null:
+map_value[key] = None
+else:
+value = self._value_coder.decode_from_stream(in_stream, nested)
+map_value[key] = value
+return map_value
+
+def __repr__(self):
+return 'MapCoderImpl[%s]' % ' : '.join([str(self._key_coder), 
str(self._value_coder)])
 
 Review comment:
   use repr() instead of str()?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions

2019-12-04 Thread GitBox
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] 
Support complex data types in Python user-defined functions
URL: https://github.com/apache/flink/pull/10086#discussion_r353025966
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##
 @@ -79,6 +80,101 @@ def __repr__(self):
 return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
 
 
+class ArrayCoderImpl(StreamCoderImpl):
+
+def __init__(self, elem_coder):
+self._elem_coder = elem_coder
+
+def encode_to_stream(self, value, out_stream, nested):
+out_stream.write_bigendian_int32(len(value))
+for elem in value:
+if elem is None:
+out_stream.write_byte(False)
+else:
+out_stream.write_byte(True)
+self._elem_coder.encode_to_stream(elem, out_stream, nested)
+
+def decode_from_stream(self, in_stream, nested):
+size = in_stream.read_bigendian_int32()
+elements = [self._elem_coder.decode_from_stream(in_stream, nested)
+if not not in_stream.read_byte() else None for _ in 
range(size)]
+return elements
+
+def __repr__(self):
+return 'ArrayCoderImpl[%s]' % str(self._elem_coder)
+
+
+class MapCoderImpl(StreamCoderImpl):
+
+def __init__(self, key_coder, value_coder):
+self._key_coder = key_coder
+self._value_coder = value_coder
+
+def encode_to_stream(self, map_value, out_stream, nested):
+out_stream.write_bigendian_int32(len(map_value))
+for key in map_value:
+self._key_coder.encode_to_stream(key, out_stream, nested)
+value = map_value[key]
+if value is None:
+out_stream.write_byte(True)
+else:
+out_stream.write_byte(False)
+self._value_coder.encode_to_stream(map_value[key], out_stream, 
nested)
+
+def decode_from_stream(self, in_stream, nested):
+size = in_stream.read_bigendian_int32()
+map_value = {}
+for _ in range(size):
+key = self._key_coder.decode_from_stream(in_stream, nested)
+is_null = not not in_stream.read_byte()
+if is_null:
+map_value[key] = None
+else:
+value = self._value_coder.decode_from_stream(in_stream, nested)
+map_value[key] = value
+return map_value
+
+def __repr__(self):
+return 'MapCoderImpl[%s]' % ' : '.join([str(self._key_coder), 
str(self._value_coder)])
+
+
+class MultisetCoderImpl(StreamCoderImpl):
+
+def __init__(self, element_coder):
+self._element_coder = element_coder
+
+def encode_to_stream(self, value, out_stream, nested):
+dict_value = self.multiset_to_dict(value)
+out_stream.write_bigendian_int32(len(dict_value))
 
 Review comment:
   This part is duplicated with MapCoderImpl, can we reuse it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14567) Aggregate query with more than two group fields can't be write into HBase sink

2019-12-04 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987644#comment-16987644
 ] 

Jark Wu commented on FLINK-14567:
-

Hi [~twalthr] [~ykt836], do you mean that if the key information of the sink does not 
match the query keys, then we can add a keyBy shuffle between them?  
Otherwise, they can be chained. 

We already added the primary key constraint to {{TableSchema}}, so we only need to 
add a keyBy shuffle in the framework if they do not match. 
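
A simplified DataStream-level illustration of the idea (not the planner code path; the tuple type, the concat'ed key, and the print sink are placeholders for the real rowkey and an actual upsert HBase sink):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class KeyByBeforeSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (sinkKey, aggregatedValue) updates for an upsert-style sink
        DataStream<Tuple2<String, Long>> updates = env.fromElements(
                Tuple2.of("a-b", 1L),
                Tuple2.of("a-b", 2L),   // later update for the same sink key
                Tuple2.of("ab-", 5L));

        updates
                // shuffle by the sink's (row)key so all updates for one key
                // land on the same sink subtask, in order
                .keyBy(t -> t.f0)
                // stand-in for an upsert sink such as the HBase sink
                .addSink(new PrintSinkFunction<>());

        env.execute("keyBy-before-sink sketch");
    }
}
{code}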

> Aggregate query with more than two group fields can't be write into HBase sink
> --
>
> Key: FLINK-14567
> URL: https://issues.apache.org/jira/browse/FLINK-14567
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase, Table SQL / Legacy Planner, Table 
> SQL / Planner
>Reporter: Jark Wu
>Priority: Critical
>
> If we have an HBase table sink with a varchar rowkey (also the primary key) and a 
> bigint column, we want to write the result of the following query into the 
> sink using upsert mode. However, it will fail during the primary key check with the 
> exception "UpsertStreamTableSink requires that Table has a full primary keys 
> if it is updated."
> {code:sql}
> select concat(f0, '-', f1) as key, sum(f2)
> from T1
> group by f0, f1
> {code}
> This happens in both the blink planner and the old planner. That is because if the 
> query works in update mode, then there must be a primary key that can be 
> extracted and set via {{UpsertStreamTableSink#setKeyFields}}. 
> That's why we want to derive a primary key for concat in FLINK-14539. However, 
> we found that the primary key is not preserved after concatenation. For example, 
> if we have a primary key (f0, f1, f2) where all fields are of varchar type, say we have 
> two unique records ('a', 'b', 'c') and ('ab', '', 'c'), but the results of 
> concat(f0, f1, f2) are the same, which means the concat result is not a primary 
> key anymore.
> So here comes the problem: how can we properly support the HBase sink or such use 
> cases? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10408: [FLINK-14992][client] Add job listener to execution environments

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10408: [FLINK-14992][client] Add job 
listener to execution environments
URL: https://github.com/apache/flink/pull/10408#issuecomment-561446399
 
 
   
   ## CI report:
   
   * 26dba18b2fd51c8df67cc33eace890b5ca34e182 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139256957)
   * 304ef1d6f41d2a2b29997312f3d3ced631fc2ec5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139259056)
   * e39ef8250f07fc23867ad60b24e3efb3af153cf1 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139278737)
   * 01470a38e8d7194e5309f2ca4f6f070a507ca4ca : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139281845)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10415: [FLINK-15049][table-planner-blink] Compile error when hash join with …

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10415: [FLINK-15049][table-planner-blink] 
Compile error when hash join with …
URL: https://github.com/apache/flink/pull/10415#issuecomment-561528911
 
 
   
   ## CI report:
   
   * e838831b098e3043b6576aec5ba38ee6bdc29422 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139281869)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10416: [FLINK-14484] Enable to control memory usage of RocksDB via Cache and WriteBufferManager

2019-12-04 Thread GitBox
flinkbot commented on issue #10416: [FLINK-14484] Enable to control memory 
usage of RocksDB via Cache and WriteBufferManager
URL: https://github.com/apache/flink/pull/10416#issuecomment-561539660
 
 
   
   ## CI report:
   
   * 32d4ffce9d515f19045ff5a13a9602f0b52d3196 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode

2019-12-04 Thread GitBox
zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] 
Only log RestartStrategy in legacy scheduling mode
URL: https://github.com/apache/flink/pull/10406#discussion_r353616472
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 ##
 @@ -77,6 +80,7 @@ public SchedulerNG createInstance(
jobMasterConfiguration,
jobGraph.isCheckpointingEnabled())
.create();
+   LOG.info("Using back off time strategy {} for {} ({}).", 
restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
 
 Review comment:
   > I introduced a log message here because it is closest to the object 
creation. Moreover, I prefer to [only have field 
assignments](https://www.yegor256.com/2015/05/07/ctors-must-be-code-free.html) 
in the constructor (note however, that this already does not hold in 
`DefaultScheduler`). Imo, another reasonable place to log the _scheduling 
configuration_ is in `startSchedulingInternal()`.
   
   Fine. In my understanding, logging is usually not considered real logic (though 
it does something); otherwise, in some cases we might find it hard to print an 
instance's internal state for troubleshooting. 
   Anyway, I think it's also fine to keep the logging here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14567) Aggregate query with more than two group fields can't be write into HBase sink

2019-12-04 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987675#comment-16987675
 ] 

Kurt Young commented on FLINK-14567:


Basically yes. But this would involve the discussion about how to deal with 
primary keys in sources and sinks, which would be follow-up work to FLIP-87.

> Aggregate query with more than two group fields can't be write into HBase sink
> --
>
> Key: FLINK-14567
> URL: https://issues.apache.org/jira/browse/FLINK-14567
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase, Table SQL / Legacy Planner, Table 
> SQL / Planner
>Reporter: Jark Wu
>Priority: Critical
>
> If we have an HBase table sink with a rowkey of varchar (which is also the primary 
> key) and a column of bigint, and we want to write the result of the following query 
> into the sink in upsert mode, the write fails during the primary key check with the 
> exception "UpsertStreamTableSink requires that Table has a full primary keys 
> if it is updated."
> {code:sql}
> select concat(f0, '-', f1) as key, sum(f2)
> from T1
> group by f0, f1
> {code}
> This happens in both the blink planner and the old planner. That is because if the 
> query works in update mode, then a primary key must exist so that it can be 
> extracted and set via {{UpsertStreamTableSink#setKeyFields}}. 
> That's why we wanted to derive a primary key for concat in FLINK-14539; however, 
> we found that the primary key is not preserved after concatenation. For example, 
> if we have a primary key (f0, f1, f2) whose fields are all of varchar type, the two 
> distinct records ('a', 'b', 'c') and ('ab', '', 'c') produce the same result for 
> concat(f0, f1, f2), which means the concat result is no longer a primary key.
> So here comes the problem: how can we properly support the HBase sink or such a use 
> case? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement 
ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191
 
 
   
   ## CI report:
   
   * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133366796)
   * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133940607)
   * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133998545)
   * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134010321)
   * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137145261)
   * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017410)
   * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138730857)
   * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138770610)
   * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139116897)
   * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139256990)
   * ce01f742aba3a683b98a0ea7da47e678d30c12be : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353634011
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -136,6 +146,35 @@
return builder.build();
}
 
+   /**
+* The original table schema may contain generated columns which 
shouldn't be produced/consumed
+* by TableSource/TableSink. And the original TIMESTAMP/DATE/TIME types 
uses LocalDateTime/LocalDate/LocalTime
+* as the conversion classes, however, JDBC connector uses 
Timestamp/Date/Time classes. So that
+* we bridge them to the expected conversion classes.
+*/
+   private TableSchema getPhysicalTableSchema(DescriptorProperties 
descriptorProperties) {
 
 Review comment:
   Should there be a util method in `DescriptorProperties`, or somewhere else, 
that we can re-use?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353634713
 
 

 ##
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
 ##
 @@ -119,7 +126,17 @@ private HBaseTableSchema validateTableSchema(TableSchema 
schema) {
String[] qualifierNames = 
familyType.getFieldNames();
TypeInformation[] qualifierTypes = 
familyType.getFieldTypes();
for (int j = 0; j < familyType.getArity(); j++) 
{
-   hbaseSchema.addColumn(name, 
qualifierNames[j], qualifierTypes[j].getTypeClass());
+   // HBase connector doesn't support 
LocalDateTime
+   // use Timestamp as conversion class 
for now.
+   Class clazz = 
qualifierTypes[j].getTypeClass();
+   if (LocalDateTime.class.equals(clazz)) {
+   clazz = Timestamp.class;
 
 Review comment:
   Does this not work for nested fields?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353634312
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -136,6 +146,35 @@
return builder.build();
}
 
+   /**
+* The original table schema may contain generated columns which 
shouldn't be produced/consumed
+* by TableSource/TableSink. And the original TIMESTAMP/DATE/TIME types 
uses LocalDateTime/LocalDate/LocalTime
+* as the conversion classes, however, JDBC connector uses 
Timestamp/Date/Time classes. So that
+* we bridge them to the expected conversion classes.
+*/
+   private TableSchema getPhysicalTableSchema(DescriptorProperties 
descriptorProperties) {
+   TableSchema schema = 
descriptorProperties.getTableSchema(SCHEMA);
+   TableSchema.Builder physicalSchemaBuilder = 
TableSchema.builder();
+   schema.getTableColumns()
+   .forEach(c -> {
+   if (!c.isGenerated()) {
+   LogicalTypeRoot root = 
c.getType().getLogicalType().getTypeRoot();
+   final DataType type;
+   if (root == 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+   type = 
c.getType().bridgedTo(Timestamp.class);
 
 Review comment:
   But this doesn't work for nested fields, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353628831
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
 ##
 @@ -181,6 +182,16 @@ private static void validateLogicalTypeEqualsPhysical(
TableSource tableSource) {
ResolvedField resolvedField = resolveField(fieldName, 
tableSource);
if (!resolvedField.getType().equals(logicalType)) {
+
+   if (resolvedField.getType().getLogicalType() instanceof 
LegacyTypeInformationType &&
+   logicalType.getLogicalType().getTypeRoot() == 
LogicalTypeRoot.DECIMAL &&
+   logicalType.getLogicalType().getTypeRoot() == 
resolvedField.getType().getLogicalType().getTypeRoot() &&
+   logicalType.getConversionClass() == 
resolvedField.getType().getConversionClass()) {
 
 Review comment:
   Is checking logical type equality enough here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353632499
 
 

 ##
 File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 ##
 @@ -340,56 +340,68 @@ private DeserializationRuntimeConverter 
createFallbackConverter(Class valueTy
return Optional.of(createTimeConverter());
} else if (simpleTypeInfo == Types.SQL_TIMESTAMP) {
return Optional.of(createTimestampConverter());
+   } else if (simpleTypeInfo == Types.LOCAL_DATE) {
+   return Optional.of(this::convertToLocalDate);
+   } else if (simpleTypeInfo == Types.LOCAL_TIME) {
+   return Optional.of(this::convertToLocalTime);
+   } else if (simpleTypeInfo == Types.LOCAL_DATE_TIME) {
+   return Optional.of(this::convertToLocalDateTime);
} else {
return Optional.empty();
}
}
 
+   private LocalDate convertToLocalDate(ObjectMapper mapper, JsonNode 
jsonNode) {
+   return 
ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+   }
+
private DeserializationRuntimeConverter createDateConverter() {
 
 Review comment:
   Can you modify `createDateConverter` to the `convertToDate` style too?
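   Something along these lines, as a sketch only (simplified to take the raw JSON text; the real method would mirror the `convertToLocalDate(ObjectMapper, JsonNode)` signature shown above):

```java
import java.sql.Date;
import java.time.LocalDate;
import java.time.temporal.TemporalQueries;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;

class DateConverterSketch {
	// parse the ISO date string and bridge it to java.sql.Date,
	// following the convertToLocalDate style from the diff above
	static Date convertToDate(String jsonText) {
		LocalDate localDate = ISO_LOCAL_DATE.parse(jsonText).query(TemporalQueries.localDate());
		return Date.valueOf(localDate);
	}
}
```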


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353638111
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
 ##
 @@ -216,6 +216,18 @@ else if (canConvertToTimestampTypeInfoLenient(dataType)) {
return Types.SQL_TIMESTAMP;
}
 
+   // relax the precision constraint as LocalDateTime can store 
the highest precision
+   else if (hasRoot(logicalType, 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
 
 Review comment:
   Maybe you can take a look above; there are some comments and limitations 
for `java.sql.Timestamp` and the other types.
   If you want to modify this, maybe you should modify all of them, or just 
respect their existing rules (like in `canConvertToTimestampTypeInfoLenient`).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353625420
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 ##
 @@ -582,19 +607,26 @@ public short getShort(String key) {
final TableSchema.Builder schemaBuilder = TableSchema.builder();
for (int i = 0; i < fieldCount; i++) {
final String nameKey = key + '.' + i + '.' + 
TABLE_SCHEMA_NAME;
-   final String typeKey = key + '.' + i + '.' + 
TABLE_SCHEMA_TYPE;
+   final String legacyTypeKey = key + '.' + i + '.' + 
TABLE_SCHEMA_TYPE;
+   final String typeKey = key + '.' + i + '.' + 
TABLE_SCHEMA_DATATYPE;
final String exprKey = key + '.' + i + '.' + 
TABLE_SCHEMA_EXPR;
 
final String name = 
optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey));
 
-   final TypeInformation type = optionalGet(typeKey)
-   .map(TypeStringUtils::readTypeInfo)
-   .orElseThrow(exceptionSupplier(typeKey));
+   final DataType type;
+   if (containsKey(typeKey)) {
 
 Review comment:
   NIT: use `? :`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-04 Thread GitBox
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r353625137
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 ##
 @@ -582,19 +607,26 @@ public short getShort(String key) {
final TableSchema.Builder schemaBuilder = TableSchema.builder();
for (int i = 0; i < fieldCount; i++) {
final String nameKey = key + '.' + i + '.' + 
TABLE_SCHEMA_NAME;
-   final String typeKey = key + '.' + i + '.' + 
TABLE_SCHEMA_TYPE;
+   final String legacyTypeKey = key + '.' + i + '.' + 
TABLE_SCHEMA_TYPE;
+   final String typeKey = key + '.' + i + '.' + 
TABLE_SCHEMA_DATATYPE;
final String exprKey = key + '.' + i + '.' + 
TABLE_SCHEMA_EXPR;
 
final String name = 
optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey));
 
-   final TypeInformation type = optionalGet(typeKey)
-   .map(TypeStringUtils::readTypeInfo)
-   .orElseThrow(exceptionSupplier(typeKey));
+   final DataType type;
+   if (containsKey(typeKey)) {
+   type = getDataType(typeKey);
+   } else if (containsKey(legacyTypeKey)) {
+   type = 
LegacyTypeInfoDataTypeConverter.toDataType(getType(legacyTypeKey));
 
 Review comment:
   Use `TypeConversions.***`?
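   Combining this with the `? :` nit above, roughly (sketch only, reusing the surrounding fields and methods from the diff and assuming one of the two keys is always present):

```java
// read the new data type key if present, otherwise fall back to the legacy type key
final DataType type = containsKey(typeKey)
	? getDataType(typeKey)
	: TypeConversions.fromLegacyInfoToDataType(getType(legacyTypeKey));
```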


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement 
ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191
 
 
   
   ## CI report:
   
   * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133366796)
   * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133940607)
   * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133998545)
   * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134010321)
   * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137145261)
   * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017410)
   * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138730857)
   * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138770610)
   * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139116897)
   * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139256990)
   * ce01f742aba3a683b98a0ea7da47e678d30c12be : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139290769)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9965: [FLINK-10935][kubernetes]Implement KubeClient with Faric8 Kubernetes clients

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #9965: [FLINK-10935][kubernetes]Implement 
KubeClient with Faric8 Kubernetes clients
URL: https://github.com/apache/flink/pull/9965#issuecomment-544813931
 
 
   
   ## CI report:
   
   * 6f90b457e56a0a8cb45d63c1b05b47d2e38030a1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/132938440)
   * 86aa5ce8f77faf233c51a7231b3f71e518fd6c92 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/132962300)
   * 7851d845a43f799627b2c788ace8eb7e6caccb03 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133133283)
   * d49bb0a622e1667baffd29f19fdcc60d0022fe82 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133366914)
   * 0e88e0e5be77f450c82cbc460ea4f02a1effc920 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133998649)
   * 9afcc7ba840186c68f36b30d6b28b8c1cbf09b61 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137145224)
   * 06e6b2bee4c1788b150f2b83c43eb4723709864b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017584)
   * 816a11afe713e736cdfd2eb566762ee2addf7071 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138708643)
   * e80691082c9fc8ac704b0bccaa5180b7de0718d5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138770607)
   * e85e76c018b7606381c5869e0c1054e02c4a2321 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138957695)
   * d1c41ae28960b3eea21fc662e771fe44c1c53b12 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139101718)
   * 1e54eb3124deaca01afd2a3d28edbdd8086db301 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139290748)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for equality to NaN

2019-12-04 Thread GitBox
flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for 
equality to NaN
URL: https://github.com/apache/flink/pull/10333#issuecomment-559012637
 
 
   
   ## CI report:
   
   * c5bd9898fbbeb3d668051475409b52bfb1e2 : UNKNOWN
   * 44f202263cbb26473b06ea19f293bf0b6afce8e6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138375519)
   * e0ea169fab74ca07617a056e1c60861938ad6f0b : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139122432)
   * 38bef99d2f337b556c87141e0240ddd7075fd980 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139296161)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString

2019-12-04 Thread GitBox
StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster 
implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#issuecomment-561582129
 
 
   Some thoughts for follow-up:
   
 - Do you know where exactly the performance difference comes from? Is it 
mainly the many individual `read()` single-byte operations, which become more 
efficient once you bulk-read into a byte array?
   
 - We can actually break the serialization format, as long as we change the 
StringSerializer config snapshot and restore methods. We can return "needs 
conversion" as the compatibility result and return a serializer with the old 
encoding as the restore serializer.
 - Thinking about it twice, the above would only work if all parts support 
serializer evolution, and I think keys in RocksDB cannot be evolved right now 
(not yet implemented).
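   For reference, a minimal standalone sketch (not the PR's code) of the two access patterns the first question contrasts, assuming a purely 7-bit ASCII payload:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ReadPatternSketch {

	// old pattern: one readUnsignedByte() call per character
	static char[] readPerByte(DataInput in, int len) throws IOException {
		char[] data = new char[len];
		for (int i = 0; i < len; i++) {
			data[i] = (char) in.readUnsignedByte();
		}
		return data;
	}

	// new pattern: a single bulk readFully() into a local array, decoded afterwards
	static char[] readBulk(DataInput in, int len) throws IOException {
		byte[] buf = new byte[len];
		in.readFully(buf);
		char[] data = new char[len];
		for (int i = 0; i < len; i++) {
			data[i] = (char) (buf[i] & 0xFF);
		}
		return data;
	}

	public static void main(String[] args) throws IOException {
		byte[] ascii = "hello".getBytes(StandardCharsets.US_ASCII);
		System.out.println(new String(readPerByte(new DataInputStream(new ByteArrayInputStream(ascii)), ascii.length)));
		System.out.println(new String(readBulk(new DataInputStream(new ByteArrayInputStream(ascii)), ascii.length)));
	}
}
```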


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #10238: [FLINK-8949] Add dedicated watermarks metric retrieval endpoint

2019-12-04 Thread GitBox
zentol commented on a change in pull request #10238: [FLINK-8949] Add dedicated 
watermarks metric retrieval endpoint
URL: https://github.com/apache/flink/pull/10238#discussion_r353664553
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandler.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractJobVertexHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.JobVertexWatermarksHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+
+/**
+ * Handler that returns the watermarks given a {@link JobID} and {@link 
JobVertexID}.
+ */
+public class JobVertexWatermarksHandler extends 
AbstractJobVertexHandler {
+
+   private final MetricFetcher metricFetcher;
+
+   public JobVertexWatermarksHandler(
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   MetricFetcher metricFetcher,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor) {
+   super(leaderRetriever,
+   timeout,
+   responseHeaders,
+   JobVertexWatermarksHeaders.INSTANCE,
+   executionGraphCache,
+   executor);
+   this.metricFetcher = metricFetcher;
+   }
+
+   @Override
+   protected MetricCollectionResponseBody handleRequest(
+   HandlerRequest request,
+   AccessExecutionJobVertex jobVertex) throws 
RestHandlerException {
+
+   String jobID = 
request.getPathParameter(JobIDPathParameter.class).toString();
+   String taskID = jobVertex.getJobVertexId().toString();
+
+   metricFetcher.update();
+   MetricStore.TaskMetricStore taskMetricStore = 
metricFetcher.getMetricStore().getTaskMetricStore(jobID, taskID);
+   if (taskMetricStore == null) {
+   return new 
MetricCollectionResponseBody(Collections.emptyList());
+   }
+
+   AccessExecutionVertex[] taskVertices = 
jobVertex.getTaskVertices();
+   List metrics = new ArrayList<>(taskVertices.length);
+
+   for (AccessExecutionVertex taskVertex : taskVertices) {
+   String id = taskVertex.getParallelSubtaskIndex() + "." 
+ MetricNames.IO_CURRENT_INPUT_WATERMARK;
+   String watermarkValue = taskMetricStore.getMetric(id);
+   if (watermarkValue != null) {
+   metrics.add(new Metric(id, watermarkValue));
+   }
+   }
+
+   return 

[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353667305
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 ##
 @@ -397,7 +406,8 @@ private static void checkAndCreateDirectory(File 
directory) throws IOException {
nativeMetricOptions,
metricGroup,
restoreStateHandles,
-   ttlCompactFiltersManager);
+   ttlCompactFiltersManager,
+   writeBatchSize);
 
 Review comment:
   Can we use `this.writeBatchSize` directly?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353670935
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
 ##
 @@ -119,6 +127,8 @@ public RocksDBFullRestoreOperation(
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager);
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be 
no negative.");
 
 Review comment:
   Ditto.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353669969
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ##
 @@ -49,22 +53,35 @@
 
private final int capacity;
 
-   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB) {
-   this(rocksDB, null, 500);
+   @Nonnegative
+   private final long batchSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, long 
writeBatchSize) {
+   this(rocksDB, null, 500, writeBatchSize);
}
 
public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable 
WriteOptions options) {
-   this(rocksDB, options, 500);
+   this(rocksDB, options, 500, DEFAULT_BATCH_SIZE);
+   }
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable 
WriteOptions options, long batchSize) {
+   this(rocksDB, options, 500, batchSize);
}
 
-   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable 
WriteOptions options, int capacity) {
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable 
WriteOptions options, int capacity, long batchSize) {
Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
"capacity should be between " + MIN_CAPACITY + " and " 
+ MAX_CAPACITY);
+   Preconditions.checkArgument(batchSize >= 0, "Max batch size 
have to be no negative.");
 
 Review comment:
   We can use `@Nonnegative` for all function parameters instead of 
`checkArgument`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353662612
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 ##
 @@ -131,7 +135,8 @@ public RocksDBKeyedStateBackendBuilder(
MetricGroup metricGroup,
@Nonnull Collection stateHandles,
StreamCompressionDecorator keyGroupCompressionDecorator,
-   CloseableRegistry cancelStreamRegistry) {
+   CloseableRegistry cancelStreamRegistry,
+   @Nonnegative long writeBatchSize) {
 
 Review comment:
   Ditto.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353663839
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = kvStateInformation;
 
this.writeOptions = new WriteOptions().setDisableWAL(true);
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be 
no negative value.");
 
 Review comment:
   I think we don't need `checkArgument` when the passed-in argument is 
annotated as `@Nonnegative`, since this is already a contract.
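   A minimal sketch of that style, assuming the JSR-305 `@Nonnegative` annotation and a made-up holder class:

```java
import javax.annotation.Nonnegative;

// The annotation documents the non-negativity contract on the parameter, so the
// constructor does not repeat a runtime checkArgument for it; violations are
// flagged by static analysis rather than at runtime.
class WriteBatchConfigSketch {

	private final long writeBatchSize;

	WriteBatchConfigSketch(@Nonnegative long writeBatchSize) {
		this.writeBatchSize = writeBatchSize;
	}

	long getWriteBatchSize() {
		return writeBatchSize;
	}
}
```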


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353672881
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ##
 @@ -65,4 +69,52 @@ public void basicTest() throws Exception {
}
}
}
+
+   /**
+* Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory 
consumed exceeds the preconfigured value.
+*/
+   @Test
+   public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws 
Exception {
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200, 50)) {
+   // sequence (8 bytes) + count (4 bytes)
+   // more information please ref to write_batch.cc in 
RocksDB
+   assertEquals(12, writeBatchWrapper.getDataSize());
 
 Review comment:
   This assertion might fail if the RocksDB implementation changes, and it is 
unnecessary. We could simply record an `initialSize` from 
`writeBatchWrapper.getDataSize()` and check against it after the flush is 
triggered.
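   Roughly what I mean, as a sketch only (it reuses the `writeBatchWrapper`, `handle` and `dummy` variables from the test above and the usual JUnit asserts):

```java
// record the size of the empty batch instead of hard-coding 12
long initialSize = writeBatchWrapper.getDataSize();

writeBatchWrapper.put(handle, dummy, dummy);
assertTrue(writeBatchWrapper.getDataSize() > initialSize);

writeBatchWrapper.put(handle, dummy, dummy);
writeBatchWrapper.put(handle, dummy, dummy);
// once the configured limit is exceeded the wrapper flushes,
// so the batch should be back to its initial (empty) size
assertEquals(initialSize, writeBatchWrapper.getDataSize());
```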


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.

2019-12-04 Thread GitBox
wsry commented on a change in pull request #10375: [FLINK-14845][runtime] 
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r353673439
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
 ##
 @@ -217,4 +217,14 @@
 * @return self as ByteBuf implementation.
 */
ByteBuf asByteBuf();
+
+   /**
+* @return whether the buffer is compressed or not.
+*/
+   boolean isCompressed();
+
+   /**
+* Tags the buffer as compressed or uncompressed.
+*/
+   void setCompressed(boolean isCompressed);
 
 Review comment:
   We need the compression tag to identify whether a buffer is compressed on the 
downstream side, because the downstream may receive a mixture of compressed and 
uncompressed buffers.
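   For example, a rough sketch of how a receiver might branch on the flag (the `decompressor` field and its `decompressToOriginalBuffer` method are placeholder names, not necessarily the ones introduced in this PR):

```java
// Only buffers tagged as compressed go through the decompressor;
// uncompressed buffers are passed on unchanged.
Buffer maybeDecompress(Buffer buffer) {
	if (buffer.isCompressed()) {
		return decompressor.decompressToOriginalBuffer(buffer);
	}
	return buffer;
}
```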


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353673315
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ##
 @@ -65,4 +69,52 @@ public void basicTest() throws Exception {
}
}
}
+
+   /**
+* Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory 
consumed exceeds the preconfigured value.
+*/
+   @Test
+   public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws 
Exception {
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200, 50)) {
+   // sequence (8 bytes) + count (4 bytes)
+   // more information please ref to write_batch.cc in 
RocksDB
+   assertEquals(12, writeBatchWrapper.getDataSize());
+   byte[] dummy = new byte[6];
+   ThreadLocalRandom.current().nextBytes(dummy);
+   // will add 1 + 1 + 1 + 6 + 1 + 6 = 16 bytes for each KV
+   // format is 
[handleType|kvType|keyLen|key|valueLen|value]
+   // more information please ref write_batch.cc in RocksDB
+   writeBatchWrapper.put(handle, dummy, dummy);
+   assertEquals(28, writeBatchWrapper.getDataSize());
+   writeBatchWrapper.put(handle, dummy, dummy);
+   assertEquals(44, writeBatchWrapper.getDataSize());
+   writeBatchWrapper.put(handle, dummy, dummy);
+   // will flush all, then an empty write batch
+   assertEquals(12, writeBatchWrapper.getDataSize());
+   }
+   }
+
+   /**
+* Tests that {@link RocksDBWriteBatchWrapper} flushes after the kv 
count exceeds the preconfigured value.
+*/
+   @Test
+   public void testWriteBatchWrapperFlushAfterCountExceed() throws 
Exception {
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 100, 5)) {
+   byte[] dummy = new byte[2];
+   ThreadLocalRandom.current().nextBytes(dummy);
+   for (int i = 1; i < 100; ++i) {
+   writeBatchWrapper.put(handle, dummy, dummy);
+   // init 12 bytes, each kv consumes 8 bytes
+   assertEquals(12 + 8 * i, 
writeBatchWrapper.getDataSize());
 
 Review comment:
   Ditto. Record the initial size of the empty `writeBatchWrapper` instead of 
using the hard-coded value `12`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353671065
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 ##
 @@ -125,6 +129,8 @@ public RocksDBIncrementalRestoreOperation(
this.restoredSstFiles = new TreeMap<>();
this.lastCompletedCheckpointId = -1L;
this.backendUID = UUID.randomUUID();
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be 
no negative.");
 
 Review comment:
   Ditto.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353669265
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -166,6 +169,11 @@
/** Whether we already lazily initialized our local storage 
directories. */
private transient boolean isInitialized;
 
+   /**
+* Max consumed memory size for one batch in {@link 
RocksDBWriteBatchWrapper}, default value 2mb.
+*/
+   private long batchSize;
 
 Review comment:
   I suggest changing all "batchSize/BATCH_SIZE" occurrences to 
"writeBatchSize/WRITE_BATCH_SIZE" in this class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353667120
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 ##
 @@ -414,7 +424,8 @@ private static void checkAndCreateDirectory(File 
directory) throws IOException {
nativeMetricOptions,
metricGroup,
restoreStateHandles,
-   ttlCompactFiltersManager);
+   ttlCompactFiltersManager,
+   writeBatchSize);
 
 Review comment:
   Can we use `this.writeBatchSize` directly?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353670772
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ##
 @@ -113,4 +126,16 @@ public void close() throws RocksDBException {
}
IOUtils.closeQuietly(batch);
}
+
+   private void flushIfNeeded() throws RocksDBException {
+   boolean needFlush = batch.count() == capacity || (batchSize > 0 
&& batch.getDataSize() >= batchSize);
 
 Review comment:
   No need to check whether `batchSize` is larger than 0 since we already have 
the `@Nonnegative` annotation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator

2019-12-04 Thread GitBox
StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up 
TpcdsResultComparator
URL: https://github.com/apache/flink/pull/10405#issuecomment-561599666
 
 
   Hmmm, cannot reproduce it any more.
   There seem to have been some errors in the log files, but apparently not a 
deterministic error.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator

2019-12-04 Thread GitBox
StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up 
TpcdsResultComparator
URL: https://github.com/apache/flink/pull/10405#issuecomment-561599795
 
 
   Will merge this...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen edited a comment on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator

2019-12-04 Thread GitBox
StephanEwen edited a comment on issue #10405: [FLINK-15044][e2e tests] Clean up 
TpcdsResultComparator
URL: https://github.com/apache/flink/pull/10405#issuecomment-561599666
 
 
   Hmmm, cannot reproduce it any more.
   There seem to have been some errors in the log files, but apparently not a 
deterministic error.
   
   Must also be unrelated to this change.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

