[GitHub] [flink] JingsongLi commented on issue #10415: [FLINK-15049][table-planner-blink] Compile error when hash join with …
JingsongLi commented on issue #10415: [FLINK-15049][table-planner-blink] Compile error when hash join with …
URL: https://github.com/apache/flink/pull/10415#issuecomment-561533551

Thanks @docete for your fix, LGTM once Travis passes.
[GitHub] [flink] shuttie commented on a change in pull request #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
shuttie commented on a change in pull request #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#discussion_r353607833

File path: flink-core/src/main/java/org/apache/flink/types/StringValue.java

@@ -759,56 +761,142 @@ public static String readString(DataInput in) throws IOException {
 			}
 			len |= curr << shift;
 		}
-
+		// subtract one for the null length
 		len -= 1;
-
-		final char[] data = new char[len];
-		for (int i = 0; i < len; i++) {
-			int c = in.readUnsignedByte();
-			if (c < HIGH_BIT) {
-				data[i] = (char) c;
-			} else {
+		/* As we have no idea about the byte length of the serialized string, we cannot fully
+		 * read it into a memory buffer. But we can do it in an optimistic way:
+		 * 1. In the happy case, when the string is a US-ASCII one, byte_len == char_len.
+		 * 2. If we spot at least one character with code >= 127, then we reallocate the buffer
+		 *    to accommodate the following characters.
+		 */
+
+		// happily assume that the string is a 7-bit US-ASCII one
+		byte[] buf = new byte[len];
+		in.readFully(buf);
+
+		final char[] data = new char[len];
+		int charPosition = 0;
+		int bufSize = len;
+		int bytePosition = 0;
+
+		while (charPosition < len) {
+			// at least `char count - char position` bytes are left, in case all the
+			// remaining characters are 7-bit
+			int remainingBytesEstimation = len - charPosition;

Review comment: Nice catch. This variable is only used by the buffer refill operation, so it is reasonable not to compute it for every character.
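To make the idea concrete, here is a minimal standalone sketch of the optimistic decoding loop. This is my simplification, not the PR's code: the null-length handling is omitted, `OptimisticReader` is a hypothetical wrapper class, and the refill reads exactly the lower-bound estimate, which is safe because every not-yet-decoded character occupies at least one more byte in the stream.

```java
import java.io.DataInput;
import java.io.IOException;

/** Simplified sketch of the optimistic read path; not the exact PR code. */
final class OptimisticReader {

	private static final int HIGH_BIT = 0x80;

	private final DataInput in;
	private byte[] buf;
	private int bufSize;       // number of valid bytes in buf
	private int bytePosition;  // next byte to consume

	OptimisticReader(DataInput in) {
		this.in = in;
	}

	/** Decodes {@code len} characters written in the 7-bits-per-byte format. */
	String readString(int len) throws IOException {
		// optimistic assumption: pure US-ASCII, i.e. byte length == char length
		buf = new byte[len];
		in.readFully(buf);
		bufSize = len;
		bytePosition = 0;

		final char[] data = new char[len];
		for (int charPosition = 0; charPosition < len; charPosition++) {
			int c = nextByte(len - charPosition);
			if (c >= HIGH_BIT) {
				// multi-byte character: 7 payload bits per byte, high bit marks continuation
				int shift = 7;
				int curr;
				c = c & 0x7F;
				while ((curr = nextByte(len - charPosition)) >= HIGH_BIT) {
					c |= (curr & 0x7F) << shift;
					shift += 7;
				}
				c |= curr << shift;
			}
			data[charPosition] = (char) c;
		}
		return new String(data);
	}

	/**
	 * Returns the next unsigned byte, refilling the buffer when it runs dry. Reading
	 * {@code remainingChars} more bytes is always safe, because every character that
	 * is not fully decoded yet occupies at least one more byte in the stream.
	 */
	private int nextByte(int remainingChars) throws IOException {
		if (bytePosition == bufSize) {
			if (buf.length < remainingChars) {
				buf = new byte[remainingChars];
			}
			in.readFully(buf, 0, remainingChars);
			bufSize = remainingChars;
			bytePosition = 0;
		}
		return buf[bytePosition++] & 0xFF;
	}
}
```

The `remainingBytesEstimation` discussed in the review corresponds to the `remainingChars` lower bound used here at refill time.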
[GitHub] [flink] StephanEwen commented on issue #10363: [FLINK-14104][build] Upgrade to flink-shaded 9.0
StephanEwen commented on issue #10363: [FLINK-14104][build] Upgrade to flink-shaded 9.0
URL: https://github.com/apache/flink/pull/10363#issuecomment-561535237

Do we need to adjust any NOTICE files due to the changed dependency versions we get, or is this automatically merged from flink-shaded?
[jira] [Commented] (FLINK-14572) BlobsCleanupITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-14572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987660#comment-16987660 ]

Gary Yao commented on FLINK-14572:
----------------------------------

Unfortunately I don't have any either.

> BlobsCleanupITCase failed on Travis
> -----------------------------------
>
>     Key: FLINK-14572
>     URL: https://issues.apache.org/jira/browse/FLINK-14572
>     Project: Flink
>     Issue Type: Bug
>     Components: Runtime / Coordination, Tests
>     Affects Versions: 1.10.0
>     Reporter: Gary Yao
>     Assignee: Yun Gao
>     Priority: Critical
>     Labels: test-stability
>     Fix For: 1.10.0
>
> {noformat}
> java.lang.AssertionError:
> Expected: is
>      but: was
> 	at org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanup(BlobsCleanupITCase.java:220)
> 	at org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanupFinishedJob(BlobsCleanupITCase.java:133)
> {noformat}
> https://api.travis-ci.com/v3/job/250445874/log.txt
[GitHub] [flink] AHeise commented on a change in pull request #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
AHeise commented on a change in pull request #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#discussion_r353626201

File path: flink-core/src/test/java/org/apache/flink/types/StringSerializationTest.java

@@ -56,6 +56,27 @@ public void testNonNullValues() {
 		} catch (Exception e) {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
+
+	@Test
+	public void testUnicodeValues() {
+		try {

Review comment: Ah, then leave it and we will eventually clean up on our end.
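For reference, a unicode round-trip test in this style could look like the sketch below. This is my sketch, not the PR's test; it assumes Flink's `DataOutputSerializer`/`DataInputDeserializer` as the serialization fixtures, and the `StringRoundTripSketchTest` class name is made up.

```java
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.types.StringValue;

import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;

public class StringRoundTripSketchTest {

	@Test
	public void testUnicodeRoundTrip() throws IOException {
		// mix of 1-byte (ASCII) and multi-byte characters
		String[] samples = {"", "ascii only", "größer", "日本語テキスト", "mixed \uD83D\uDE00 emoji"};

		for (String original : samples) {
			DataOutputSerializer out = new DataOutputSerializer(64);
			StringValue.writeString(original, out);

			DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
			assertEquals(original, StringValue.readString(in));
		}
	}
}
```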
[jira] [Updated] (FLINK-15016) Remove unused dependency
[ https://issues.apache.org/jira/browse/FLINK-15016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-15016:
-------------------------------------
    Fix Version/s: 1.10.0

> Remove unused dependency
> ------------------------
>
>     Key: FLINK-15016
>     URL: https://issues.apache.org/jira/browse/FLINK-15016
>     Project: Flink
>     Issue Type: Improvement
>     Components: Build System
>     Reporter: César Soto Valero
>     Assignee: César Soto Valero
>     Priority: Minor
>     Labels: pull-request-available
>     Fix For: 1.10.0
>
>     Time Spent: 20m
>     Remaining Estimate: 0h
>
> Dependency *commons-io:commons-io* is declared in module *flink-core*. However, this dependency is not used and, therefore, should be removed to make the pom clearer.
[GitHub] [flink] flinkbot edited a comment on issue #10330: [FLINK-14189][runtime] Extend TaskExecutor to support dynamic slot allocation
flinkbot edited a comment on issue #10330: [FLINK-14189][runtime] Extend TaskExecutor to support dynamic slot allocation
URL: https://github.com/apache/flink/pull/10330#issuecomment-558970102

## CI report:

* 38f2c1c450122cae9aa99258c19c6d07e998 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138360699)
* 14ecf374fe3f6587e6eee29a39bf15190fac269c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138375450)
* 761359a2f2509a06483e422e5e592b66e2e5661a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138401670)
* 88630bcdfd9c05fd352a422a88e59f26fba4dc7c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138455828)
* 726fc5733c8ac35bdb69becbae13911d8e91dd07 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138990518)
* c54d7b94b9ca46e6b8dc2a072919ab0b1deed022 : UNKNOWN
[GitHub] [flink] flinkbot commented on issue #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…
flinkbot commented on issue #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…
URL: https://github.com/apache/flink/pull/10418#issuecomment-561557852

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit 6951e77dff9c9c4fb1945d79ef4253e5ea2c0f0f (Wed Dec 04 09:40:28 UTC 2019)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!
* **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15050).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] Add missing test harnesses for broadcast functions
aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] Add missing test harnesses for broadcast functions
URL: https://github.com/apache/flink/pull/10286#discussion_r353637844

File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/BroadcastOperatorTestHarness.java

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
+
+/**
+ * A test harness for testing a {@link CoBroadcastWithNonKeyedOperator}.

Review comment: I would relax this constraint and maybe say `A test harness for testing a {@link TwoInputStreamOperator} in a broadcast context.`
[GitHub] [flink] aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] Add missing test harnesses for broadcast functions
aljoscha commented on a change in pull request #10286: [FLINK-14890] [tests] Add missing test harnesses for broadcast functions
URL: https://github.com/apache/flink/pull/10286#discussion_r353638159

File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java

@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
+
+
+/**
+ * A test harness for testing a {@link CoBroadcastWithKeyedOperator}.

Review comment: Same as for the other operator.
[GitHub] [flink] JingsongLi commented on a change in pull request #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…
JingsongLi commented on a change in pull request #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…
URL: https://github.com/apache/flink/pull/10418#discussion_r353640867

File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java

@@ -281,6 +288,28 @@
 		return ps;
 	}

+	private static int getDateTimePrecision(LogicalType logicalType) {
+		if (logicalType instanceof TimestampType) {
+			TimestampType dt = (TimestampType) logicalType;
+			return dt.getPrecision();
+		} else {
+			TypeInformation typeInfo = ((LegacyTypeInformationType) logicalType).getTypeInformation();
+			if (typeInfo instanceof LegacyTimestampTypeInfo) {
+				LegacyTimestampTypeInfo dt = (LegacyTimestampTypeInfo) typeInfo;
+				return dt.getPrecision();
+			} else if (typeInfo instanceof LegacyLocalDateTimeTypeInfo) {
+				LegacyLocalDateTimeTypeInfo dt = (LegacyLocalDateTimeTypeInfo) typeInfo;
+				return dt.getPrecision();
+			} else if (typeInfo instanceof SqlTimeTypeInfo) {
+				return 3;

Review comment: Let's remove `SqlTimeTypeInfo` and `LocalTimeTypeInfo`; precision 3 is a temporary solution, and control flow should not reach here.
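If those two branches are dropped as suggested, the method might end up roughly like this. This is a sketch of the suggested shape (method body only, using the types already visible in the diff), not the final code:

```java
private static int getDateTimePrecision(LogicalType logicalType) {
	if (logicalType instanceof TimestampType) {
		return ((TimestampType) logicalType).getPrecision();
	} else {
		TypeInformation<?> typeInfo = ((LegacyTypeInformationType<?>) logicalType).getTypeInformation();
		if (typeInfo instanceof LegacyTimestampTypeInfo) {
			return ((LegacyTimestampTypeInfo) typeInfo).getPrecision();
		} else if (typeInfo instanceof LegacyLocalDateTimeTypeInfo) {
			return ((LegacyLocalDateTimeTypeInfo) typeInfo).getPrecision();
		} else {
			// SqlTimeTypeInfo / LocalTimeTypeInfo are intentionally not special-cased:
			// supported types should never reach this branch
			throw new UnsupportedOperationException("Unsupported type: " + logicalType);
		}
	}
}
```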
[jira] [Comment Edited] (FLINK-15047) YarnDistributedCacheITCase is unstable
[ https://issues.apache.org/jira/browse/FLINK-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987684#comment-16987684 ]

Gary Yao edited comment on FLINK-15047 at 12/4/19 9:57 AM:
-----------------------------------------------------------

-Running this test locally failed 10 out of 10 times. I am promoting the priority to Blocker.-

Edit: I forgot to recompile Flink, so I am not sure if it fails 10/10 times.

was (Author: gjy):
Running this test locally failed 10 out of 10 times. I am promoting the priority to Blocker.

> YarnDistributedCacheITCase is unstable
> --------------------------------------
>
>     Key: FLINK-15047
>     URL: https://issues.apache.org/jira/browse/FLINK-15047
>     Project: Flink
>     Issue Type: Bug
>     Components: Tests
>     Affects Versions: 1.10.0
>     Reporter: Zili Chen
>     Priority: Blocker
>     Labels: test-stability
>     Fix For: 1.10.0
>
> See also https://api.travis-ci.com/v3/job/262854881/log.txt
> cc [~ZhenqiuHuang]
[GitHub] [flink] flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for equality to NaN
flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for equality to NaN
URL: https://github.com/apache/flink/pull/10333#issuecomment-559012637

## CI report:

* c5bd9898fbbeb3d668051475409b52bfb1e2 : UNKNOWN
* 44f202263cbb26473b06ea19f293bf0b6afce8e6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138375519)
* e0ea169fab74ca07617a056e1c60861938ad6f0b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139122432)
* 38bef99d2f337b556c87141e0240ddd7075fd980 : UNKNOWN
[GitHub] [flink] zjuwangg commented on issue #10419: [FLINK-15026][SQL-CLIENT]Support create/drop/alter database in sql client
zjuwangg commented on issue #10419: [FLINK-15026][SQL-CLIENT]Support create/drop/alter database in sql client
URL: https://github.com/apache/flink/pull/10419#issuecomment-561570095

@danny0405 @xuefuz @bowenli86 @KurtYoung, please have a review.
[GitHub] [flink] zjuwangg opened a new pull request #10419: [FLINK-15026][SQL-CLIENT]Support create/drop/alter database in sql client
zjuwangg opened a new pull request #10419: [FLINK-15026][SQL-CLIENT]Support create/drop/alter database in sql client
URL: https://github.com/apache/flink/pull/10419

## What is the purpose of the change

*Since we already support the create/drop/alter database operations in TableEnvironment, it is natural and easy to hook those operations up in the SQL client too. This PR aims to bridge that (see the usage sketch after this description).*

## Brief change log

- [0433794](https://github.com/apache/flink/commit/043379493823c0ac38c05d355471079d6808d16a) Support create/drop/alter database in sql client

## Verifying this change

This change added tests and can be verified as follows:

- *Added unit tests for SqlParserCommand in SqlCommandParserTest#testCommands*
- *Added a test that validates create database in LocalExecutorITCase#testCreateDatabase*
- *Added a test that validates drop database in LocalExecutorITCase#testDropDatabase*
- *Added a test that validates alter database in LocalExecutorITCase#alterDatabase*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not documented)
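For context, these are the `TableEnvironment` operations the SQL client is bridging. A rough usage sketch follows; it assumes a 1.10-era `TableEnvironment` whose current catalog supports databases (e.g. a HiveCatalog), with `sqlUpdate` as the DDL entry point of that era and `db1`/`k1` as made-up names:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// the same statements that the SQL client parses are handed to the TableEnvironment
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

tEnv.sqlUpdate("CREATE DATABASE IF NOT EXISTS db1 COMMENT 'test db' WITH ('k1' = 'v1')");
tEnv.sqlUpdate("ALTER DATABASE db1 SET ('k1' = 'v2')");
tEnv.sqlUpdate("DROP DATABASE IF EXISTS db1 CASCADE");
```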
[GitHub] [flink] flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions
flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#issuecomment-559421343

## CI report:

* 5206399001512006f4b3d7663e7b5be8ea02a4a2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138541983)
* cb4089dec82717cdc6ad6e78b2dbf4d0b03e57d4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138580753)
* 18e1d269e688e6f39fd02cf409316776b24e8601 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138606098)
* 8924109bc101b39c6057d44aa14224cc12215b7c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138625034)
* 1922068e5dd92138fb2cd37c225ae0e5c6a5284f : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138628834)
* b1bd81bf936adb02f353374ebe9fd9c861cd2fe4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138801818)
* 7499db309b268acd96ec423d629bfa633c3f150d : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/138824944)
* a6cb2444a7532c4a5eadcab5da395de3a90f9474 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138828305)
* da403e52ec9b8727f3002c14b67453722e06bf2b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138854348)
* 0401e51b6babb79ceff3898c23d8d9136d584861 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138861032)
* 8f6083ad4e48706912edb85bce1d116b37bc6d67 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138864136)
* f0797b08be8f53f68da3016f77092b38bf612ee5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138947296)
* e3bde3bb95123b295ff2b60139b3f774b9deb62a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138967214)
* 5ab5c44b979077edc8d5a1b8c295814ec6ebb6eb : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139122458)
* e09e524aca2fd0976ab296de8d0d012f08f53294 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139168737)
* 7b1a700a18d027f3d618233ce639bcb9ec08396e : UNKNOWN
* 590f7e632a00a49fb5727d06537ee89a613049d7 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139218625)
* 8e1406ed035a978468818c1923d3dbc0a21a49ca : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139227312)
[GitHub] [flink] flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#issuecomment-559765475

## CI report:

* b106e8b0327fb78ebac1894a8f8fa51718b3bbba : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138698185)
* 5b428f7f59c8b2ec3c2751f642de5b343914580b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139147374)
* 622362f4f1a1240aa8ea5470cdc9364cf136f75c : UNKNOWN
[GitHub] [flink] StephanEwen commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
StephanEwen commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353679918

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

@@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
 		this.kvStateInformation = kvStateInformation;
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		checkArgument(writeBatchSize >= 0, "Write batch size must be a non-negative value.");

Review comment: I think the annotation is only really evaluated in IntelliJ by the inspections. It does not prevent anything at runtime, so this extra check does not hurt, in my opinion.
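For illustration, the belt-and-braces pattern under discussion looks roughly like this. The `WriteBatchConfig` holder class is made up for the sketch; `@Nonnegative` is the JSR-305 annotation and `checkArgument` comes from Flink's `Preconditions`:

```java
import javax.annotation.Nonnegative;

import static org.apache.flink.util.Preconditions.checkArgument;

class WriteBatchConfig {

	private final long writeBatchSize;

	// @Nonnegative only drives IDE inspections (e.g. IntelliJ); nothing enforces it
	// at runtime, so the explicit precondition still catches bad values.
	WriteBatchConfig(@Nonnegative long writeBatchSize) {
		checkArgument(writeBatchSize >= 0, "Write batch size must be a non-negative value.");
		this.writeBatchSize = writeBatchSize;
	}
}
```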
[GitHub] [flink] flinkbot edited a comment on issue #10394: [FLINK-14663]Distinguish catalogColumnStats' unknown value and real values
flinkbot edited a comment on issue #10394: [FLINK-14663]Distinguish catalogColumnStats' unknown value and real values
URL: https://github.com/apache/flink/pull/10394#issuecomment-561095403

## CI report:

* 68e77528300eadcbeaa62372d676b01e268c39a8 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116845)
* 060aaebb68bab8fcd7054c782a3cd3c2b2b89ec2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139259032)
* 4293b717dcc110d76748c4d5fff2022ec0ff01f2 : UNKNOWN
[GitHub] [flink] rkhachatryan commented on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions
rkhachatryan commented on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#issuecomment-561605260

@flinkbot run travis
[GitHub] [flink] flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions
flinkbot edited a comment on issue #10345: [FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#issuecomment-559421343

## CI report:

* 5206399001512006f4b3d7663e7b5be8ea02a4a2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138541983)
* cb4089dec82717cdc6ad6e78b2dbf4d0b03e57d4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138580753)
* 18e1d269e688e6f39fd02cf409316776b24e8601 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138606098)
* 8924109bc101b39c6057d44aa14224cc12215b7c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138625034)
* 1922068e5dd92138fb2cd37c225ae0e5c6a5284f : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138628834)
* b1bd81bf936adb02f353374ebe9fd9c861cd2fe4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138801818)
* 7499db309b268acd96ec423d629bfa633c3f150d : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/138824944)
* a6cb2444a7532c4a5eadcab5da395de3a90f9474 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138828305)
* da403e52ec9b8727f3002c14b67453722e06bf2b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138854348)
* 0401e51b6babb79ceff3898c23d8d9136d584861 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138861032)
* 8f6083ad4e48706912edb85bce1d116b37bc6d67 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138864136)
* f0797b08be8f53f68da3016f77092b38bf612ee5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138947296)
* e3bde3bb95123b295ff2b60139b3f774b9deb62a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138967214)
* 5ab5c44b979077edc8d5a1b8c295814ec6ebb6eb : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139122458)
* e09e524aca2fd0976ab296de8d0d012f08f53294 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139168737)
* 7b1a700a18d027f3d618233ce639bcb9ec08396e : UNKNOWN
* 590f7e632a00a49fb5727d06537ee89a613049d7 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139218625)
* 8e1406ed035a978468818c1923d3dbc0a21a49ca : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139227312)
* 6cfe76bac6ba0318f73484ff8fcbe63d21e1c393 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
flinkbot edited a comment on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
URL: https://github.com/apache/flink/pull/10358#issuecomment-559765475

## CI report:

* b106e8b0327fb78ebac1894a8f8fa51718b3bbba : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138698185)
* 5b428f7f59c8b2ec3c2751f642de5b343914580b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139147374)
* 622362f4f1a1240aa8ea5470cdc9364cf136f75c : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139302203)
* 4396fe5a2ca35cf55502e72ab70c4ac8f1eecc29 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10359: [FLINK-14813][metrics] Provide `isBackPressured` Task metric
flinkbot edited a comment on issue #10359: [FLINK-14813][metrics] Provide `isBackPressured` Task metric
URL: https://github.com/apache/flink/pull/10359#issuecomment-559773427

## CI report:

* b05689f09ee9e5eaa9ceacc223a9027b467143a6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138701358)
* 7257acd25d0922ca098d64560d518c4810042085 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139263373)
[GitHub] [flink] zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r353594121

File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java

@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
+import org.apache.flink.runtime.io.compression.BlockCompressor;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Compressor for {@link Buffer}.
+ */
+public class BufferCompressor {
+
+	/** The backing block compressor for data compression. */
+	private final BlockCompressor blockCompressor;
+
+	/** The intermediate heap buffer for the compressed data. */
+	private final byte[] heapBuffer;
+
+	public BufferCompressor(int bufferSize, String factoryName) {
+		checkArgument(bufferSize > 0);
+		// the size of this intermediate heap buffer will be obtained from the
+		// plugin configuration in the future; currently, double the size of
+		// the input buffer is enough for the lz4-java compression library.
+		this.heapBuffer = new byte[2 * bufferSize];
+		this.blockCompressor = BlockCompressionFactory.createBlockCompressionFactory(factoryName).getCompressor();
+	}
+
+	/**
+	 * Compresses the given {@link Buffer} using {@link BlockCompressor}. The compressed data will be stored in the
+	 * internal heap buffer of this {@link BufferCompressor} and returned to the caller. The caller must guarantee
+	 * that the returned {@link Buffer} is freed by the time this method is called again.
+	 *
+	 * <p>Note that the compression always covers offset 0 to the size of the input {@link Buffer}.
+	 */
+	public Buffer compressToInternalBuffer(Buffer buffer) {
+		int compressedLen;
+		if ((compressedLen = compress(buffer)) == 0) {
+			return buffer;
+		}
+
+		try {
+			if (compressedLen >= buffer.getSize()) {
+				return buffer;
+			}
+
+			// wrap the internal heap buffer as a Buffer
+			MemorySegment memorySegment = MemorySegmentFactory.wrap(heapBuffer);
+			NetworkBuffer compressedBuffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
+			compressedBuffer.setSize(compressedLen);
+			compressedBuffer.setCompressed(true);
+
+			return compressedBuffer;
+		} catch (Throwable throwable) {
+			return buffer;
+		}
+	}
+
+	/**
+	 * The difference between this method and {@link #compressToInternalBuffer(Buffer)} is that this method will
+	 * copy the compressed data back to the input {@link Buffer} starting from offset 0.
+	 *
+	 * <p>The caller must guarantee that the input {@link Buffer} is writable and there's enough space left.
+	 */
+	public Buffer compressInPlace(Buffer buffer) {

Review comment: The same comments I mentioned for `compressToInternalBuffer` above apply here as well: merge the conditions and remove the `catch` clause.
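Applying both suggestions (merging the two early returns and dropping the `catch` clause), the method could be condensed to something like the following sketch; this is my reading of the review, not the final PR code, and it reuses only the classes already visible in the diff:

```java
public Buffer compressToInternalBuffer(Buffer buffer) {
	int compressedLen = compress(buffer);

	// fall back to the uncompressed buffer if compression failed (len == 0)
	// or did not actually shrink the data
	if (compressedLen == 0 || compressedLen >= buffer.getSize()) {
		return buffer;
	}

	// wrap the internal heap buffer as a Buffer
	MemorySegment memorySegment = MemorySegmentFactory.wrap(heapBuffer);
	NetworkBuffer compressedBuffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
	compressedBuffer.setSize(compressedLen);
	compressedBuffer.setCompressed(true);
	return compressedBuffer;
}
```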
[GitHub] [flink] flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115

## CI report:

* 25f9e4b87846e5a736aa329c834f82962e1f50c4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135875925)
* c4b4f4d5c88a1a5009325a6260cf2d91ed69ca96 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135885120)
* 2d5269d2498d96550682d113d61382b7a9ac9721 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135902960)
* 5ee8701f76b9e6f2dcb451eb988371bea3b0a38d : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/136486584)
* 2c52d7157f5e1b25dfaa00fe50cf7b04e7d6a97e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/136497720)
* 2d734eeff7480adc2ea1f3695f31ba5a169f3a05 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/136501966)
* 4edae43ff7eaf0357f5e8604b02b88749c8d153f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/136522752)
* 927a11838172fe792636923e9378677f92a48b73 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138028348)
* a73de7a3fc63fe2d2a9bd12e03efb45bfcbf9ca8 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138455883)
* d06a271e355a36fd316f35d98e2905df8829273a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138768094)
* 34c5662c256c22dbb3b770a3203090b19615d338 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138984403)
* fb7c23c89663e70842341264ac35d3160ee11e6f : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139087272)
* b083c68650acd6b097dc7049276df080196db26f : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139278832)
[jira] [Commented] (FLINK-14572) BlobsCleanupITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-14572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987647#comment-16987647 ]

Zhu Zhu commented on FLINK-14572:
---------------------------------

Unfortunately, the detailed logs I downloaded have already been removed from my local machine. :(

> BlobsCleanupITCase failed on Travis
> -----------------------------------
>
>     Key: FLINK-14572
>     URL: https://issues.apache.org/jira/browse/FLINK-14572
>     Project: Flink
>     Issue Type: Bug
>     Components: Runtime / Coordination, Tests
>     Affects Versions: 1.10.0
>     Reporter: Gary Yao
>     Assignee: Yun Gao
>     Priority: Critical
>     Labels: test-stability
>     Fix For: 1.10.0
>
> {noformat}
> java.lang.AssertionError:
> Expected: is
>      but: was
> 	at org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanup(BlobsCleanupITCase.java:220)
> 	at org.apache.flink.runtime.jobmanager.BlobsCleanupITCase.testBlobServerCleanupFinishedJob(BlobsCleanupITCase.java:133)
> {noformat}
> https://api.travis-ci.com/v3/job/250445874/log.txt
[GitHub] [flink] TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to execution environments
TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to execution environments
URL: https://github.com/apache/flink/pull/10408#issuecomment-561541292

@aljoscha it would make sense. One concern is about the close actions. For some implementations of JobClient, specifically anonymous subclasses of ClusterClientJobClientAdapter, we close some internal resources such as the MiniCluster, or remove a shutdown hook. That should not be the case for the duplicated one. The duplicated one should only close, for our specific implementation, the REST client, the HA services, and the execution service held by ClusterClient. I have an idea for doing so; let me push a follow-up commit and give you an impression.
[GitHub] [flink] GJL commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode
GJL commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode
URL: https://github.com/apache/flink/pull/10406#discussion_r353620416

File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java

@@ -56,20 +56,20 @@ private RestartBackoffTimeStrategyFactoryLoader() {
 	 *
 	 * @param jobRestartStrategyConfiguration restart configuration given within the job graph
 	 * @param clusterConfiguration cluster (server-side) configuration
-	 * @param isCheckpointingEnabled if checkpointing is enabled for the job
+	 * @param checkpointingEnabled if checkpointing is enabled for the job
 	 * @return new version restart strategy factory
 	 */
 	public static RestartBackoffTimeStrategy.Factory createRestartBackoffTimeStrategyFactory(
 			final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
 			final Configuration clusterConfiguration,
-			final boolean isCheckpointingEnabled) {
+			final boolean checkpointingEnabled) {

Review comment: For consistency I would prefer having the same naming conventions for boolean setters and static methods. However, since there is no rule in the Flink coding style guidelines, and we are already using both styles [1][2], I'd also be fine with dropping this commit.

[1] https://github.com/apache/flink/blob/86c232437f74576fdee3baad11c58f77714269fc/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L112
[2] https://github.com/apache/flink/blob/86c232437f74576fdee3baad11c58f77714269fc/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java#L206
[GitHub] [flink] Myasuka commented on issue #10402: [FLINK-15034] Bump FRocksDB version for memory control
Myasuka commented on issue #10402: [FLINK-15034] Bump FRocksDB version for memory control
URL: https://github.com/apache/flink/pull/10402#issuecomment-561550622

@StephanEwen thanks for your examination; I have already updated `flink-dist/src/main/resources/META-INF/NOTICE`.
[GitHub] [flink] dawidwys opened a new pull request #10417: [FLINK-14910][datastream] Checking for auto generated uids only for PhysicalStreamTransformations
dawidwys opened a new pull request #10417: [FLINK-14910][datastream] Checking for auto generated uids only for PhysicalStreamTransformations
URL: https://github.com/apache/flink/pull/10417

## What is the purpose of the change

This PR enables the check for auto-generated uids only for `PhysicalStreamTransformations`, as those are the only `StreamTransformations` that produce a `StreamOperator` that can hold state. This solves the problem that it is not possible to assign a uid to some of the non-physical transformations (e.g. `keyBy`, `split`); a sketch of the narrowed check follows this description.

## Verifying this change

Added: `org.apache.flink.streaming.graph.StreamingJobGraphGeneratorNodeHashTest#testDisablingAutoUidsWorksWithKeyBy`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
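Conceptually, the fix narrows the job-graph generator's check to physical transformations only. A rough, hypothetical sketch of the idea follows; it is not the actual implementation, and the `checkAutoGeneratedUid` helper plus the `getUid`/`getUserProvidedNodeHash` accessor names are assumptions for illustration:

```java
// Only operators produced by physical transformations can carry state, so only
// they need a stable uid; keyBy/split/select and friends are exempt.
private static void checkAutoGeneratedUid(Transformation<?> transformation) {
	if (transformation instanceof PhysicalTransformation
			&& transformation.getUid() == null
			&& transformation.getUserProvidedNodeHash() == null) {
		throw new IllegalStateException(
			"Auto generated UIDs have been disabled but no UID or hash has been assigned to operator "
				+ transformation.getName());
	}
}
```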
[jira] [Updated] (FLINK-12283) In Table API, allow non-static inner class as UDF
[ https://issues.apache.org/jira/browse/FLINK-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-12283:
-------------------------------------
    Summary: In Table API, allow non-static inner class as UDF  (was: Allow non-static inner class as UDF)

> In Table API, allow non-static inner class as UDF
> -------------------------------------------------
>
>     Key: FLINK-12283
>     URL: https://issues.apache.org/jira/browse/FLINK-12283
>     Project: Flink
>     Issue Type: Improvement
>     Components: Table SQL / API
>     Affects Versions: 1.8.0
>     Reporter: Jeff Zhang
>     Assignee: Jeff Zhang
>     Priority: Major
>
> See details here:
> [https://lists.apache.org/thread.html/9ecec89ba1225dbd6b3ea2466a910ad9685a42a4672b449f6ee13565@%3Cuser.flink.apache.org%3E]
[GitHub] [flink] zentol commented on issue #10363: [FLINK-14104][build] Upgrade to flink-shaded 9.0
zentol commented on issue #10363: [FLINK-14104][build] Upgrade to flink-shaded 9.0
URL: https://github.com/apache/flink/pull/10363#issuecomment-561550679

This happens automatically. We don't list flink dependencies in our NOTICE files (that's why the flink-dist NOTICE remains unchanged), and the licensing for the binary is generated.
[jira] [Updated] (FLINK-15047) YarnDistributedCacheITCase is unstable
[ https://issues.apache.org/jira/browse/FLINK-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-15047:
-------------------------------------
    Labels: test-stability  (was: )

> YarnDistributedCacheITCase is unstable
> --------------------------------------
>
>     Key: FLINK-15047
>     URL: https://issues.apache.org/jira/browse/FLINK-15047
>     Project: Flink
>     Issue Type: Bug
>     Components: Tests
>     Affects Versions: 1.10.0
>     Reporter: Zili Chen
>     Priority: Major
>     Labels: test-stability
>     Fix For: 1.10.0
>
> See also https://api.travis-ci.com/v3/job/262854881/log.txt
> cc [~ZhenqiuHuang]
[jira] [Updated] (FLINK-14910) disableAutoGeneratedUIDs fails on keyBy
[ https://issues.apache.org/jira/browse/FLINK-14910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-14910:
-----------------------------------
    Labels: pull-request-available  (was: )

> disableAutoGeneratedUIDs fails on keyBy
> ---------------------------------------
>
>     Key: FLINK-14910
>     URL: https://issues.apache.org/jira/browse/FLINK-14910
>     Project: Flink
>     Issue Type: Bug
>     Components: API / DataStream
>     Affects Versions: 1.9.0
>     Reporter: William Cheng
>     Assignee: Dawid Wysakowicz
>     Priority: Major
>     Labels: pull-request-available
>     Fix For: 1.10.0, 1.9.2
>
> There doesn't seem to be a way to add a UID to the Partition operator created by keyBy, causing `disableAutoGeneratedUIDs` to fail.
>
> Here's a simple test case that reproduces the issue:
> {noformat}
> @Test
> public void testFailedUID() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().disableAutoGeneratedUIDs();
>     DataStream<String> data = env.fromCollection(Arrays.asList("1", "2", "3")).uid("source-uid");
>     data.keyBy(i -> i)
>         .map(i -> i).uid("map-uid");
>     env.execute();
> }{noformat}
> {noformat}
> testFailedUID(twitch.creatoranalytics.sessions.StreamingJobTest) Time elapsed: 0.008 sec <<< ERROR!
> java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Partition
> {noformat}
>
> This passes if the keyBy is removed.
[GitHub] [flink] zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode
zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode
URL: https://github.com/apache/flink/pull/10406#discussion_r353638280

File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java

@@ -49,6 +50,8 @@
  */
 public class DefaultSchedulerFactory implements SchedulerNGFactory {

+	private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulerFactory.class);
+
 	@Override
 	public SchedulerNG createInstance(
 			final Logger log,

Review comment: Should we use this logger?
[GitHub] [flink] docete commented on a change in pull request #10399: [FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType in blink planner
docete commented on a change in pull request #10399: [FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType in blink planner
URL: https://github.com/apache/flink/pull/10399#discussion_r353638560

File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala

@@ -386,10 +386,20 @@ object GenerateUtils {
         generateNonNullLiteral(literalType, fieldTerm, literalType)

       case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-        val millis = unixTimestampToLocalDateTime(literalValue.asInstanceOf[Long])
+        val fieldTerm = newName("timestampWithLocalZone")
+        val millis = unixTimestampToLocalDateTime(
+          literalValue.asInstanceOf[TimestampString].getMillisSinceEpoch)
           .atZone(ctx.tableConfig.getLocalTimeZone)
           .toInstant.toEpochMilli
-        generateNonNullLiteral(literalType, millis + "L", literalValue)
+        val nanoOfMillis = SqlDateTimeUtils.getNanoOfMillisSinceEpoch(

Review comment: The previous `Instant` only contains the millisecond part.
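The reason the nano component matters: an epoch-millisecond long cannot carry sub-millisecond digits, so a literal with precision > 3 has to be rebuilt from both parts, along these lines (a sketch; `toInstant` is a hypothetical helper, not Flink code):

```java
import java.time.Instant;

class InstantPrecision {

	// Recombine an epoch-millisecond value with the nano-of-millisecond remainder;
	// using only `millis`, as before, silently truncates any precision beyond 3.
	static Instant toInstant(long millis, int nanoOfMillis) {
		long epochSecond = Math.floorDiv(millis, 1000L);
		long nanoAdjustment = Math.floorMod(millis, 1000L) * 1_000_000L + nanoOfMillis;
		return Instant.ofEpochSecond(epochSecond, nanoAdjustment);
	}
}
```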
[GitHub] [flink] docete commented on a change in pull request #10399: [FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType in blink planner
docete commented on a change in pull request #10399: [FLINK-14959][table-planner-blink] Support precision of LocalZonedTimestampType in blink planner
URL: https://github.com/apache/flink/pull/10399#discussion_r353642653

File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala

@@ -66,7 +67,18 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
       boxedTypeTermForType(returnType)
     }
     val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
-    val evalResult = s"$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")})"
+    val evalResult =
+      if (returnType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+        && (resultClass == classOf[Long] || resultClass == classOf[JLong])) {
+        // Convert Long to SqlTimestamp if the UDX's returnType is

Review comment: I will add a Long <-> Instant converter to support this.
[jira] [Updated] (FLINK-15046) Add guideline on how to report security issues
[ https://issues.apache.org/jira/browse/FLINK-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-15046: - Description: As discussed in the [ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951] , there should be a guideline on how to report security issues in Flink website. > Add guideline on how to report security issues > -- > > Key: FLINK-15046 > URL: https://issues.apache.org/jira/browse/FLINK-15046 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Dian Fu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > As discussed in the > [ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951] > , there should be a guideline on how to report security issues in Flink > website. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15026) Support database DDLs in SQL CLI
[ https://issues.apache.org/jira/browse/FLINK-15026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-15026: --- Labels: pull-request-available (was: ) > Support database DDLs in SQL CLI > > > Key: FLINK-15026 > URL: https://issues.apache.org/jira/browse/FLINK-15026 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client >Affects Versions: 1.9.1 >Reporter: Danny Chen >Assignee: Terry Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > > Support DDL as following: > {code:sql} > CREATE DATABASE [ IF NOT EXISTS ] [ catalogName.] dataBaseName [ COMMENT > database_comment ] [WITH ( name=value [, name=value]*)] > DROP DATABASE [ IF EXISTS ] [ catalogName.] dataBaseName [ (RESTRICT|CASCADE)] > ALTER DATABASE [ catalogName.] dataBaseName SET ( name=value [, name=value]*) > USE [ catalogName.] dataBaseName > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-15046) Add guideline on how to report security issues
[ https://issues.apache.org/jira/browse/FLINK-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-15046: Assignee: Dian Fu > Add guideline on how to report security issues > -- > > Key: FLINK-15046 > URL: https://issues.apache.org/jira/browse/FLINK-15046 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > As discussed in the > [ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951] > , there should be a guideline on how to report security issues in Flink > website. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15046) Add guideline on how to report security issues
[ https://issues.apache.org/jira/browse/FLINK-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-15046: - Environment: (was: As discussed in the [ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Expose-or-setup-a-security-flink-apache-org-mailing-list-for-security-report-and-discussion-tt34950.html#a34951] , there should be a guideline on how to report security issues in Flink website.) > Add guideline on how to report security issues > -- > > Key: FLINK-15046 > URL: https://issues.apache.org/jira/browse/FLINK-15046 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Dian Fu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString URL: https://github.com/apache/flink/pull/10358#issuecomment-561580556 This is nice work, thanks a lot :-) Could you add a test case that ensures the encoding is still the same? Maybe copy the old String read/write logic and compare it with the new one for some random Strings? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
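As a rough illustration of the kind of test being asked for, the sketch below round-trips random strings through the static `StringValue.writeString`/`readString` pair using the existing flink-core serializer views; comparing the produced bytes against a frozen copy of the old write logic would be the additional assertion suggested above (the class name and string generator here are illustrative):

```java
import java.util.Random;

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.types.StringValue;

/** Round-trip sketch: every string written by the new code must read back identically. */
public class StringValueRoundTripSketch {

    public static void main(String[] args) throws Exception {
        Random rnd = new Random(42);
        for (int i = 0; i < 10_000; i++) {
            String original = randomString(rnd, rnd.nextInt(64));

            DataOutputSerializer out = new DataOutputSerializer(128);
            StringValue.writeString(original, out);

            DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
            String copy = StringValue.readString(in);

            if (!original.equals(copy)) {
                throw new AssertionError("Round-trip mismatch for: " + original);
            }
        }
        System.out.println("All round trips matched.");
    }

    /** Mixes 7-bit ASCII with higher code points to exercise both decoder paths. */
    private static String randomString(Random rnd, int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append((char) (rnd.nextBoolean() ? rnd.nextInt(128) : 128 + rnd.nextInt(4096)));
        }
        return sb.toString();
    }
}
```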
[GitHub] [flink] shuttie commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
shuttie commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString URL: https://github.com/apache/flink/pull/10358#issuecomment-561581049 @AHeise thanks for all the ideas, I've updated the PR with all the proposals applied. As for the `writeString` fallback code, I've found a better way of dealing with short strings that does not require a separate code path. If you stare long enough at the jmh perfasm listing for short strings, you may notice that most of the time (compared with the original implementation) is spent in the initial buffer size computation. In the original unbuffered code there is no reason to compute it, as there is no buffer. But in this PR we need to scan a string twice: once to compute the buffer size, and then to write the characters to the buffer. The main idea of this PR is to leverage CPU-level parallelism, helping it to process multiple characters at once. But the problem with short strings is that there is nothing to parallelize, so the double-scanning overhead starts to kill the performance. The proposed fix is to over-allocate the buffer for short strings, skipping the exact buffer size computation. I've found a tipping point for this approach lying somewhere between 6 and 8 characters:
* for strings < 6 chars it's faster to over-allocate,
* for strings of 6-8 chars it's the same as the exact computation,
* for strings > 8 chars it can be slower, but insignificantly.
But in theory it may produce some GC pressure. The current round of benchmarks:
```
[info] Benchmark                                         (length)  (stringType)  Mode  Cnt    Score   Error  Units
[info] StringDeserializerBenchmark.deserializeDefault           1         ascii  avgt   50   45.618 ± 0.339  ns/op
[info] StringDeserializerBenchmark.deserializeDefault           2         ascii  avgt   50   61.348 ± 0.579  ns/op
[info] StringDeserializerBenchmark.deserializeDefault           4         ascii  avgt   50   88.067 ± 1.058  ns/op
[info] StringDeserializerBenchmark.deserializeDefault           8         ascii  avgt   50  142.902 ± 1.121  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          16         ascii  avgt   50  249.181 ± 1.920  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          32         ascii  avgt   50  466.382 ± 1.502  ns/op
[info] StringDeserializerBenchmark.deserializeImproved          1         ascii  avgt   50   49.916 ± 0.132  ns/op
[info] StringDeserializerBenchmark.deserializeImproved          2         ascii  avgt   50   50.278 ± 0.064  ns/op
[info] StringDeserializerBenchmark.deserializeImproved          4         ascii  avgt   50   50.365 ± 0.129  ns/op
[info] StringDeserializerBenchmark.deserializeImproved          8         ascii  avgt   50   52.463 ± 0.301  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         16         ascii  avgt   50   55.711 ± 0.597  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         32         ascii  avgt   50   65.342 ± 0.555  ns/op
[info] StringSerializerBenchmark.serializeDefault               1         ascii  avgt   50   31.076 ± 0.192  ns/op
[info] StringSerializerBenchmark.serializeDefault               2         ascii  avgt   50   31.770 ± 1.811  ns/op
[info] StringSerializerBenchmark.serializeDefault               4         ascii  avgt   50   39.251 ± 0.189  ns/op
[info] StringSerializerBenchmark.serializeDefault               8         ascii  avgt   50   57.736 ± 0.253  ns/op
[info] StringSerializerBenchmark.serializeDefault              16         ascii  avgt   50   94.964 ± 0.514  ns/op
[info] StringSerializerBenchmark.serializeDefault              32         ascii  avgt   50  168.754 ± 1.416  ns/op
[info] StringSerializerBenchmark.serializeImproved              1         ascii  avgt   50   30.145 ± 0.156  ns/op
[info] StringSerializerBenchmark.serializeImproved              2         ascii  avgt   50   30.873 ± 0.274  ns/op
[info] StringSerializerBenchmark.serializeImproved              4         ascii  avgt   50   31.993 ± 0.276  ns/op
[info] StringSerializerBenchmark.serializeImproved              8         ascii  avgt   50   46.220 ± 0.211  ns/op
[info] StringSerializerBenchmark.serializeImproved             16         ascii  avgt   50   50.856 ± 0.826  ns/op
[info] StringSerializerBenchmark.serializeImproved             32         ascii  avgt   50   63.221 ± 1.130  ns/op
```
So for large strings the new implementation is much faster, and for short ones it's not regressing (and is even slightly faster). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
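The tipping-point heuristic described above can be sketched as follows; the threshold, the worst-case factor, and the one-byte length prefix are simplifying assumptions for illustration, not the exact constants in the PR:

```java
/**
 * Illustrative sketch of the short-string over-allocation heuristic:
 * below the threshold we skip the exact sizing pass and allocate the
 * worst case; longer strings amortize the cost of the extra scan.
 */
public final class ShortStringBufferSizing {

    // Assumed tipping point, taken from the 6-8 character range measured above.
    private static final int SHORT_STRING_THRESHOLD = 8;

    // Worst case of the per-char 7-bit group encoding: up to 3 bytes per char.
    private static final int MAX_BYTES_PER_CHAR = 3;

    static byte[] allocateBuffer(CharSequence cs) {
        if (cs.length() < SHORT_STRING_THRESHOLD) {
            // over-allocate: no sizing scan, at the price of some GC pressure
            return new byte[1 + cs.length() * MAX_BYTES_PER_CHAR];
        }
        // exact sizing: one extra pass over the characters
        int size = 1; // length prefix, simplified here to a single byte
        for (int i = 0; i < cs.length(); i++) {
            char c = cs.charAt(i);
            size += c < 0x80 ? 1 : (c < 0x4000 ? 2 : 3);
        }
        return new byte[size];
    }

    public static void main(String[] args) {
        System.out.println(allocateBuffer("hi").length);                    // over-allocated
        System.out.println(allocateBuffer("a longer ascii string").length); // exact
    }
}
```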
[GitHub] [flink] flinkbot edited a comment on issue #10417: [FLINK-14910][datastream] Checking for auto generated uids only for PhysicalStreamTransformations
flinkbot edited a comment on issue #10417: [FLINK-14910][datastream] Checking for auto generated uids only for PhysicalStreamTransformations URL: https://github.com/apache/flink/pull/10417#issuecomment-561567788 ## CI report: * 64b24f882e32caebd6489f890ca477305acf31cc : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139296355) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor…
flinkbot edited a comment on issue #10418: [FLINK-15050][table-planner-blink] DataFormatConverters should suppor… URL: https://github.com/apache/flink/pull/10418#issuecomment-561567846 ## CI report: * 6951e77dff9c9c4fb1945d79ef4253e5ea2c0f0f : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139296371) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-13662) FlinkKinesisProducerTest.testBackpressure failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987742#comment-16987742 ] Till Rohrmann commented on FLINK-13662: --- Do you wanna give it a try and see whether it fixes the problem [~fmthoma]? > FlinkKinesisProducerTest.testBackpressure failed on Travis > -- > > Key: FLINK-13662 > URL: https://issues.apache.org/jira/browse/FLINK-13662 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis, Tests >Affects Versions: 1.9.0, 1.10.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > The {{FlinkKinesisProducerTest.testBackpressure}} failed on Travis with > {code} > 14:45:50.489 [ERROR] Failures: > 14:45:50.489 [ERROR] FlinkKinesisProducerTest.testBackpressure:298 Flush > triggered before reaching queue limit > {code} > https://api.travis-ci.org/v3/job/569262823/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zentol commented on a change in pull request #10384: [FLINK-13373][docs] Remove Tutorials and Examples sections from Getting Started
zentol commented on a change in pull request #10384: [FLINK-13373][docs] Remove Tutorials and Examples sections from Getting Started URL: https://github.com/apache/flink/pull/10384#discussion_r353675794 ## File path: docs/ops/deployment/local.md ## @@ -0,0 +1,178 @@ +--- +title: "Local Cluster" +nav-title: 'Local Cluster' +nav-parent_id: deployment +nav-pos: 1 +--- + + +Get a local Flink cluster up and running in a few simple steps. + +* This will be replaced by the TOC +{:toc} + +## Setup: Download and Start Flink + +Flink runs on __Linux, Mac OS X, and Windows__. +To be able to run Flink, the only requirement is to have a working __Java 8.x__ installation. + +You can check the correct installation of Java by issuing the following command: + +{% highlight bash %} +java -version +{% endhighlight %} + +If you have Java 8, the output will look something like this: + +{% highlight bash %} +java version "1.8.0_111" +Java(TM) SE Runtime Environment (build 1.8.0_111-b14) +Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode) +{% endhighlight %} + +{% if site.is_stable %} + + +1. Download a binary from the [downloads page](https://flink.apache.org/downloads.html). You can pick + any Scala variant you like. For certain features you may also have to download one of the pre-bundled Hadoop jars + and place them into the `/lib` directory. +2. Go to the download directory. +3. Unpack the downloaded archive. + +{% highlight bash %} +$ cd ~/Downloads # Go to download directory +$ tar xzf flink-*.tgz # Unpack the downloaded archive +$ cd flink-{{site.version}} +{% endhighlight %} + + + +For MacOS X users, Flink can be installed through [Homebrew](https://brew.sh/). + +{% highlight bash %} +$ brew install apache-flink +... +$ flink --version +Version: 1.2.0, Commit ID: 1c659cf +{% endhighlight %} + + + + +{% else %} +### Download and Compile +Clone the source code from one of our [repositories](https://flink.apache.org/community.html#source-code), e.g.: +
+{% highlight bash %} +$ git clone https://github.com/apache/flink.git +$ cd flink +$ mvn clean package -DskipTests # this will take up to 10 minutes Review comment: yeah it's more like 25 I think. Depending on the environment the build of the WebUI may also slow things down significantly; or fail outright. May be useful to document `-Pskip-webui-build` for these cases. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-15047) YarnDistributedCacheITCase is unstable
[ https://issues.apache.org/jira/browse/FLINK-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-15047: - Assignee: Andrey Zagrebin > YarnDistributedCacheITCase is unstable > -- > > Key: FLINK-15047 > URL: https://issues.apache.org/jira/browse/FLINK-15047 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.10.0 >Reporter: Zili Chen >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > See also https://api.travis-ci.com/v3/job/262854881/log.txt > cc [~ZhenqiuHuang] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode
flinkbot edited a comment on issue #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode URL: https://github.com/apache/flink/pull/10406#issuecomment-561380440 ## CI report: * 0effd7f6b294b7f2da91c7681aa6e8ae51390efc : UNKNOWN * 4fbefcc29212ef9c62258aa73ebd7b9f2a97 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139227336) * f1a71d26b6d4f370a58f48e246f59077ec83221c : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139290722) * 3be557618cc3c803ce23659fd38ae6644b9a3b7d : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139302249) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] jinglining commented on issue #10359: [FLINK-14813][metrics] Provide `isBackPressured` Task metric
jinglining commented on issue #10359: [FLINK-14813][metrics] Provide `isBackPressured` Task metric URL: https://github.com/apache/flink/pull/10359#issuecomment-561604330 @flinkbot run travis This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on issue #10325: [FLINK-14512][table] Introduce listPartitionsByFilter to Catalog
JingsongLi commented on issue #10325: [FLINK-14512][table] Introduce listPartitionsByFilter to Catalog URL: https://github.com/apache/flink/pull/10325#issuecomment-561618185 > Thanks @JingsongLi for the update. It seems the failed test is related? Yes, we should modify the Python code too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-15052) sql client doesn't clear previous job graph
[ https://issues.apache.org/jira/browse/FLINK-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-15052: --- Fix Version/s: 1.10.0 > sql client doesn't clear previous job graph > > > Key: FLINK-15052 > URL: https://issues.apache.org/jira/browse/FLINK-15052 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Reporter: Kurt Young >Assignee: Danny Chen >Priority: Blocker > Fix For: 1.10.0 > > > when executing multiple commands from sql client, the later job graph will > include all job graphs which already executed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to execution environments
TisonKun commented on issue #10408: [FLINK-14992][client] Add job listener to execution environments URL: https://github.com/apache/flink/pull/10408#issuecomment-561525250 > Thanks @TisonKun I am afraid I could not test it as in Zeppelin I use `ScalaShellRemoteEnvironment` which is not affected by this PR. Got it. @kl0u what is the progress of using the new `Execution` in `ScalaShellRemoteEnvironment`? I don't see a dedicated JIRA ticket so far. To satisfy Zeppelin's requirement we might need to write in-place twisting code if we cannot get the `ScalaShellRemoteEnvironment` refactor in time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle. URL: https://github.com/apache/flink/pull/10375#discussion_r353595553 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.compression.BlockCompressionFactory; +import org.apache.flink.runtime.io.compression.BlockCompressor; + +import java.nio.ByteBuffer; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Compressor for {@link Buffer}. + */ +public class BufferCompressor { + + /** The backing block compressor for data compression. */ + private final BlockCompressor blockCompressor; + + /** The intermediate heap buffer for the compressed data. */ + private final byte[] heapBuffer; + + public BufferCompressor(int bufferSize, String factoryName) { + checkArgument(bufferSize > 0); + // the size of this intermediate heap buffer will be taken from the + // plugin configuration in the future, and currently, double size of + // the input buffer is enough for lz4-java compression library. + this.heapBuffer = new byte[2 * bufferSize]; + this.blockCompressor = BlockCompressionFactory.createBlockCompressionFactory(factoryName).getCompressor(); + } + + /** +* Compresses the given {@link Buffer} using {@link BlockCompressor}. The compressed data will be stored in the +* internal heap buffer of this {@link BufferCompressor} and returned to the caller. The caller must guarantee +* that the returned {@link Buffer} is freed when calling the method next time. +* +* Note that the compression will always start from offset 0 to the size of the input {@link Buffer}. 
+*/ + public Buffer compressToInternalBuffer(Buffer buffer) { + int compressedLen; + if ((compressedLen = compress(buffer)) == 0) { + return buffer; + } + + try { + if (compressedLen >= buffer.getSize()) { + return buffer; + } + + // wrap the internal heap buffer as a Buffer + MemorySegment memorySegment = MemorySegmentFactory.wrap(heapBuffer); + NetworkBuffer compressedBuffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE); + compressedBuffer.setSize(compressedLen); + compressedBuffer.setCompressed(true); + + return compressedBuffer; + } catch (Throwable throwable) { + return buffer; + } + } + + /** +* The difference between this method and {@link #compressToInternalBuffer(Buffer)} is that this method will +* copy the compressed data back to the input {@link Buffer} starting from offset 0. +* +* The caller must guarantee that the input {@link Buffer} is writable and there's enough space left. +*/ + public Buffer compressInPlace(Buffer buffer) { + int compressedLen; + if ((compressedLen = compress(buffer)) == 0) { + return buffer; + } + + try { + if (compressedLen >= buffer.getSize()) { + return buffer; + } + + // copy the compressed data back + MemorySegment segment = buffer.getMemorySegment(); + segment.put(buffer.getMemorySegmentOffset(), heapBuffer, 0, compressedLen); + Buffer compressedBuffer = buffer.readOnlySlice(0, compressedLen); +
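For readers of the diff, a hedged usage sketch of the two compression paths follows; the `"LZ4"` factory name and the buffer setup are assumptions based on the classes the diff itself uses, not verified against the final PR:

```java
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;

/** Usage sketch for the two compression paths shown in the diff above. */
public class BufferCompressorUsageSketch {

    public static void main(String[] args) {
        int bufferSize = 32 * 1024;
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
        NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
        buffer.setSize(bufferSize); // pretend the buffer is full of data

        // "LZ4" as the factory name is an assumption about the accepted identifier.
        BufferCompressor compressor = new BufferCompressor(bufferSize, "LZ4");

        // Path 1: compressed bytes land in the compressor's internal heap buffer;
        // the returned buffer must be consumed before the next call.
        Buffer compressed = compressor.compressToInternalBuffer(buffer);
        System.out.println("internal-buffer path size: " + compressed.getSize());

        // Path 2: compressed bytes overwrite the input from offset 0; the input
        // must be writable with enough space left.
        Buffer inPlace = compressor.compressInPlace(buffer);
        System.out.println("in-place path size: " + inPlace.getSize());
    }
}
```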
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353019073 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -79,6 +80,101 @@ def __repr__(self): return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders) +class ArrayCoderImpl(StreamCoderImpl): + +def __init__(self, elem_coder): +self._elem_coder = elem_coder + +def encode_to_stream(self, value, out_stream, nested): +out_stream.write_bigendian_int32(len(value)) +for elem in value: +if elem is None: +out_stream.write_byte(False) +else: +out_stream.write_byte(True) +self._elem_coder.encode_to_stream(elem, out_stream, nested) + +def decode_from_stream(self, in_stream, nested): +size = in_stream.read_bigendian_int32() +elements = [self._elem_coder.decode_from_stream(in_stream, nested) +if not not in_stream.read_byte() else None for _ in range(size)] +return elements + +def __repr__(self): +return 'ArrayCoderImpl[%s]' % str(self._elem_coder) + + +class MapCoderImpl(StreamCoderImpl): + +def __init__(self, key_coder, value_coder): +self._key_coder = key_coder +self._value_coder = value_coder + +def encode_to_stream(self, map_value, out_stream, nested): +out_stream.write_bigendian_int32(len(map_value)) +for key in map_value: +self._key_coder.encode_to_stream(key, out_stream, nested) +value = map_value[key] +if value is None: +out_stream.write_byte(True) +else: +out_stream.write_byte(False) +self._value_coder.encode_to_stream(map_value[key], out_stream, nested) + +def decode_from_stream(self, in_stream, nested): +size = in_stream.read_bigendian_int32() +map_value = {} +for _ in range(size): +key = self._key_coder.decode_from_stream(in_stream, nested) +is_null = not not in_stream.read_byte() +if is_null: Review comment: use in_stream.read_byte() directly or bool(in_stream.read_byte())? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353016856 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -79,6 +80,101 @@ def __repr__(self): return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders) +class ArrayCoderImpl(StreamCoderImpl): + +def __init__(self, elem_coder): +self._elem_coder = elem_coder + +def encode_to_stream(self, value, out_stream, nested): +out_stream.write_bigendian_int32(len(value)) +for elem in value: +if elem is None: +out_stream.write_byte(False) +else: +out_stream.write_byte(True) +self._elem_coder.encode_to_stream(elem, out_stream, nested) + +def decode_from_stream(self, in_stream, nested): +size = in_stream.read_bigendian_int32() +elements = [self._elem_coder.decode_from_stream(in_stream, nested) +if not not in_stream.read_byte() else None for _ in range(size)] Review comment: remove "not not"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353047775 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -140,6 +236,23 @@ def decode_from_stream(self, in_stream, nested): return in_stream.read_bigendian_double() + +class DecimalCoderImpl(StreamCoderImpl): + +def __init__(self, precision, scale): +decimal.getcontext().prec = precision Review comment: Maybe we should hold an individual context object here, replace the current context at the beginning of encode/decode, and restore the user's context at the end of encode/decode? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353592042 ## File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/BinaryMapSerializer.java ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils.serializers.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.table.dataformat.BaseMap; +import org.apache.flink.table.dataformat.BinaryArray; +import org.apache.flink.table.dataformat.BinaryArrayWriter; +import org.apache.flink.table.dataformat.BinaryMap; +import org.apache.flink.table.dataformat.BinaryWriter; +import org.apache.flink.table.dataformat.TypeGetterSetters; +import org.apache.flink.table.runtime.typeutils.BaseMapSerializer; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** + * A {@link TypeSerializer} for {@link BinaryMap}. It should be noted that the header will not be encoded. + * Currently Python doesn't support BinaryMap natively, so we can't use BaseArraySerializer in blink directly. + */ +@Internal +public class BinaryMapSerializer extends BaseMapSerializer { Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353590279 ## File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/BinaryArraySerializer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils.serializers.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.table.dataformat.BaseArray; +import org.apache.flink.table.dataformat.BinaryArray; +import org.apache.flink.table.dataformat.BinaryArrayWriter; +import org.apache.flink.table.dataformat.BinaryWriter; +import org.apache.flink.table.dataformat.TypeGetterSetters; +import org.apache.flink.table.runtime.typeutils.BaseArraySerializer; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** + * A {@link TypeSerializer} for {@link BinaryArray}. It should be noted that the header will not be encoded. + * Currently Python doesn't support BinaryArray natively, so we can't use BaseArraySerializer in blink directly. + */ +@Internal +public class BinaryArraySerializer extends BaseArraySerializer { Review comment: The name "BinaryArraySerializer" is not accurate, maybe "PythonBaseArraySerializer" is better? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353093687 ## File path: flink-python/pyflink/fn_execution/tests/coders_test_common.py ## @@ -73,13 +74,34 @@ def test_binary_coder(self): def test_char_coder(self): coder = CharCoder() -self.check_coder(coder, 'flink') +self.check_coder(coder, 'flink', '') def test_date_coder(self): import datetime coder = DateCoder() self.check_coder(coder, datetime.date(2019, 9, 10)) +def test_array_coder(self): +element_coder = BigIntCoder() +coder = ArrayCoder(element_coder) +self.check_coder(coder, [1, 2, 3, None]) + +def test_map_coder(self): +key_coder = CharCoder() +value_coder = BigIntCoder() +coder = MapCoder(key_coder, value_coder) +self.check_coder(coder, {'flink': 1, 'pyflink': 2, 'coder': None}) + +def test_multiset_coder(self): +element_coder = CharCoder() +coder = MultisetCoder(element_coder) +self.check_coder(coder, ['flink', 'flink', 'pyflink']) + +def test_decimal_coder(self): +from decimal import Decimal +coder = DecimalCoder() Review comment: How about testing with different precisions? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353088625 ## File path: flink-python/pyflink/fn_execution/coders.py ## @@ -68,6 +70,98 @@ def __hash__(self): return hash(self._field_coders) + +class CollectionCoder(FastCoder): +""" +Base coder for collection. +""" +def __init__(self, elem_coder): +self._elem_coder = elem_coder + +def _create_impl(self): +return self._impl_coder()(self._elem_coder.get_impl()) + +def _impl_coder(self): +raise NotImplementedError + +def is_deterministic(self): +return self._elem_coder.is_deterministic() + +def to_type_hint(self): +return [] + +def __eq__(self, other): +return (self.__class__ == other.__class__ +and self._elem_coder == other._elem_coder) + +def __repr__(self): +return '%s[%s]' % (self.__class__.__name__, str(self._elem_coder)) + +def __ne__(self, other): +return not self == other + +def __hash__(self): +return hash(self._elem_coder) + + +class ArrayCoder(CollectionCoder): +""" +Coder for Array. +""" + +def __init__(self, elem_coder): +self._elem_coder = elem_coder +super(ArrayCoder, self).__init__(elem_coder) + +def _impl_coder(self): Review comment: How about overriding `_create_impl` directly? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353585748 ## File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/BinaryArraySerializer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.typeutils.serializers.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.table.dataformat.BaseArray; +import org.apache.flink.table.dataformat.BinaryArray; +import org.apache.flink.table.dataformat.BinaryArrayWriter; +import org.apache.flink.table.dataformat.BinaryWriter; +import org.apache.flink.table.dataformat.TypeGetterSetters; +import org.apache.flink.table.runtime.typeutils.BaseArraySerializer; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** + * A {@link TypeSerializer} for {@link BinaryArray}. It should be noted that the header will not be encoded. + * Currently Python doesn't support BinaryArray natively, so we can't use BaseArraySerializer in blink directly. + */ +@Internal +public class BinaryArraySerializer extends BaseArraySerializer { Review comment: If we extends BaseArraySerializer, it seems the type parameter "K" is unnecessary? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353024028 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -79,6 +80,101 @@ def __repr__(self): return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders) +class ArrayCoderImpl(StreamCoderImpl): + +def __init__(self, elem_coder): +self._elem_coder = elem_coder + +def encode_to_stream(self, value, out_stream, nested): +out_stream.write_bigendian_int32(len(value)) +for elem in value: +if elem is None: +out_stream.write_byte(False) +else: +out_stream.write_byte(True) +self._elem_coder.encode_to_stream(elem, out_stream, nested) + +def decode_from_stream(self, in_stream, nested): +size = in_stream.read_bigendian_int32() +elements = [self._elem_coder.decode_from_stream(in_stream, nested) +if not not in_stream.read_byte() else None for _ in range(size)] +return elements + +def __repr__(self): +return 'ArrayCoderImpl[%s]' % str(self._elem_coder) + + +class MapCoderImpl(StreamCoderImpl): + +def __init__(self, key_coder, value_coder): +self._key_coder = key_coder +self._value_coder = value_coder + +def encode_to_stream(self, map_value, out_stream, nested): +out_stream.write_bigendian_int32(len(map_value)) +for key in map_value: +self._key_coder.encode_to_stream(key, out_stream, nested) +value = map_value[key] +if value is None: +out_stream.write_byte(True) +else: +out_stream.write_byte(False) +self._value_coder.encode_to_stream(map_value[key], out_stream, nested) + +def decode_from_stream(self, in_stream, nested): +size = in_stream.read_bigendian_int32() +map_value = {} +for _ in range(size): +key = self._key_coder.decode_from_stream(in_stream, nested) +is_null = not not in_stream.read_byte() +if is_null: +map_value[key] = None +else: +value = self._value_coder.decode_from_stream(in_stream, nested) +map_value[key] = value +return map_value + +def __repr__(self): +return 'MapCoderImpl[%s]' % ' : '.join([str(self._key_coder), str(self._value_coder)]) Review comment: use repr() instead of str()? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions
WeiZhong94 commented on a change in pull request #10086: [FLINK-14584][python] Support complex data types in Python user-defined functions URL: https://github.com/apache/flink/pull/10086#discussion_r353025966 ## File path: flink-python/pyflink/fn_execution/coder_impl.py ## @@ -79,6 +80,101 @@ def __repr__(self): return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders) + +class ArrayCoderImpl(StreamCoderImpl): + +def __init__(self, elem_coder): +self._elem_coder = elem_coder + +def encode_to_stream(self, value, out_stream, nested): +out_stream.write_bigendian_int32(len(value)) +for elem in value: +if elem is None: +out_stream.write_byte(False) +else: +out_stream.write_byte(True) +self._elem_coder.encode_to_stream(elem, out_stream, nested) + +def decode_from_stream(self, in_stream, nested): +size = in_stream.read_bigendian_int32() +elements = [self._elem_coder.decode_from_stream(in_stream, nested) +if not not in_stream.read_byte() else None for _ in range(size)] +return elements + +def __repr__(self): +return 'ArrayCoderImpl[%s]' % str(self._elem_coder) + + +class MapCoderImpl(StreamCoderImpl): + +def __init__(self, key_coder, value_coder): +self._key_coder = key_coder +self._value_coder = value_coder + +def encode_to_stream(self, map_value, out_stream, nested): +out_stream.write_bigendian_int32(len(map_value)) +for key in map_value: +self._key_coder.encode_to_stream(key, out_stream, nested) +value = map_value[key] +if value is None: +out_stream.write_byte(True) +else: +out_stream.write_byte(False) +self._value_coder.encode_to_stream(map_value[key], out_stream, nested) + +def decode_from_stream(self, in_stream, nested): +size = in_stream.read_bigendian_int32() +map_value = {} +for _ in range(size): +key = self._key_coder.decode_from_stream(in_stream, nested) +is_null = not not in_stream.read_byte() +if is_null: +map_value[key] = None +else: +value = self._value_coder.decode_from_stream(in_stream, nested) +map_value[key] = value +return map_value + +def __repr__(self): +return 'MapCoderImpl[%s]' % ' : '.join([str(self._key_coder), str(self._value_coder)]) + + +class MultisetCoderImpl(StreamCoderImpl): + +def __init__(self, element_coder): +self._element_coder = element_coder + +def encode_to_stream(self, value, out_stream, nested): +dict_value = self.multiset_to_dict(value) +out_stream.write_bigendian_int32(len(dict_value)) Review comment: This part duplicates MapCoderImpl; can we reuse it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14567) Aggregate query with more than two group fields can't be write into HBase sink
[ https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987644#comment-16987644 ] Jark Wu commented on FLINK-14567: - Hi [~twalthr] [~ykt836], do you mean that if the key information of the sink does not match the query keys, we can add a keyBy shuffle between them? Otherwise, they can be chained. We already added the primary key constraint to {{TableSchema}}, so we only need to add a keyBy shuffle in the framework if they do not match. > Aggregate query with more than two group fields can't be write into HBase sink > -- > > Key: FLINK-14567 > URL: https://issues.apache.org/jira/browse/FLINK-14567 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase, Table SQL / Legacy Planner, Table > SQL / Planner >Reporter: Jark Wu >Priority: Critical > > If we have a hbase table sink with rowkey of varchar (also primary key) and a > column of bigint, we want to write the result of the following query into the > sink using upsert mode. However, it will fail when primary key check with the > exception "UpsertStreamTableSink requires that Table has a full primary keys > if it is updated." > {code:sql} > select concat(f0, '-', f1) as key, sum(f2) > from T1 > group by f0, f1 > {code} > This happens in both blink planner and old planner. That is because if the > query works in update mode, then there must be a primary key exist to be > extracted and set to {{UpsertStreamTableSink#setKeyFields}}. > That's why we want to derive primary key for concat in FLINK-14539, however, > we found that the primary key is not preserved after concating. For example, > if we have a primary key (f0, f1, f2) which are all varchar type, say we have > two unique records ('a', 'b', 'c') and ('ab', '', 'c'), but the results of > concat(f0, f1, f2) are the same, which means the concat result is not primary > key anymore. > So here comes the problem, how can we proper support HBase sink or such use > case? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10408: [FLINK-14992][client] Add job listener to execution environments
flinkbot edited a comment on issue #10408: [FLINK-14992][client] Add job listener to execution environments URL: https://github.com/apache/flink/pull/10408#issuecomment-561446399 ## CI report: * 26dba18b2fd51c8df67cc33eace890b5ca34e182 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139256957) * 304ef1d6f41d2a2b29997312f3d3ced631fc2ec5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139259056) * e39ef8250f07fc23867ad60b24e3efb3af153cf1 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139278737) * 01470a38e8d7194e5309f2ca4f6f070a507ca4ca : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139281845) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10415: [FLINK-15049][table-planner-blink] Compile error when hash join with …
flinkbot edited a comment on issue #10415: [FLINK-15049][table-planner-blink] Compile error when hash join with … URL: https://github.com/apache/flink/pull/10415#issuecomment-561528911 ## CI report: * e838831b098e3043b6576aec5ba38ee6bdc29422 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139281869) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10416: [FLINK-14484] Enable to control memory usage of RocksDB via Cache and WriteBufferManager
flinkbot commented on issue #10416: [FLINK-14484] Enable to control memory usage of RocksDB via Cache and WriteBufferManager URL: https://github.com/apache/flink/pull/10416#issuecomment-561539660 ## CI report: * 32d4ffce9d515f19045ff5a13a9602f0b52d3196 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode
zhuzhurk commented on a change in pull request #10406: [FLINK-15045][runtime] Only log RestartStrategy in legacy scheduling mode URL: https://github.com/apache/flink/pull/10406#discussion_r353616472 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ## @@ -77,6 +80,7 @@ public SchedulerNG createInstance( jobMasterConfiguration, jobGraph.isCheckpointingEnabled()) .create(); + LOG.info("Using back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID()); Review comment: > I introduced a log message here because it is closest to the object creation. Moreover, I prefer to [only have field assignments](https://www.yegor256.com/2015/05/07/ctors-must-be-code-free.html) in the constructor (note however, that this already does not hold in `DefaultScheduler`). Imo, another reasonable place to log the _scheduling configuration_ is in `startSchedulingInternal()`. Fine. In my understanding, logging is usually not considered to be real logic (though it does something); otherwise, in some cases we might find it hard to print instance-internal state for troubleshooting. Anyway, I think it's also fine to keep the logging here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
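To make the placement trade-off discussed above concrete, here is an illustrative-only sketch (names simplified, not the actual factory or scheduler code): the constructor stays limited to field assignments, and the scheduling configuration is logged once scheduling actually starts:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch only: a constructor kept free of logic, with the
// scheduling configuration logged in startSchedulingInternal() instead.
class SchedulerSketch {

    private static final Logger LOG = LoggerFactory.getLogger(SchedulerSketch.class);

    private final String restartBackoffTimeStrategy;

    SchedulerSketch(String restartBackoffTimeStrategy) {
        // constructor limited to field assignments, per the linked guideline
        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
    }

    void startSchedulingInternal() {
        // the scheduling configuration is logged where scheduling begins
        LOG.info("Using back off time strategy {}.", restartBackoffTimeStrategy);
    }

    public static void main(String[] args) {
        new SchedulerSketch("FixedDelayRestartBackoffTimeStrategy").startSchedulingInternal();
    }
}
```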
[jira] [Commented] (FLINK-14567) Aggregate query with more than two group fields can't be write into HBase sink
[ https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987675#comment-16987675 ] Kurt Young commented on FLINK-14567: Basically yes. But this would involve the discussion about how to deal with primary keys in sources and sinks, which would be the follow-up work of FLIP-87. > Aggregate query with more than two group fields can't be write into HBase sink > -- > > Key: FLINK-14567 > URL: https://issues.apache.org/jira/browse/FLINK-14567 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase, Table SQL / Legacy Planner, Table > SQL / Planner >Reporter: Jark Wu >Priority: Critical > > If we have a hbase table sink with rowkey of varchar (also primary key) and a > column of bigint, we want to write the result of the following query into the > sink using upsert mode. However, it will fail when primary key check with the > exception "UpsertStreamTableSink requires that Table has a full primary keys > if it is updated." > {code:sql} > select concat(f0, '-', f1) as key, sum(f2) > from T1 > group by f0, f1 > {code} > This happens in both blink planner and old planner. That is because if the > query works in update mode, then there must be a primary key exist to be > extracted and set to {{UpsertStreamTableSink#setKeyFields}}. > That's why we want to derive primary key for concat in FLINK-14539, however, > we found that the primary key is not preserved after concating. For example, > if we have a primary key (f0, f1, f2) which are all varchar type, say we have > two unique records ('a', 'b', 'c') and ('ab', '', 'c'), but the results of > concat(f0, f1, f2) are the same, which means the concat result is not primary > key anymore. > So here comes the problem, how can we proper support HBase sink or such use > case? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191 ## CI report: * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366796) * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133940607) * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133998545) * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010321) * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145261) * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017410) * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138730857) * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138770610) * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116897) * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139256990) * ce01f742aba3a683b98a0ea7da47e678d30c12be : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353634011 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -136,6 +146,35 @@ return builder.build(); } + /** +* The original table schema may contain generated columns which shouldn't be produced/consumed +* by TableSource/TableSink. And the original TIMESTAMP/DATE/TIME types uses LocalDateTime/LocalDate/LocalTime +* as the conversion classes, however, JDBC connector uses Timestamp/Date/Time classes. So that +* we bridge them to the expected conversion classes. +*/ + private TableSchema getPhysicalTableSchema(DescriptorProperties descriptorProperties) { Review comment: Should there be a util method in `DescriptorProperties` or somewhere to re-use? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
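A minimal sketch of the bridging described in the javadoc above, assuming Flink's `DataTypes` factory and `DataType#bridgedTo`; the variable names are illustrative:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// By default TIMESTAMP/DATE/TIME use java.time conversion classes
// (LocalDateTime/LocalDate/LocalTime); the JDBC connector expects the
// java.sql classes, so each type is re-bridged to its expected class.
DataType jdbcTimestamp = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
DataType jdbcDate = DataTypes.DATE().bridgedTo(java.sql.Date.class);
DataType jdbcTime = DataTypes.TIME().bridgedTo(java.sql.Time.class);
```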
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353634713 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java ## @@ -119,7 +126,17 @@ private HBaseTableSchema validateTableSchema(TableSchema schema) { String[] qualifierNames = familyType.getFieldNames(); TypeInformation[] qualifierTypes = familyType.getFieldTypes(); for (int j = 0; j < familyType.getArity(); j++) { - hbaseSchema.addColumn(name, qualifierNames[j], qualifierTypes[j].getTypeClass()); + // HBase connector doesn't support LocalDateTime + // use Timestamp as conversion class for now. + Class clazz = qualifierTypes[j].getTypeClass(); + if (LocalDateTime.class.equals(clazz)) { + clazz = Timestamp.class; Review comment: Does this not work for nested fields? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353634312 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -136,6 +146,35 @@ return builder.build(); } + /** +* The original table schema may contain generated columns which shouldn't be produced/consumed +* by TableSource/TableSink. And the original TIMESTAMP/DATE/TIME types uses LocalDateTime/LocalDate/LocalTime +* as the conversion classes, however, JDBC connector uses Timestamp/Date/Time classes. So that +* we bridge them to the expected conversion classes. +*/ + private TableSchema getPhysicalTableSchema(DescriptorProperties descriptorProperties) { + TableSchema schema = descriptorProperties.getTableSchema(SCHEMA); + TableSchema.Builder physicalSchemaBuilder = TableSchema.builder(); + schema.getTableColumns() + .forEach(c -> { + if (!c.isGenerated()) { + LogicalTypeRoot root = c.getType().getLogicalType().getTypeRoot(); + final DataType type; + if (root == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { + type = c.getType().bridgedTo(Timestamp.class); Review comment: But this doesn't work for nested fields, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353628831 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java ## @@ -181,6 +182,16 @@ private static void validateLogicalTypeEqualsPhysical( TableSource tableSource) { ResolvedField resolvedField = resolveField(fieldName, tableSource); if (!resolvedField.getType().equals(logicalType)) { + + if (resolvedField.getType().getLogicalType() instanceof LegacyTypeInformationType && + logicalType.getLogicalType().getTypeRoot() == LogicalTypeRoot.DECIMAL && + logicalType.getLogicalType().getTypeRoot() == resolvedField.getType().getLogicalType().getTypeRoot() && + logicalType.getConversionClass() == resolvedField.getType().getConversionClass()) { Review comment: Would comparing only the logical types be enough? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353632499 ## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java ## @@ -340,56 +340,68 @@ private DeserializationRuntimeConverter createFallbackConverter(Class valueTy return Optional.of(createTimeConverter()); } else if (simpleTypeInfo == Types.SQL_TIMESTAMP) { return Optional.of(createTimestampConverter()); + } else if (simpleTypeInfo == Types.LOCAL_DATE) { + return Optional.of(this::convertToLocalDate); + } else if (simpleTypeInfo == Types.LOCAL_TIME) { + return Optional.of(this::convertToLocalTime); + } else if (simpleTypeInfo == Types.LOCAL_DATE_TIME) { + return Optional.of(this::convertToLocalDateTime); } else { return Optional.empty(); } } + private LocalDate convertToLocalDate(ObjectMapper mapper, JsonNode jsonNode) { + return ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate()); + } + private DeserializationRuntimeConverter createDateConverter() { Review comment: Can you modify `createDateConverter` to `convertToDate` style too? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
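The `convertToLocalDate` pattern from the diff above as a self-contained sketch; the input string is hypothetical:

```java
import java.time.LocalDate;
import java.time.temporal.TemporalQueries;

import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;

public class IsoLocalDateParseSketch {
    public static void main(String[] args) {
        // Parse an ISO-8601 date string and extract only the LocalDate part,
        // mirroring the converter added in the diff above.
        LocalDate date = ISO_LOCAL_DATE.parse("2019-12-04")
                .query(TemporalQueries.localDate());
        System.out.println(date); // 2019-12-04
    }
}
```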
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353638111 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java ## @@ -216,6 +216,18 @@ else if (canConvertToTimestampTypeInfoLenient(dataType)) { return Types.SQL_TIMESTAMP; } + // relax the precision constraint as LocalDateTime can store the highest precision + else if (hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) && Review comment: Maybe you can take a look above; there are some comments and limitations for `java.sql.Timestamp` and the other types. If you want to modify this, you should either modify all of them or respect their existing rules (like in `canConvertToTimestampTypeInfoLenient`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353625420 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java ## @@ -582,19 +607,26 @@ public short getShort(String key) { final TableSchema.Builder schemaBuilder = TableSchema.builder(); for (int i = 0; i < fieldCount; i++) { final String nameKey = key + '.' + i + '.' + TABLE_SCHEMA_NAME; - final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE; + final String legacyTypeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE; + final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_DATATYPE; final String exprKey = key + '.' + i + '.' + TABLE_SCHEMA_EXPR; final String name = optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey)); - final TypeInformation type = optionalGet(typeKey) - .map(TypeStringUtils::readTypeInfo) - .orElseThrow(exceptionSupplier(typeKey)); + final DataType type; + if (containsKey(typeKey)) { Review comment: NIT: use `? :` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
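The nit spelled out as a sketch, covering the two-branch case only; the missing-key error handling from the surrounding code would still apply:

```java
// Prefer the new data-type key and fall back to the legacy type key.
final DataType type = containsKey(typeKey)
        ? getDataType(typeKey)
        : LegacyTypeInfoDataTypeConverter.toDataType(getType(legacyTypeKey));
```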
[GitHub] [flink] JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties
JingsongLi commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties URL: https://github.com/apache/flink/pull/10403#discussion_r353625137 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java ## @@ -582,19 +607,26 @@ public short getShort(String key) { final TableSchema.Builder schemaBuilder = TableSchema.builder(); for (int i = 0; i < fieldCount; i++) { final String nameKey = key + '.' + i + '.' + TABLE_SCHEMA_NAME; - final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE; + final String legacyTypeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE; + final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_DATATYPE; final String exprKey = key + '.' + i + '.' + TABLE_SCHEMA_EXPR; final String name = optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey)); - final TypeInformation type = optionalGet(typeKey) - .map(TypeStringUtils::readTypeInfo) - .orElseThrow(exceptionSupplier(typeKey)); + final DataType type; + if (containsKey(typeKey)) { + type = getDataType(typeKey); + } else if (containsKey(legacyTypeKey)) { + type = LegacyTypeInfoDataTypeConverter.toDataType(getType(legacyTypeKey)); Review comment: Use `TypeConversions.***`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes. URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191 ## CI report: * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366796) * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133940607) * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133998545) * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010321) * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145261) * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017410) * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138730857) * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138770610) * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116897) * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139256990) * ce01f742aba3a683b98a0ea7da47e678d30c12be : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139290769) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9965: [FLINK-10935][kubernetes]Implement KubeClient with Faric8 Kubernetes clients
flinkbot edited a comment on issue #9965: [FLINK-10935][kubernetes]Implement KubeClient with Faric8 Kubernetes clients URL: https://github.com/apache/flink/pull/9965#issuecomment-544813931 ## CI report: * 6f90b457e56a0a8cb45d63c1b05b47d2e38030a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132938440) * 86aa5ce8f77faf233c51a7231b3f71e518fd6c92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132962300) * 7851d845a43f799627b2c788ace8eb7e6caccb03 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133133283) * d49bb0a622e1667baffd29f19fdcc60d0022fe82 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366914) * 0e88e0e5be77f450c82cbc460ea4f02a1effc920 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133998649) * 9afcc7ba840186c68f36b30d6b28b8c1cbf09b61 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145224) * 06e6b2bee4c1788b150f2b83c43eb4723709864b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017584) * 816a11afe713e736cdfd2eb566762ee2addf7071 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138708643) * e80691082c9fc8ac704b0bccaa5180b7de0718d5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138770607) * e85e76c018b7606381c5869e0c1054e02c4a2321 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138957695) * d1c41ae28960b3eea21fc662e771fe44c1c53b12 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139101718) * 1e54eb3124deaca01afd2a3d28edbdd8086db301 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139290748) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for equality to NaN
flinkbot edited a comment on issue #10333: [FLINK-14970]Doomed test for equality to NaN URL: https://github.com/apache/flink/pull/10333#issuecomment-559012637 ## CI report: * c5bd9898fbbeb3d668051475409b52bfb1e2 : UNKNOWN * 44f202263cbb26473b06ea19f293bf0b6afce8e6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138375519) * e0ea169fab74ca07617a056e1c60861938ad6f0b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139122432) * 38bef99d2f337b556c87141e0240ddd7075fd980 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139296161) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString
StephanEwen commented on issue #10358: [FLINK-14346] [serialization] faster implementation of StringValue writeString and readString URL: https://github.com/apache/flink/pull/10358#issuecomment-561582129 Some thoughts for follow-up: - Do you know where exactly the performance difference comes from? Is it mainly the many individual `read()` single-byte operations, which get more efficient if you bulk-read into a byte array? - We can actually break the serialization format, as long as we change the StringSerializer config snapshot and restore methods. We can return "needs conversion" as the compatibility and return a serializer with the old encoding as the restore serializer. - On second thought, the above would only work if all parts support serializer evolution, and I think keys in RocksDB cannot be evolved right now (not yet implemented). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
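A rough sketch of the second bullet, assuming Flink's `TypeSerializerSnapshot` API; `LegacyStringSerializer` is a hypothetical serializer that still understands the old encoding:

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

// Fragment of a hypothetical snapshot for the changed string format;
// the remaining TypeSerializerSnapshot methods are omitted.
abstract class StringSerializerSnapshotSketch implements TypeSerializerSnapshot<String> {

    @Override
    public TypeSerializer<String> restoreSerializer() {
        // Hypothetical serializer that reads the old encoding.
        return new LegacyStringSerializer();
    }

    @Override
    public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(
            TypeSerializer<String> newSerializer) {
        // Signal "needs conversion": state must be migrated to the new format.
        return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
    }
}
```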
[GitHub] [flink] zentol commented on a change in pull request #10238: [FLINK-8949] Add dedicated watermarks metric retrieval endpoint
zentol commented on a change in pull request #10238: [FLINK-8949] Add dedicated watermarks metric retrieval endpoint URL: https://github.com/apache/flink/pull/10238#discussion_r353664553 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandler.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.job.AbstractJobVertexHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexWatermarksHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.Metric; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + + +/** + * Handler that returns the watermarks given a {@link JobID} and {@link JobVertexID}. + */ +public class JobVertexWatermarksHandler extends AbstractJobVertexHandler { + + private final MetricFetcher metricFetcher; + + public JobVertexWatermarksHandler( + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MetricFetcher metricFetcher, + ExecutionGraphCache executionGraphCache, + Executor executor) { + super(leaderRetriever, + timeout, + responseHeaders, + JobVertexWatermarksHeaders.INSTANCE, + executionGraphCache, + executor); + this.metricFetcher = metricFetcher; + } + + @Override + protected MetricCollectionResponseBody handleRequest( + HandlerRequest request, + AccessExecutionJobVertex jobVertex) throws RestHandlerException { + + String jobID = request.getPathParameter(JobIDPathParameter.class).toString(); + String taskID = jobVertex.getJobVertexId().toString(); + + metricFetcher.update(); + MetricStore.TaskMetricStore taskMetricStore = metricFetcher.getMetricStore().getTaskMetricStore(jobID, taskID); + if (taskMetricStore == null) { + return new MetricCollectionResponseBody(Collections.emptyList()); + } + + AccessExecutionVertex[] taskVertices = jobVertex.getTaskVertices(); + List metrics = new ArrayList<>(taskVertices.length); + + for (AccessExecutionVertex taskVertex : taskVertices) { + String id = taskVertex.getParallelSubtaskIndex() + "." + MetricNames.IO_CURRENT_INPUT_WATERMARK; + String watermarkValue = taskMetricStore.getMetric(id); + if (watermarkValue != null) { + metrics.add(new Metric(id, watermarkValue)); + } + } + + return new MetricCollectionResponseBody(metrics); + } +}
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353667305 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java ## @@ -397,7 +406,8 @@ private static void checkAndCreateDirectory(File directory) throws IOException { nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager); + ttlCompactFiltersManager, + writeBatchSize); Review comment: Can we use `this.writeBatchSize` directly? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353670935 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java ## @@ -119,6 +127,8 @@ public RocksDBFullRestoreOperation( metricGroup, restoreStateHandles, ttlCompactFiltersManager); + checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative."); Review comment: Ditto. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353669969 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ## @@ -49,22 +53,35 @@ private final int capacity; - public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB) { - this(rocksDB, null, 500); + @Nonnegative + private final long batchSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, long writeBatchSize) { + this(rocksDB, null, 500, writeBatchSize); } public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options) { - this(rocksDB, options, 500); + this(rocksDB, options, 500, DEFAULT_BATCH_SIZE); + } + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, long batchSize) { + this(rocksDB, options, 500, batchSize); } - public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity) { + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity, long batchSize) { Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY); + Preconditions.checkArgument(batchSize >= 0, "Max batch size have to be no negative."); Review comment: We can use `@Nonnegative` for all function parameters instead of `checkArgument` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
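What the suggested annotation-based contract might look like on the widest constructor; this is a signature sketch only, with the body elided:

```java
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

// @Nonnegative documents the contract at the API boundary, so the explicit
// checkArgument on batchSize inside the constructor becomes redundant.
public RocksDBWriteBatchWrapper(
        @Nonnull RocksDB rocksDB,
        @Nullable WriteOptions options,
        int capacity,
        @Nonnegative long batchSize) {
    // ... constructor body as in the PR
}
```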
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353662612 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java ## @@ -131,7 +135,8 @@ public RocksDBKeyedStateBackendBuilder( MetricGroup metricGroup, @Nonnull Collection stateHandles, StreamCompressionDecorator keyGroupCompressionDecorator, - CloseableRegistry cancelStreamRegistry) { + CloseableRegistry cancelStreamRegistry, + @Nonnegative long writeBatchSize) { Review comment: Ditto. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353663839 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend( this.kvStateInformation = kvStateInformation; this.writeOptions = new WriteOptions().setDisableWAL(true); + checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value."); Review comment: I think we don't need to `checkArgument` when the passed-in argument is annotated as `@Nonnegative`, since that is already a contract. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353672881 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ## @@ -65,4 +69,52 @@ public void basicTest() throws Exception { } } } + + /** +* Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory consumed exceeds the preconfigured value. +*/ + @Test + public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws Exception { + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200, 50)) { + // sequence (8 bytes) + count (4 bytes) + // more information please ref to write_batch.cc in RocksDB + assertEquals(12, writeBatchWrapper.getDataSize()); Review comment: This assertion might fail if the RocksDB implementation changes, and it is unnecessary anyway. We could simply record `writeBatchWrapper.getDataSize()` as an `initialSize` and check against it after the flush is triggered. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
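The suggested refactor, roughly, against the same test scenario (50-byte write batch size, 16 bytes per KV, both taken from the diff above):

```java
// Record the empty batch's size instead of hard-coding RocksDB's
// internal header length (12 bytes at the time of writing).
final long initialSize = writeBatchWrapper.getDataSize();

writeBatchWrapper.put(handle, dummy, dummy);
assertTrue(writeBatchWrapper.getDataSize() > initialSize);

writeBatchWrapper.put(handle, dummy, dummy);
writeBatchWrapper.put(handle, dummy, dummy); // exceeds 50 bytes -> flush

// After the flush the wrapper is back to an empty batch.
assertEquals(initialSize, writeBatchWrapper.getDataSize());
```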
[GitHub] [flink] wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle. URL: https://github.com/apache/flink/pull/10375#discussion_r353673439 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ## @@ -217,4 +217,14 @@ * @return self as ByteBuf implementation. */ ByteBuf asByteBuf(); + + /** +* @return whether the buffer is compressed or not. +*/ + boolean isCompressed(); + + /** +* Tags the buffer as compressed or uncompressed. +*/ + void setCompressed(boolean isCompressed); Review comment: We need the compression tag so the downstream can identify whether a buffer is compressed, because the downstream may receive a mixture of compressed and uncompressed buffers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
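A sketch of the downstream check the comment describes; `decompress` is a hypothetical helper around whatever codec the PR wires in:

```java
// Downstream may see compressed and uncompressed buffers interleaved,
// so each buffer carries its own tag and is decompressed only if needed.
Buffer maybeDecompress(Buffer buffer) {
    return buffer.isCompressed() ? decompress(buffer) : buffer;
}
```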
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353673315 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ## @@ -65,4 +69,52 @@ public void basicTest() throws Exception { } } } + + /** +* Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory consumed exceeds the preconfigured value. +*/ + @Test + public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws Exception { + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200, 50)) { + // sequence (8 bytes) + count (4 bytes) + // more information please ref to write_batch.cc in RocksDB + assertEquals(12, writeBatchWrapper.getDataSize()); + byte[] dummy = new byte[6]; + ThreadLocalRandom.current().nextBytes(dummy); + // will add 1 + 1 + 1 + 6 + 1 + 6 = 16 bytes for each KV + // format is [handleType|kvType|keyLen|key|valueLen|value] + // more information please ref write_batch.cc in RocksDB + writeBatchWrapper.put(handle, dummy, dummy); + assertEquals(28, writeBatchWrapper.getDataSize()); + writeBatchWrapper.put(handle, dummy, dummy); + assertEquals(44, writeBatchWrapper.getDataSize()); + writeBatchWrapper.put(handle, dummy, dummy); + // will flush all, then an empty write batch + assertEquals(12, writeBatchWrapper.getDataSize()); + } + } + + /** +* Tests that {@link RocksDBWriteBatchWrapper} flushes after the kv count exceeds the preconfigured value. +*/ + @Test + public void testWriteBatchWrapperFlushAfterCountExceed() throws Exception { + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 100, 5)) { + byte[] dummy = new byte[2]; + ThreadLocalRandom.current().nextBytes(dummy); + for (int i = 1; i < 100; ++i) { + writeBatchWrapper.put(handle, dummy, dummy); + // init 12 bytes, each kv consumes 8 bytes + assertEquals(12 + 8 * i, writeBatchWrapper.getDataSize()); Review comment: Ditto. Record the initial size of the empty `writeBatchWrapper` instead of using the hard-coded value `12`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353671065 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java ## @@ -125,6 +129,8 @@ public RocksDBIncrementalRestoreOperation( this.restoredSstFiles = new TreeMap<>(); this.lastCompletedCheckpointId = -1L; this.backendUID = UUID.randomUUID(); + checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative."); Review comment: Ditto. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353669265 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -166,6 +169,11 @@ /** Whether we already lazily initialized our local storage directories. */ private transient boolean isInitialized; + /** +* Max consumed memory size for one batch in {@link RocksDBWriteBatchWrapper}, default value 2mb. +*/ + private long batchSize; Review comment: I suggest changing all "batchSize/BATCH_SIZE" occurrences to "writeBatchSize/WRITE_BATCH_SIZE" in this class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353667120 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java ## @@ -414,7 +424,8 @@ private static void checkAndCreateDirectory(File directory) throws IOException { nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager); + ttlCompactFiltersManager, + writeBatchSize); Review comment: Can we use `this.writeBatchSize` directly? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#discussion_r353670772 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ## @@ -113,4 +126,16 @@ public void close() throws RocksDBException { } IOUtils.closeQuietly(batch); } + + private void flushIfNeeded() throws RocksDBException { + boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize); Review comment: No need to check whether `batchSize` is larger than 0 since we already have the `@Nonnegative` annotation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator
StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator URL: https://github.com/apache/flink/pull/10405#issuecomment-561599666 Hmmm, cannot reproduce it any more. There seem to have been some errors in the log files, but apparently not a deterministic error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator
StephanEwen commented on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator URL: https://github.com/apache/flink/pull/10405#issuecomment-561599795 Will merge this... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen edited a comment on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator
StephanEwen edited a comment on issue #10405: [FLINK-15044][e2e tests] Clean up TpcdsResultComparator URL: https://github.com/apache/flink/pull/10405#issuecomment-561599666 Hmmm, cannot reproduce it any more. There seem to have been some errors in the log files, but apparently not a deterministic error. Must also be unrelated to this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services