[jira] [Commented] (FLINK-11499) Extend StreamingFileSink BulkFormats to support arbitrary roll policies
[ https://issues.apache.org/jira/browse/FLINK-11499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074322#comment-17074322 ] Piotr Nowojski commented on FLINK-11499: {quote} Have you considered using the checkpoint stream? I think the checkpoint state backend is the closest storage for the job. {quote} [~lzljs3620320]: Not directly, but I'm hoping the solution will be general enough that one could pass the same target FileSystem for the WAL stream as for checkpointing. Wouldn't that achieve the same result, but on a lower level (writing to a file directly vs. to state)? {quote} Some writers which come with buffer size/capacity criteria may get full even before the checkpoint is triggered. We have to think about this scenario as well, right? {quote} [~zenfenan]: maybe you are right. I was already thinking about it, as I suspect it's even currently supported that a bulk-format file will roll in the middle of a checkpoint. If that's the case, it's a matter of keeping this feature, and it shouldn't be that difficult, as we should already have the code to support it. > Extend StreamingFileSink BulkFormats to support arbitrary roll policies > --- > > Key: FLINK-11499 > URL: https://issues.apache.org/jira/browse/FLINK-11499 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Seth Wiesman >Priority: Major > Labels: usability > Fix For: 1.11.0 > > > Currently, when using the StreamingFileSink, bulk-encoding formats can only be > combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress > part file on every checkpoint. > However, many bulk formats such as Parquet are most efficient when written as > large files; this is not possible when frequent checkpointing is enabled. > Currently, the only work-around is to have long checkpoint intervals, which is > not ideal. 
> > The StreamingFileSink should be enhanced to support arbitrary roll policies > so users may write large bulk files while retaining frequent checkpoints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
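To make the requested behavior concrete, here is a minimal, hypothetical sketch (not Flink's actual API; the `shouldRoll` name and parameters are invented for illustration) of the decision an arbitrary, size-aware rolling policy could make for bulk formats, instead of rolling unconditionally on every checkpoint:

```java
// Hypothetical sketch: the core decision of a size-aware rolling policy.
// With such a policy, a checkpoint alone need not force a roll, so bulk
// formats like Parquet can accumulate large part files across checkpoints.
public class RollPolicySketch {

    static boolean shouldRoll(
            long partSizeBytes,      // current size of the in-progress part file
            long targetSizeBytes,    // desired part file size
            boolean onCheckpoint,    // is this check happening on a checkpoint?
            boolean rollOnCheckpoint // old behavior: always roll on checkpoint
    ) {
        return partSizeBytes >= targetSizeBytes || (onCheckpoint && rollOnCheckpoint);
    }

    public static void main(String[] args) {
        long target = 128L * 1024 * 1024; // e.g. aim for ~128 MiB Parquet files

        // A checkpoint no longer forces a roll of a small part file...
        System.out.println(shouldRoll(1024, target, true, false));    // false
        // ...but a part file that reached the target size still rolls.
        System.out.println(shouldRoll(target, target, false, false)); // true
    }
}
```

Restoring the old behavior is then just a matter of passing `rollOnCheckpoint = true`, which matches the ticket's point that keeping the on-checkpoint roll as one policy among many should not be difficult.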
[jira] [Assigned] (FLINK-16945) Execute CheckpointFailureManager.FailJobCallback directly in main thread executor
[ https://issues.apache.org/jira/browse/FLINK-16945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-16945: -- Assignee: Biao Liu > Execute CheckpointFailureManager.FailJobCallback directly in main thread > executor > - > > Key: FLINK-16945 > URL: https://issues.apache.org/jira/browse/FLINK-16945 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Biao Liu >Assignee: Biao Liu >Priority: Critical > Fix For: 1.11.0 > > > Since we have put all non-IO operations of {{CheckpointCoordinator}} into the > main thread executor, the {{CheckpointFailureManager.FailJobCallback}} can now > be executed directly. In this way, the execution graph fails immediately > when {{CheckpointFailureManager}} invokes the callback. We can thus avoid the > inconsistent scenario of FLINK-13497.
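As a rough illustration (generic Java, not the actual Flink classes): executing a callback on a direct executor runs it inline on the calling thread, so the failure takes effect before any subsequent coordinator action can observe stale state, whereas an asynchronous executor would only run it at some later point in the event loop:

```java
import java.util.concurrent.Executor;

// Generic illustration, not Flink code: a direct executor runs the task
// inline on the calling thread, so "fail the job" happens immediately
// rather than after other queued actions.
public class DirectExecutorDemo {

    static final Executor DIRECT = Runnable::run;

    public static void main(String[] args) {
        StringBuilder order = new StringBuilder();
        order.append("checkpointFailed;");
        DIRECT.execute(() -> order.append("failJobCallback;")); // executes inline
        order.append("nextCoordinatorAction;");
        System.out.println(order); // checkpointFailed;failJobCallback;nextCoordinatorAction;
    }
}
```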
[GitHub] [flink] flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] Introduce file system table factory
flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] Introduce file system table factory URL: https://github.com/apache/flink/pull/11574#issuecomment-606453374 ## CI report: * 415f37b7a4c03d5591c9352f593eaecb23bc1cf3 UNKNOWN * ad9d0b168f308a5205430b2dcf85e680f20449b3 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157813487) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6994) * 756a68346b01c34e41fcb0b8d122e1a94207491e Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158140230) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7018) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516 ## CI report: * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010) * 39d9b1a9c461fc940a660b95a4764f9982ba1599 UNKNOWN
[jira] [Commented] (FLINK-16954) 1.9 branch was not compile
[ https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074317#comment-17074317 ] Chesnay Schepler commented on FLINK-16954: -- ping [~aljoscha] [~igal] > 1.9 branch was not compile > -- > > Key: FLINK-16954 > URL: https://issues.apache.org/jira/browse/FLINK-16954 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.3 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.3 > > > Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3. > {code:java} > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81] > cannot find symbol > 06:35:03.332 [ERROR] symbol: class ClassReader > 06:35:03.332 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32] > cannot find symbol > 
06:35:03.332 [ERROR] symbol: class ClassWriter > 06:35:03.332 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32] > cannot find symbol > 06:35:03.334 [ERROR] symbol: class ClassReader > 06:35:03.334 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > {code}
[jira] [Updated] (FLINK-16954) 1.9 branch was not compile
[ https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-16954: - Description: Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3. {code:java} 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] package jdk.internal.org.objectweb.asm does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] package jdk.internal.org.objectweb.asm does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46] package jdk.internal.org.objectweb.asm.commons does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46] package jdk.internal.org.objectweb.asm.commons does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81] cannot find symbol 06:35:03.332 [ERROR] symbol: class ClassReader 06:35:03.332 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32] cannot find symbol 06:35:03.332 [ERROR] symbol: class ClassWriter 06:35:03.332 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32] cannot find symbol 06:35:03.334 [ERROR] symbol: class ClassReader 06:35:03.334 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper {code} was: Usage of JDK 11 APIs. {code:java} 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] package jdk.internal.org.objectweb.asm does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] package jdk.internal.org.objectweb.asm does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46] package jdk.internal.org.objectweb.asm.commons does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46] package jdk.internal.org.objectweb.asm.commons does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81] cannot find symbol 06:35:03.332 [ERROR] symbol: class ClassReader 06:35:03.332 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32] cannot find symbol 06:35:03.332 [ERROR] symbol: class ClassWriter 06:35:03.332 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32] cannot find symbol 06:35:03.334 [ERROR] symbol: class ClassReader 06:35:03.334 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper {code} > 1.9 branch was not compile > -- > > Key: FLINK-16954 > URL: 
https://issues.apache.org/jira/browse/FLINK-16954 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.3 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.3 > > > Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3. > {code:java} > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/tra
[jira] [Updated] (FLINK-16954) 1.9 branch does not compile
[ https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-16954: - Summary: 1.9 branch does not compile (was: 1.9 branch was not compile) > 1.9 branch does not compile > --- > > Key: FLINK-16954 > URL: https://issues.apache.org/jira/browse/FLINK-16954 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.3 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.3 > > > Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3. > {code:java} > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81] > cannot find symbol > 06:35:03.332 [ERROR] symbol: class ClassReader > 06:35:03.332 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32] > cannot find symbol > 06:35:03.332 
[ERROR] symbol: class ClassWriter > 06:35:03.332 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32] > cannot find symbol > 06:35:03.334 [ERROR] symbol: class ClassReader > 06:35:03.334 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > {code}
[jira] [Updated] (FLINK-16954) ClassRelocator uses JDK 11 APIs
[ https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-16954: - Summary: ClassRelocator uses JDK 11 APIs (was: 1.9 branch does not compile) > ClassRelocator uses JDK 11 APIs > --- > > Key: FLINK-16954 > URL: https://issues.apache.org/jira/browse/FLINK-16954 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.3 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.3 > > > Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3. > {code:java} > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81] > cannot find symbol > 06:35:03.332 [ERROR] symbol: class ClassReader > 06:35:03.332 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32] > cannot find symbol > 
06:35:03.332 [ERROR] symbol: class ClassWriter > 06:35:03.332 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32] > cannot find symbol > 06:35:03.334 [ERROR] symbol: class ClassReader > 06:35:03.334 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > {code}
[jira] [Created] (FLINK-16954) 1.9 branch was not compile
Chesnay Schepler created FLINK-16954: Summary: 1.9 branch was not compile Key: FLINK-16954 URL: https://issues.apache.org/jira/browse/FLINK-16954 Project: Flink Issue Type: Improvement Components: API / Type Serialization System Affects Versions: 1.9.3 Reporter: Chesnay Schepler Fix For: 1.9.3 Usage of JDK 11 APIs. {code:java} 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] package jdk.internal.org.objectweb.asm does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] package jdk.internal.org.objectweb.asm does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46] package jdk.internal.org.objectweb.asm.commons does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46] package jdk.internal.org.objectweb.asm.commons does not exist 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81] cannot find symbol 06:35:03.332 [ERROR] symbol: class ClassReader 06:35:03.332 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32] cannot find symbol 06:35:03.332 [ERROR] symbol: class ClassWriter 06:35:03.332 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper 06:35:03.332 [ERROR] /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32] cannot find symbol 
06:35:03.334 [ERROR] symbol: class ClassReader 06:35:03.334 [ERROR] location: class org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper {code}
[jira] [Updated] (FLINK-16954) 1.9 branch was not compile
[ https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-16954: - Issue Type: Bug (was: Improvement) > 1.9 branch was not compile > -- > > Key: FLINK-16954 > URL: https://issues.apache.org/jira/browse/FLINK-16954 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.3 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.3 > > > Usage of JDK 11 APIs. > {code:java} > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38] > package jdk.internal.org.objectweb.asm does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46] > package jdk.internal.org.objectweb.asm.commons does not exist > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81] > cannot find symbol > 06:35:03.332 [ERROR] symbol: class ClassReader > 06:35:03.332 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32] > cannot find symbol > 06:35:03.332 [ERROR] symbol: class ClassWriter > 06:35:03.332 [ERROR] location: class > 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > 06:35:03.332 [ERROR] > /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32] > cannot find symbol > 06:35:03.334 [ERROR] symbol: class ClassReader > 06:35:03.334 [ERROR] location: class > org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper > {code}
[GitHub] [flink] zentol commented on a change in pull request #11583: [FLINK-16883] Scan FLINK_CONF directory for available log4j configura…
zentol commented on a change in pull request #11583: [FLINK-16883] Scan FLINK_CONF directory for available log4j configura… URL: https://github.com/apache/flink/pull/11583#discussion_r402767124 ## File path: flink-dist/src/main/flink-bin/bin/flink-console.sh ## @@ -58,7 +58,9 @@ esac FLINK_TM_CLASSPATH=`constructFlinkClassPath` -log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml") +log4j_config=`findLog4jConfiguration ${FLINK_CONF_DIR}` + +log_setting=("-Dlog4j.configuration=file:${log4j_config}" "-Dlog4j.configurationFile=file:${log4j_config}" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml") Review comment: The goal was to make the scripts less strict when it comes to the file type (.properties vs .xml). This is more of a concern for log4j, since we use a properties file, whereas XML appears to be the "preferred" way with log4j2. I haven't heard of similar problems with logback, but yes, ultimately it could be done for both frameworks.
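For context, a `findLog4jConfiguration` helper along these lines (a hypothetical sketch, not the actual code from the PR) could prefer whichever supported config file type exists in `FLINK_CONF_DIR`, falling back to the historical `.properties` default:

```shell
#!/usr/bin/env bash

# Hypothetical sketch of the conf-dir scan: prefer an existing log4j-console
# config regardless of file type (.properties vs .xml).
findLog4jConfiguration() {
    local conf_dir="$1"
    local candidate
    for candidate in "${conf_dir}/log4j-console.properties" "${conf_dir}/log4j-console.xml"; do
        if [ -f "${candidate}" ]; then
            echo "${candidate}"
            return 0
        fi
    done
    # Nothing found: keep the old default so existing setups behave as before.
    echo "${conf_dir}/log4j-console.properties"
}
```

The `.properties` candidate is checked first here so that existing installations keep their current behavior even when both file types are present.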
[jira] [Commented] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp
[ https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074311#comment-17074311 ] Jark Wu commented on FLINK-16938: - Hi [~liuyufei], I think I understand your problem. You are using {{System.currentTimeMillis()}} as the data of the rowtime field. However, Flink interprets the millisecond value as being in the UTC time zone, not the local zone. If you want to express it in +8:00, you should add the time offset. > SqlTimestamp has lag when convert long to Timestamp > --- > > Key: FLINK-16938 > URL: https://issues.apache.org/jira/browse/FLINK-16938 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: YufeiLiu >Priority: Major > > When I set the rowtime attribute using the expression 'column.rowtime', and > the result type is sql.Timestamp, the result has a lag equal to the > default timezone offset. > {code:java} > tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data"); > {code} > Looking into the conversion logic, the field goes through a 'long -> > SqlTimestamp -> Timestamp' conversion. > {code:java} > long from = System.currentTimeMillis(); > long to = SqlTimestamp > .fromEpochMillis(from) > .toTimestamp() > .getTime(); > {code} > The result is {{from != to}}. {{SqlTimestamp.toTimestamp()}} uses > {{Timestamp.valueOf(LocalDateTime dateTime)}}, which applies timezone > information and causes the time lag. > Converting from Timestamp to Timestamp does not have this issue, but > converting a DataStream to a Table uses StreamRecord.timestamp as the rowtime field.
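The described lag can be reproduced with plain `java.time` (a standalone sketch that does not use Flink's `SqlTimestamp`): interpreting epoch millis as a UTC wall-clock time and then re-reading that wall-clock time in a non-UTC zone, which is effectively what `Timestamp.valueOf(LocalDateTime)` does with the JVM default zone, shifts the value by the zone offset:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Standalone sketch of the timezone lag, using an explicit +08:00 zone
// instead of the JVM default zone that Timestamp.valueOf() would use.
public class TimestampLagDemo {

    static long roundTrip(long epochMillis, ZoneId localZone) {
        // Step 1: millis -> wall-clock time, interpreted as UTC
        // (how Flink views the rowtime value).
        LocalDateTime wallClock =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        // Step 2: the same wall-clock time re-read in the local zone
        // (what Timestamp.valueOf(LocalDateTime) effectively does).
        return wallClock.atZone(localZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long from = 1_585_900_800_000L; // 2020-04-03T08:00:00Z
        long to = roundTrip(from, ZoneId.of("+08:00"));
        System.out.println(from - to);  // 28800000, i.e. 8 hours of lag
    }
}
```

With `ZoneOffset.UTC` as the local zone the round trip is lossless, which is why the lag only shows up in JVMs whose default zone has a non-zero offset.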
[GitHub] [flink] zentol commented on a change in pull request #11583: [FLINK-16883] Scan FLINK_CONF directory for available log4j configura…
zentol commented on a change in pull request #11583: [FLINK-16883] Scan FLINK_CONF directory for available log4j configura… URL: https://github.com/apache/flink/pull/11583#discussion_r402766083 ## File path: flink-dist/src/main/flink-bin/bin/flink-console.sh ## @@ -58,7 +58,9 @@ esac FLINK_TM_CLASSPATH=`constructFlinkClassPath` -log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml") +log4j_config=`findLog4jConfiguration ${FLINK_CONF_DIR}` Review comment: The knowledge of it being in Flink and "dedicated" to Flink could be useful in the future; e.g., we could think about it being modified at runtime via the WebUI to change the log level. You couldn't do this if the log file is at an arbitrary location (for security reasons).
[GitHub] [flink] dianfu commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF
dianfu commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF URL: https://github.com/apache/flink/pull/11598#discussion_r402765167 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.table.dataformat.vector.ArrayColumnVector; +import org.apache.flink.table.dataformat.vector.BooleanColumnVector; +import org.apache.flink.table.dataformat.vector.ByteColumnVector; +import org.apache.flink.table.dataformat.vector.BytesColumnVector; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.dataformat.vector.DecimalColumnVector; +import org.apache.flink.table.dataformat.vector.DoubleColumnVector; +import org.apache.flink.table.dataformat.vector.FloatColumnVector; +import org.apache.flink.table.dataformat.vector.IntColumnVector; +import org.apache.flink.table.dataformat.vector.LongColumnVector; +import org.apache.flink.table.dataformat.vector.ShortColumnVector; +import org.apache.flink.table.dataformat.vector.TimestampColumnVector; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TinyIntType; + +/** + * Columnar array to support access to vector column data. 
+ */ +public final class ColumnarArray implements BaseArray { + + private final DataType elementType; + private final ColumnVector data; + private final int offset; + private final int length; + + public ColumnarArray(DataType elementType, ColumnVector data, int offset, int length) { + this.elementType = elementType; + this.data = data; + this.offset = offset; + this.length = length; + } + + @Override + public int numElements() { + return length; + } + + @Override + public boolean isNullAt(int pos) { + return data.isNullAt(offset + pos); + } + + @Override + public void setNullAt(int pos) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public boolean getBoolean(int ordinal) { + return ((BooleanColumnVector) data).getBoolean(offset + ordinal); + } + + @Override + public byte getByte(int ordinal) { + return ((ByteColumnVector) data).getByte(offset + ordinal); + } + + @Override + public short getShort(int ordinal) { + return ((ShortColumnVector) data).getShort(offset + ordinal); + } + + @Override + public int getInt(int ordinal) { + return ((IntColumnVector) data).getInt(offset + ordinal); + } + + @Override + public long getLong(int ordinal) { + return ((LongColumnVector) data).getLong(offset + ordinal); + } + + @Override + public float getFloat(int ordinal) { + return ((FloatColumnVector) data).getFloat(offset + ordinal); + } + + @Override + public double getDouble(int ordinal) { + return ((DoubleColumnVector) data).getDouble(offset + ordinal); + } + + @Override + public BinaryString getString(int ordinal) { + BytesColumnVector.Bytes byteArray = getByteArray(ordinal); + return BinaryString.fromBytes(byteArray.data, byteArray.offset, byteArray.len); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) {
[GitHub] [flink] flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable
flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable URL: https://github.com/apache/flink/pull/11619#issuecomment-607793023 ## CI report: * 6bd37e40b9333d646bbf1bb1fbe286ad21d1e709 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158126927) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7013) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] Introduce file system table factory
flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] Introduce file system table factory URL: https://github.com/apache/flink/pull/11574#issuecomment-606453374 ## CI report: * 415f37b7a4c03d5591c9352f593eaecb23bc1cf3 UNKNOWN * ad9d0b168f308a5205430b2dcf85e680f20449b3 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157813487) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6994) * 756a68346b01c34e41fcb0b8d122e1a94207491e UNKNOWN
[jira] [Closed] (FLINK-16942) ES 5 sink should allow users to select netty transport client
[ https://issues.apache.org/jira/browse/FLINK-16942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-16942. Resolution: Fixed master: 0629d169358e1b8e89545a0977f8995d298db398 1.10: 16b2ea41b094026ff7da13c3ce34a804c25aaffb 1.9: 2d29e3ef807efc7dbe819f59a390c3d090a43654 > ES 5 sink should allow users to select netty transport client > -- > > Key: FLINK-16942 > URL: https://issues.apache.org/jira/browse/FLINK-16942 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch >Affects Versions: 1.9.0, 1.10.0, 1.11.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.3, 1.10.1, 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > When assembling the settings for the {{PreBuiltTransportClient}} the > {{Elasticsearch5ApiCallBridge}} first adds the user-provided client and then > overrides http/transport types with netty 3. > This means that users are forced to use netty 3, despite the connector being > able to work with a more recent and secure version. > {code:java} > Settings settings = Settings.builder().put(clientConfig) > .put(NetworkModule.HTTP_TYPE_KEY, > Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) > .put(NetworkModule.TRANSPORT_TYPE_KEY, > Netty3Plugin.NETTY_TRANSPORT_NAME) > .build(); > {code} > We should invert the order in which the settings are added. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2
[ https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-16389: - Fix Version/s: 1.10.1 1.9.3 > Bump Kafka 0.10 to 0.10.2.2 > --- > > Key: FLINK-16389 > URL: https://issues.apache.org/jira/browse/FLINK-16389 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.3, 1.10.1, 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h >
[GitHub] [flink] leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory
leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory URL: https://github.com/apache/flink/pull/11574#discussion_r402761566 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.filesystem; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.factories.TableSinkFactory; +import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; + +/** + * File system {@link TableFactory}. + * + * 1.The partition information should be in the file system path, whether it's a temporary + * table or a catalog table. + * 2.Support insert into (append) and insert overwrite. + * 3.Support static and dynamic partition inserting. + * + * Migrate to new source/sink interface after FLIP-95 is ready. + */ +public class FileSystemTableFactory implements + TableSourceFactory, + TableSinkFactory { + + public static final String CONNECTOR_VALUE = "filesystem"; + + /** +* Not use "connector.path" because: +* 1.Using "connector.path" will conflict with current batch csv source and batch csv sink. +* 2.This is compatible with FLIP-122. +*/ + public static final String PATH = "path"; + + /** +* Move these properties to validator after FLINK-16904. 
+*/ + public static final ConfigOption<String> PARTITION_DEFAULT_NAME = key("partition.default-name") + .stringType() + .defaultValue("__DEFAULT_PARTITION__") + .withDescription("The default partition name in case the dynamic partition" + + " column value is null/empty string"); + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR, CONNECTOR_VALUE); + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // path + properties.add(PATH); + + // schema + properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE); + properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME); + + properties.add(PARTITION_DEFAULT_NAME.key()); + + // format + properties.add(FORMAT); + properties.add(FORMAT + ".*"); + + return properties; + } + + @Override + public TableSource<BaseRow> createTableSource(TableSourceFactory.Context context) { + DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(context.getTable().getProperties()); + + return new FileSystemTableSource( + context.getTable().getSchema(), + new Path(properties.getString(PATH)), + context.getTable().getPartitionKeys(), + getPartitionDefaultName(properties), + getFormatProperties(context.getTable().getProperties())); + } + +
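The factory above plugs into Flink's `TableFactoryService` discovery through `requiredContext()` and `supportedProperties()`: a factory is chosen when every key/value pair of its required context appears in the table's properties. The following is a rough sketch of that matching idea only, under the assumption stated here; `FactoryMatchSketch` and its method names are made up and are not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch (hypothetical class) of TableFactoryService-style matching:
 * a factory declares required context properties, and a candidate table's
 * properties must contain all of them for the factory to be selected.
 */
public class FactoryMatchSketch {

    /** Returns true if every required key/value pair appears in the table properties. */
    static boolean matches(Map<String, String> requiredContext, Map<String, String> tableProperties) {
        return requiredContext.entrySet().stream()
            .allMatch(e -> e.getValue().equals(tableProperties.get(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, String> required = new HashMap<>();
        required.put("connector", "filesystem");

        Map<String, String> table = new HashMap<>();
        table.put("connector", "filesystem");
        table.put("path", "/tmp/data");
        table.put("format", "parquet");

        System.out.println(matches(required, table)); // true
    }
}
```

The real service additionally validates the declared supported properties (including wildcard patterns like `format.*`) and reports missing or ambiguous factories; only the context-match step is shown here.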
[jira] [Comment Edited] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2
[ https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074302#comment-17074302 ] Chesnay Schepler edited comment on FLINK-16389 at 4/3/20, 6:27 AM: --- master: 70a320b010910bf01ec342afe8371dbbf04c7ec0 1.10: 0be5d107a80c22f5761e2770a7a50c226adc8180 1.9: af79f680edcc30906955e83eee04b39981a837f7 was (Author: zentol): master: 70a320b010910bf01ec342afe8371dbbf04c7ec0 > Bump Kafka 0.10 to 0.10.2.2 > --- > > Key: FLINK-16389 > URL: https://issues.apache.org/jira/browse/FLINK-16389 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.3, 1.10.1, 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h >
[GitHub] [flink] zentol merged pull request #11616: [FLINK-16942][es] Allow users to override http/transport type
zentol merged pull request #11616: [FLINK-16942][es] Allow users to override http/transport type URL: https://github.com/apache/flink/pull/11616
[GitHub] [flink] leonardBang commented on a change in pull request #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer
leonardBang commented on a change in pull request #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer URL: https://github.com/apache/flink/pull/11602#discussion_r402758054 ## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.row; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.ParquetWriterFactory; +import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader; +import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.stream.IntStream; + +/** + * Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}. 
+ */ +public class ParquetRowDataWriterTest { + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final RowType ROW_TYPE = RowType.of( + new VarCharType(VarCharType.MAX_LENGTH), + new VarBinaryType(VarBinaryType.MAX_LENGTH), + new BooleanType(), + new TinyIntType(), + new SmallIntType(), + new IntType(), + new BigIntType(), + new FloatType(), + new DoubleType(), + new TimestampType(9), + new DecimalType(5, 0), + new DecimalType(15, 0), + new DecimalType(20, 0)); + + @SuppressWarnings("unchecked") + private static final DataFormatConverters.DataFormatConverter CONVERTER = + DataFormatConverters.getConverterForDataType( + TypeConversions.fromLogicalToDataType(ROW_TYPE)); + + @Test + public void testTypes() throws IOException { + Configuration conf = new Configuration(); + innerTest(conf, true); + innerTest(conf, false); + } + + @Test + public void testCompression() throws IOException { + Configuration conf = new Configuration(); + conf.set(ParquetOutputFormat.COMPRESSION, "GZIP"); + innerTest(conf, true); + innerTest(conf, false); + } + + private void innerTest( + Configuration conf, + boolean utcTimestamp) throws IOException { + Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUU
[GitHub] [flink] leonardBang commented on a change in pull request #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer
leonardBang commented on a change in pull request #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer URL: https://github.com/apache/flink/pull/11602#discussion_r402756699 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.row; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.Decimal; +import org.apache.flink.table.dataformat.SqlTimestamp; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.Type; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Timestamp; +import java.util.Arrays; + +import static org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.computeMinBytesForDecimalPrecision; +import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS; +import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY; +import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND; +import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND; + +/** + * Writes a record to the Parquet API with the expected schema in order to be written to a file. 
+ */ +public class ParquetRowDataWriter { + + private final RecordConsumer recordConsumer; + private final boolean utcTimestamp; + + private final FieldWriter[] filedWriters; + private final String[] fieldNames; + + public ParquetRowDataWriter( + RecordConsumer recordConsumer, + RowType rowType, + GroupType schema, + boolean utcTimestamp) { + this.recordConsumer = recordConsumer; + this.utcTimestamp = utcTimestamp; + + this.filedWriters = new FieldWriter[rowType.getFieldCount()]; + this.fieldNames = rowType.getFieldNames().toArray(new String[0]); + for (int i = 0; i < rowType.getFieldCount(); i++) { + this.filedWriters[i] = createWriter(rowType.getTypeAt(i), schema.getType(i)); + } + } + + /** +* It writes a record to Parquet. +* +* @param record Contains the record that is going to be written. +*/ + public void write(final BaseRow record) { + recordConsumer.startMessage(); + for (int i = 0; i < filedWriters.length; i++) { + if (!record.isNullAt(i)) { + String fieldName = fieldNames[i]; + FieldWriter writer = filedWriters[i]; + + recordConsumer.startField(fieldName, i); + writer.write(record, i); + recordConsumer.endField(fieldName, i); + } + } + recordConsumer.endMessage(); + } + + private FieldWriter createWriter(LogicalType t, Type type) { + if (type.isPrimitive()) { + switch (t.getTypeRoot()) { + case CHAR: + case VARCHAR: + return new StringWriter(); + case BOOLEAN: + return new BooleanWriter(); + case BINARY: + case VARBINARY: + return new BinaryW
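The `write(BaseRow)` loop above drives Parquet's `RecordConsumer` protocol: every record is bracketed by `startMessage()`/`endMessage()`, every non-null field by `startField()`/`endField()`, and null fields are simply never started. The following self-contained sketch makes that call sequence visible; the `EventLog` class is a hypothetical stand-in for the real `RecordConsumer` and only records events instead of encoding Parquet pages.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the per-record protocol used by ParquetRowDataWriter.write():
 * startMessage/endMessage brackets the record, startField/endField brackets
 * each non-null field, and null fields are skipped entirely.
 */
public class RecordProtocolSketch {

    /** Records the protocol events instead of writing Parquet pages. */
    static class EventLog {
        final List<String> events = new ArrayList<>();
        void startMessage() { events.add("startMessage"); }
        void endMessage() { events.add("endMessage"); }
        void startField(String name, int index) { events.add("startField:" + name + ":" + index); }
        void endField(String name, int index) { events.add("endField:" + name + ":" + index); }
        void addValue(Object v) { events.add("value:" + v); }
    }

    /** Writes one row, skipping null fields exactly like the writer's loop. */
    static List<String> writeRow(Object[] row, String[] fieldNames) {
        EventLog consumer = new EventLog();
        consumer.startMessage();
        for (int i = 0; i < row.length; i++) {
            if (row[i] != null) { // a null field is simply not started
                consumer.startField(fieldNames[i], i);
                consumer.addValue(row[i]);
                consumer.endField(fieldNames[i], i);
            }
        }
        consumer.endMessage();
        return consumer.events;
    }

    public static void main(String[] args) {
        // middle field is null, so no field events are emitted for it
        System.out.println(writeRow(new Object[] {1, null, "a"}, new String[] {"id", "opt", "name"}));
    }
}
```

This is also why Parquet needs no explicit null marker at this level: the definition level machinery below the `RecordConsumer` infers nulls from the fields that were never started.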
[jira] [Updated] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2
[ https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-16389: - Affects Version/s: 1.9.0 > Bump Kafka 0.10 to 0.10.2.2 > --- > > Key: FLINK-16389 > URL: https://issues.apache.org/jira/browse/FLINK-16389 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h >
[jira] [Closed] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2
[ https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-16389. Resolution: Fixed master: 70a320b010910bf01ec342afe8371dbbf04c7ec0 > Bump Kafka 0.10 to 0.10.2.2 > --- > > Key: FLINK-16389 > URL: https://issues.apache.org/jira/browse/FLINK-16389 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h >
[GitHub] [flink] zentol merged pull request #11617: [FLINK-16389][kafka] Bump kafka version to 0.10.2.2
zentol merged pull request #11617: [FLINK-16389][kafka] Bump kafka version to 0.10.2.2 URL: https://github.com/apache/flink/pull/11617
[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF URL: https://github.com/apache/flink/pull/11598#discussion_r402760160 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.table.dataformat.vector.ArrayColumnVector; +import org.apache.flink.table.dataformat.vector.BooleanColumnVector; +import org.apache.flink.table.dataformat.vector.ByteColumnVector; +import org.apache.flink.table.dataformat.vector.BytesColumnVector; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.dataformat.vector.DecimalColumnVector; +import org.apache.flink.table.dataformat.vector.DoubleColumnVector; +import org.apache.flink.table.dataformat.vector.FloatColumnVector; +import org.apache.flink.table.dataformat.vector.IntColumnVector; +import org.apache.flink.table.dataformat.vector.LongColumnVector; +import org.apache.flink.table.dataformat.vector.ShortColumnVector; +import org.apache.flink.table.dataformat.vector.TimestampColumnVector; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TinyIntType; + +/** + * Columnar array to support access to vector column data. 
+ */ +public final class ColumnarArray implements BaseArray { + + private final DataType elementType; + private final ColumnVector data; + private final int offset; + private final int length; + + public ColumnarArray(DataType elementType, ColumnVector data, int offset, int length) { + this.elementType = elementType; + this.data = data; + this.offset = offset; + this.length = length; + } + + @Override + public int numElements() { + return length; + } + + @Override + public boolean isNullAt(int pos) { + return data.isNullAt(offset + pos); + } + + @Override + public void setNullAt(int pos) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public boolean getBoolean(int ordinal) { + return ((BooleanColumnVector) data).getBoolean(offset + ordinal); + } + + @Override + public byte getByte(int ordinal) { + return ((ByteColumnVector) data).getByte(offset + ordinal); + } + + @Override + public short getShort(int ordinal) { + return ((ShortColumnVector) data).getShort(offset + ordinal); + } + + @Override + public int getInt(int ordinal) { + return ((IntColumnVector) data).getInt(offset + ordinal); + } + + @Override + public long getLong(int ordinal) { + return ((LongColumnVector) data).getLong(offset + ordinal); + } + + @Override + public float getFloat(int ordinal) { + return ((FloatColumnVector) data).getFloat(offset + ordinal); + } + + @Override + public double getDouble(int ordinal) { + return ((DoubleColumnVector) data).getDouble(offset + ordinal); + } + + @Override + public BinaryString getString(int ordinal) { + BytesColumnVector.Bytes byteArray = getByteArray(ordinal); + return BinaryString.fromBytes(byteArray.data, byteArray.offset, byteArray.len); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale
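The key design point in `ColumnarArray` above is that it copies no element data: it is an `(offset, length)` window over a shared `ColumnVector`, so element `pos` of the array resolves to slot `offset + pos` of the underlying vector. A stripped-down sketch of the same idea follows; `IntVector` and `IntArrayView` are hypothetical classes standing in for `IntColumnVector` and `ColumnarArray`, not Flink's API.

```java
/**
 * Sketch of the ColumnarArray idea: a zero-copy (offset, length) view over a
 * shared column vector, where element pos maps to vector slot offset + pos.
 */
public class ArrayViewSketch {

    /** Stand-in for a columnar int vector. */
    static class IntVector {
        final int[] values;
        IntVector(int[] values) { this.values = values; }
        int getInt(int i) { return values[i]; }
    }

    /** Zero-copy view of `length` elements starting at `offset`. */
    static class IntArrayView {
        final IntVector data;
        final int offset;
        final int length;

        IntArrayView(IntVector data, int offset, int length) {
            this.data = data;
            this.offset = offset;
            this.length = length;
        }

        int numElements() { return length; }

        int getInt(int pos) {
            return data.getInt(offset + pos); // shift into the shared vector
        }
    }

    public static void main(String[] args) {
        IntVector vector = new IntVector(new int[] {10, 20, 30, 40, 50});
        IntArrayView view = new IntArrayView(vector, 1, 3); // views 20, 30, 40
        System.out.println(view.getInt(0)); // 20
    }
}
```

Because the view shares storage with the vector, mutating operations like `setNullAt` cannot be supported on the view without corrupting sibling arrays, which is why the real class throws `UnsupportedOperationException` there.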
[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#discussion_r402757385 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java ## @@ -61,158 +61,180 @@ private final String clusterId; private final String nameSpace; - public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client) { + private final ExecutorWrapper executorWrapper; + + public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client, ExecutorWrapper executorWrapper) { this.flinkConfig = checkNotNull(flinkConfig); this.internalClient = checkNotNull(client); this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)); this.nameSpace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); + + this.executorWrapper = executorWrapper; } @Override - public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) { + public CompletableFuture createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) { final Deployment deployment = kubernetesJMSpec.getDeployment(); final List accompanyingResources = kubernetesJMSpec.getAccompanyingResources(); // create Deployment LOG.debug("Start to create deployment with spec {}", deployment.getSpec().toString()); - final Deployment createdDeployment = this.internalClient - .apps() - .deployments() - .inNamespace(this.nameSpace) - .create(deployment); - - // Note that we should use the uid of the created Deployment for the OwnerReference. 
- setOwnerReference(createdDeployment, accompanyingResources); - this.internalClient - .resourceList(accompanyingResources) - .inNamespace(this.nameSpace) - .createOrReplace(); + return CompletableFuture.runAsync(() -> { + final Deployment createdDeployment = this.internalClient + .apps() + .deployments() + .inNamespace(this.nameSpace) + .create(deployment); + + // Note that we should use the uid of the created Deployment for the OwnerReference. + setOwnerReference(createdDeployment, accompanyingResources); + + this.internalClient + .resourceList(accompanyingResources) + .inNamespace(this.nameSpace) + .createOrReplace(); + }, executorWrapper.getExecutor()); } @Override public void createTaskManagerPod(KubernetesPod kubernetesPod) { - final Deployment masterDeployment = this.internalClient - .apps() - .deployments() - .inNamespace(this.nameSpace) - .withName(KubernetesUtils.getDeploymentName(clusterId)) - .get(); - - if (masterDeployment == null) { - throw new RuntimeException( - "Failed to find Deployment named " + clusterId + " in namespace " + this.nameSpace); - } + CompletableFuture.runAsync(() -> { + final Deployment masterDeployment = this.internalClient + .apps() + .deployments() + .inNamespace(this.nameSpace) + .withName(KubernetesUtils.getDeploymentName(clusterId)) + .get(); + + if (masterDeployment == null) { + throw new RuntimeException( + "Failed to find Deployment named " + clusterId + " in namespace " + this.nameSpace); + } - // Note that we should use the uid of the master Deployment for the OwnerReference. - setOwnerReference(masterDeployment, Collections.singletonList(kubernetesPod.getInternalResource())); + // Note that we should use the uid of the master Deployment for the OwnerReference. + setOwnerReference(masterDeployment, Collections.singletonList(kubernetesPod.getInternalResource())); - LOG.debug("Start to create pod with metadata {}, spec {}", - kubern
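The pattern in this diff is mechanical: each blocking Fabric8 call is moved onto a dedicated executor via `CompletableFuture.runAsync`, so the method signature changes from `void` to `CompletableFuture<Void>` while the body stays essentially the same. A minimal sketch of that wrapping, with a hypothetical `BlockingClient` standing in for the Kubernetes client:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Sketch of making a synchronous client call asynchronous: runAsync shifts the
 * blocking work onto a supplied executor and hands the caller a future.
 */
public class AsyncWrapSketch {

    /** Hypothetical stand-in for a blocking Kubernetes client call. */
    static class BlockingClient {
        void createDeployment(String name) {
            // pretend this performs a slow HTTP round trip
        }
    }

    static CompletableFuture<Void> createDeploymentAsync(
            BlockingClient client, String name, ExecutorService executor) {
        // the caller's thread returns immediately; the blocking call runs on the executor
        return CompletableFuture.runAsync(() -> client.createDeployment(name), executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> f =
            createDeploymentAsync(new BlockingClient(), "flink-jm", executor);
        f.join(); // completes once the blocking call finishes on the executor thread
        System.out.println(f.isDone()); // true
        executor.shutdown();
    }
}
```

Passing the executor explicitly (rather than relying on `ForkJoinPool.commonPool()`) matters here, since blocking I/O on the common pool can starve unrelated tasks; the PR's `ExecutorWrapper` serves that role.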
[GitHub] [flink] JingsongLi commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory
JingsongLi commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory URL: https://github.com/apache/flink/pull/11574#discussion_r402753698 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.filesystem; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.factories.TableSinkFactory; +import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; + +/** + * File system {@link TableFactory}. + * + * 1.The partition information should be in the file system path, whether it's a temporary + * table or a catalog table. + * 2.Support insert into (append) and insert overwrite. + * 3.Support static and dynamic partition inserting. + * + * Migrate to new source/sink interface after FLIP-95 is ready. + */ +public class FileSystemTableFactory implements + TableSourceFactory, + TableSinkFactory { + + public static final String CONNECTOR_VALUE = "filesystem"; + + /** +* Not use "connector.path" because: +* 1.Using "connector.path" will conflict with current batch csv source and batch csv sink. +* 2.This is compatible with FLIP-122. +*/ + public static final String PATH = "path"; + + /** +* Move these properties to validator after FLINK-16904. 
+*/ + public static final ConfigOption PARTITION_DEFAULT_NAME = key("partition.default-name") + .stringType() + .defaultValue("__DEFAULT_PARTITION__") + .withDescription("The default partition name in case the dynamic partition" + + " column value is null/empty string"); + + @Override + public Map requiredContext() { + Map context = new HashMap<>(); + context.put(CONNECTOR, CONNECTOR_VALUE); + return context; + } + + @Override + public List supportedProperties() { + List properties = new ArrayList<>(); + + // path + properties.add(PATH); + + // schema + properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE); + properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME); + + properties.add(PARTITION_DEFAULT_NAME.key()); + + // format + properties.add(FORMAT); + properties.add(FORMAT + ".*"); + + return properties; + } + + @Override + public TableSource createTableSource(TableSourceFactory.Context context) { + DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(context.getTable().getProperties()); + + return new FileSystemTableSource( + context.getTable().getSchema(), + new Path(properties.getString(PATH)), + context.getTable().getPartitionKeys(), + getPartitionDefaultName(properties), + getFormatProperties(context.getTable().getProperties())); + } + +
[GitHub] [flink] carp84 commented on issue #11555: [FLINK-16576][state backends] Correct the logic of KeyGroupStateHandle#getIntersection
carp84 commented on issue #11555: [FLINK-16576][state backends] Correct the logic of KeyGroupStateHandle#getIntersection URL: https://github.com/apache/flink/pull/11555#issuecomment-608245577 @StephanEwen could you also take a look here? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-16953) TableEnvHiveConnectorTest is unstable on travis.
[ https://issues.apache.org/jira/browse/FLINK-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-16953: - Description: [https://api.travis-ci.org/v3/job/670405441/log.txt] {code:java} [INFO] Running org.apache.flink.connectors.hive.TableEnvHiveConnectorTest [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 12.67 s <<< FAILURE! - in org.apache.flink.connectors.hive.TableEnvHiveConnectorTest [ERROR] org.apache.flink.connectors.hive.TableEnvHiveConnectorTest Time elapsed: 12.669 s <<< ERROR! java.lang.IllegalStateException: Failed to create HiveServer :Failed to get metastore connection Caused by: java.lang.RuntimeException: Failed to get metastore connection Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient Caused by: java.lang.reflect.InvocationTargetException Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. 
Most recent failure: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused) at org.apache.thrift.transport.TSocket.open(TSocket.java:226) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:480) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:247) at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:70) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1706) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:83) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632) at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3894) at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248) at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231) at org.apache.hadoop.hive.ql.metadata.Hive.(Hive.java:388) at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332) at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312) at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288) at org.apache.hive.service.server.HiveServer2.init(HiveServer2.java:166) at 
com.klarna.hiverunner.HiveServerContainer.init(HiveServerContainer.java:84) at com.klarna.hiverunner.builder.HiveShellBase.start(HiveShellBase.java:165) at org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.createHiveServerContainer(FlinkStandaloneHiveRunner.java:217) at org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.access$600(FlinkStandaloneHiveRunner.java:92) at org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner$2.before(FlinkStandaloneHiveRunner.java:131) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) at org.apache.maven.s
[jira] [Created] (FLINK-16953) TableEnvHiveConnectorTest is unstable on travis.
Xintong Song created FLINK-16953: Summary: TableEnvHiveConnectorTest is unstable on travis. Key: FLINK-16953 URL: https://issues.apache.org/jira/browse/FLINK-16953 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.11.0 Reporter: Xintong Song [https://api.travis-ci.org/v3/job/670405441/log.txt] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#discussion_r402745707 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ## @@ -793,16 +799,26 @@ private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ return Optional.empty(); } - private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) { + private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile resourceProfile) { for (TaskManagerSlot slot : slots.values()) { if (slot.getResourceProfile().isMatching(resourceProfile)) { return true; } } + + for (PendingTaskManagerSlot slot : pendingSlots.values()) { + if (slot.getResourceProfile().isMatching(resourceProfile)) { + return true; + } + } + return false; } private Optional allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + if (getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) { + return Optional.empty(); + } Review comment: In addition, we should add warning logs that explicitly show that slots are not allocated because the max limit has been reached.
[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#discussion_r402744613 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ## @@ -793,16 +799,26 @@ private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ return Optional.empty(); } - private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) { + private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile resourceProfile) { for (TaskManagerSlot slot : slots.values()) { if (slot.getResourceProfile().isMatching(resourceProfile)) { return true; } } + + for (PendingTaskManagerSlot slot : pendingSlots.values()) { + if (slot.getResourceProfile().isMatching(resourceProfile)) { + return true; + } + } + return false; } private Optional allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + if (getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) { + return Optional.empty(); + } Review comment: I think this makes the SM/RM contract a bit unclear. Before this PR, the SM requests individual slots from the RM and is not aware of `numSlotsPerWorker`. With this change, the SM is aware of slots per worker only for the max slots limit check, while for other logic (e.g., generating pending slots) it still pretends to be unaware of slots per worker. I would suggest either moving this check to the RM side, or basing this PR on #11320, where we change the contract so that the SM requests workers from the RM and is aware of slots per worker in a consistent way.
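The capacity check being debated in this comment can be sketched in isolation. Below is a minimal, JDK-only sketch of the arithmetic from the diff; the standalone class and method names are hypothetical stand-ins, not Flink's actual `SlotManagerImpl`:

```java
// Hypothetical sketch of the max-slot-limit check discussed above.
// The parameter names mirror the diff (pending slots, registered slots,
// slots per worker, max slot count); this is not Flink code.
public class SlotLimitCheck {

    /**
     * Returns true if allocating one more worker (which would add
     * numSlotsPerWorker slots) pushes the total past maxSlotNum,
     * in which case the allocation should be refused.
     */
    public static boolean exceedsMaxSlots(
            int pendingSlots, int registeredSlots, int numSlotsPerWorker, int maxSlotNum) {
        return pendingSlots + registeredSlots + numSlotsPerWorker > maxSlotNum;
    }

    public static void main(String[] args) {
        // 4 pending + 4 registered + 2 per new worker = 10 > 8: refuse
        System.out.println(exceedsMaxSlots(4, 4, 2, 8)); // true
        // 2 + 2 + 2 = 6 <= 8: allow
        System.out.println(exceedsMaxSlots(2, 2, 2, 8)); // false
    }
}
```

Note the check counts the whole prospective worker, so a request can be refused even when a few individual slots are still below the limit, which is exactly why the comment argues the SM is implicitly made aware of slots per worker.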
[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#discussion_r402719726 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ## @@ -131,13 +134,15 @@ public SlotManagerImpl( Time taskManagerRequestTimeout, Time slotRequestTimeout, Time taskManagerTimeout, + int maxSlotNum, boolean waitResultConsumedBeforeRelease) { this.slotMatchingStrategy = Preconditions.checkNotNull(slotMatchingStrategy); this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor); this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout); this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout); this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout); + this.maxSlotNum = maxSlotNum; Review comment: We should have a sanity check here to make sure `maxSlotNum > 0`.
[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#discussion_r402736479 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java ## @@ -1293,7 +1293,18 @@ private SlotRequest createSlotRequest(JobID jobId, ResourceProfile resourceProfi } private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { - SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().build(); + return createSlotManager(resourceManagerId, resourceManagerActions, 1); + } + + private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions, int numSlotsPerWorker) { + return createSlotManager(resourceManagerId, resourceManagerActions, numSlotsPerWorker, Integer.MAX_VALUE); + } + + private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions, int numSlotsPerWorker, int maxSlotNum) { Review comment: I think we should try to avoid such deeply nested methods if possible. Since both `numSlotsPerWorker` and `maxSlotNum` can be read from a `Configuration`, I think keeping only two `createSlotManager` methods (one with `Configuration` and one without) should be enough.
[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#discussion_r402718103 ## File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java ## @@ -69,4 +69,11 @@ .text("Enable the slot spread out allocation strategy. This strategy tries to spread out " + "the slots evenly across all available %s.", code("TaskExecutors")) .build()); + + @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) + public static final ConfigOption<Integer> MAX_SLOT_NUM = ConfigOptions + .key("cluster.number-of-slots.max") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription("Defines the max limitation of the total number of slots."); Review comment: I think we should add this to `ResourceManagerOptions` rather than `ClusterOptions`, with key prefix `slotmanager.*`. The option is only used in `SlotManager`, and once FLINK-14106 is finished this might even become a plugin-specific config option. I would also mention in the description that this is meant for batch scenarios and might cause problems for streaming jobs.
[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#discussion_r402740615 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ## @@ -375,6 +375,12 @@ public void registerTaskManager(final TaskExecutorConnection taskExecutorConnect if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); } else { + if (getNumberRegisteredSlots() + Math.max(getNumberPendingTaskManagerSlots(), numSlotsPerWorker) > maxSlotNum) { + LOG.warn("The total number of slots exceeds the max limitation, release the excess resource."); + resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation.")); + return; + } Review comment: I'm not sure about checking this on task executor registration. I think the max slot limit should not take effect for standalone clusters, because the number of task executors launched is not controlled by the RM. We should also mention that in the config option description.
[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#discussion_r402719311 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java ## @@ -37,6 +38,7 @@ private SlotManagerBuilder() { this.taskManagerRequestTimeout = TestingUtils.infiniteTime(); this.slotRequestTimeout = TestingUtils.infiniteTime(); this.taskManagerTimeout = TestingUtils.infiniteTime(); + this.maxSlotNum = Integer.MAX_VALUE; Review comment: nit: maybe `MAX_SLOT_NUM.defaultValue()`
[GitHub] [flink] leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory
leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory URL: https://github.com/apache/flink/pull/11574#discussion_r402748927 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.filesystem; + +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.OverwritableTableSink; +import org.apache.flink.table.sinks.PartitionableTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory; + +/** + * File system {@link TableSink}. + */ +public class FileSystemTableSink implements + AppendStreamTableSink, + PartitionableTableSink, + OverwritableTableSink { + + private final TableSchema schema; + private final List partitionKeys; + private final Path path; + private final String defaultPartName; + private final Map formatProperties; + + private boolean overwrite = false; + private boolean dynamicGrouping = false; + private LinkedHashMap staticPartitions = new LinkedHashMap<>(); + + /** +* Construct a file system table sink. +* +* @param schema schema of the table. +* @param path directory path of the file system table. 
+* @param partitionKeys partition keys of the table. +* @param defaultPartName The default partition name in case the dynamic partition column value +*is null/empty string. +* @param formatProperties format properties. +*/ + public FileSystemTableSink( + TableSchema schema, + Path path, + List partitionKeys, + String defaultPartName, + Map formatProperties) { + this.schema = schema; + this.path = path; + this.defaultPartName = defaultPartName; + this.formatProperties = formatProperties; + this.partitionKeys = partitionKeys; + } + + @Override + public final DataStreamSink consumeDataStream(DataStream dataStream) { + RowDataPartitionComputer computer = new RowDataPartitionComputer( + defaultPartName, + schema.getFieldNames(), + schema.getFieldDataTypes(), + partitionKeys.toArray(new String[0])); + + FileSystemOutputFormat.Builder builder = new FileSystemOutputFormat.Builder<>(); + builder.setPartitionComputer(computer); + builder.setDynamicGrouped(dynamicGrouping); + builder.setPartitionColumns(partitionKeys.toArray(new String[0])); + builder.setFormatFactory(createOutputFormatFactory()); + builder.setMetaStoreFactory(createTableMetaStoreFactory(path)); + builder.setOverwrite(overwrite); + builde
[GitHub] [flink] leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory
leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory URL: https://github.com/apache/flink/pull/11574#discussion_r402748612 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.filesystem; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.factories.TableSinkFactory; +import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; + +/** + * File system {@link TableFactory}. + * + * 1.The partition information should be in the file system path, whether it's a temporary + * table or a catalog table. + * 2.Support insert into (append) and insert overwrite. + * 3.Support static and dynamic partition inserting. + * + * Migrate to new source/sink interface after FLIP-95 is ready. + */ +public class FileSystemTableFactory implements + TableSourceFactory, + TableSinkFactory { + + public static final String CONNECTOR_VALUE = "filesystem"; + + /** +* Not use "connector.path" because: +* 1.Using "connector.path" will conflict with current batch csv source and batch csv sink. +* 2.This is compatible with FLIP-122. +*/ + public static final String PATH = "path"; + + /** +* Move these properties to validator after FLINK-16904. 
+*/ + public static final ConfigOption PARTITION_DEFAULT_NAME = key("partition.default-name") + .stringType() + .defaultValue("__DEFAULT_PARTITION__") + .withDescription("The default partition name in case the dynamic partition" + + " column value is null/empty string"); + + @Override + public Map requiredContext() { + Map context = new HashMap<>(); + context.put(CONNECTOR, CONNECTOR_VALUE); + return context; + } + + @Override + public List supportedProperties() { + List properties = new ArrayList<>(); + + // path + properties.add(PATH); + + // schema + properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE); + properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME); + + properties.add(PARTITION_DEFAULT_NAME.key()); + + // format + properties.add(FORMAT); + properties.add(FORMAT + ".*"); + + return properties; + } + + @Override + public TableSource createTableSource(TableSourceFactory.Context context) { + DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(context.getTable().getProperties()); + + return new FileSystemTableSource( + context.getTable().getSchema(), + new Path(properties.getString(PATH)), + context.getTable().getPartitionKeys(), + getPartitionDefaultName(properties), + getFormatProperties(context.getTable().getProperties())); + } + +
[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#discussion_r402749513 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java ## @@ -61,158 +61,180 @@ private final String clusterId; private final String nameSpace; - public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client) { + private final ExecutorWrapper executorWrapper; + + public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client, ExecutorWrapper executorWrapper) { this.flinkConfig = checkNotNull(flinkConfig); this.internalClient = checkNotNull(client); this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)); this.nameSpace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); + + this.executorWrapper = executorWrapper; } @Override - public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) { + public CompletableFuture createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) { final Deployment deployment = kubernetesJMSpec.getDeployment(); final List accompanyingResources = kubernetesJMSpec.getAccompanyingResources(); // create Deployment LOG.debug("Start to create deployment with spec {}", deployment.getSpec().toString()); - final Deployment createdDeployment = this.internalClient - .apps() - .deployments() - .inNamespace(this.nameSpace) - .create(deployment); - - // Note that we should use the uid of the created Deployment for the OwnerReference. 
- setOwnerReference(createdDeployment, accompanyingResources); - this.internalClient - .resourceList(accompanyingResources) - .inNamespace(this.nameSpace) - .createOrReplace(); + return CompletableFuture.runAsync(() -> { + final Deployment createdDeployment = this.internalClient + .apps() + .deployments() + .inNamespace(this.nameSpace) + .create(deployment); + + // Note that we should use the uid of the created Deployment for the OwnerReference. + setOwnerReference(createdDeployment, accompanyingResources); + + this.internalClient + .resourceList(accompanyingResources) + .inNamespace(this.nameSpace) + .createOrReplace(); + }, executorWrapper.getExecutor()); } @Override public void createTaskManagerPod(KubernetesPod kubernetesPod) { - final Deployment masterDeployment = this.internalClient - .apps() - .deployments() - .inNamespace(this.nameSpace) - .withName(KubernetesUtils.getDeploymentName(clusterId)) - .get(); - - if (masterDeployment == null) { - throw new RuntimeException( - "Failed to find Deployment named " + clusterId + " in namespace " + this.nameSpace); - } + CompletableFuture.runAsync(() -> { Review comment: Good point. I will add retry logic here. When all retry attempts (3) are exhausted, we will throw the exception.
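The retry idea mentioned in this reply could look roughly as follows. This is a JDK-only sketch under assumptions: the helper name `supplyWithRetry` and the fixed attempt count are illustrative, not the actual `Fabric8FlinkKubeClient` change:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch: run an action asynchronously, retrying up to
// maxRetries times; once attempts are exhausted, the returned future
// completes exceptionally with the last failure.
public class RetryingFuture {

    public static <T> CompletableFuture<T> supplyWithRetry(
            Supplier<T> action, int maxRetries, Executor executor) {
        if (maxRetries <= 0) {
            throw new IllegalArgumentException("maxRetries must be positive");
        }
        return CompletableFuture.supplyAsync(() -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxRetries; attempt++) {
                try {
                    return action.get();
                } catch (RuntimeException e) {
                    last = e; // a real implementation might log and back off here
                }
            }
            throw last; // all retries exhausted: surface the last failure
        }, executor);
    }
}
```

A caller would pass the Kubernetes request as the supplier; transient `ConnectException`-style failures are absorbed until the attempt budget runs out, after which `join()`/`get()` on the future rethrows the last exception.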
[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF URL: https://github.com/apache/flink/pull/11598#discussion_r402746198 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.table.dataformat.vector.ArrayColumnVector; +import org.apache.flink.table.dataformat.vector.BooleanColumnVector; +import org.apache.flink.table.dataformat.vector.ByteColumnVector; +import org.apache.flink.table.dataformat.vector.BytesColumnVector; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.dataformat.vector.DecimalColumnVector; +import org.apache.flink.table.dataformat.vector.DoubleColumnVector; +import org.apache.flink.table.dataformat.vector.FloatColumnVector; +import org.apache.flink.table.dataformat.vector.IntColumnVector; +import org.apache.flink.table.dataformat.vector.LongColumnVector; +import org.apache.flink.table.dataformat.vector.ShortColumnVector; +import org.apache.flink.table.dataformat.vector.TimestampColumnVector; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TinyIntType; + +/** + * Columnar array to support access to vector column data. 
+ */ +public final class ColumnarArray implements BaseArray { + + private final DataType elementType; + private final ColumnVector data; + private final int offset; + private final int length; + + public ColumnarArray(DataType elementType, ColumnVector data, int offset, int length) { + this.elementType = elementType; + this.data = data; + this.offset = offset; + this.length = length; + } + + @Override + public int numElements() { + return length; + } + + @Override + public boolean isNullAt(int pos) { + return data.isNullAt(offset + pos); + } + + @Override + public void setNullAt(int pos) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public boolean getBoolean(int ordinal) { + return ((BooleanColumnVector) data).getBoolean(offset + ordinal); + } + + @Override + public byte getByte(int ordinal) { + return ((ByteColumnVector) data).getByte(offset + ordinal); + } + + @Override + public short getShort(int ordinal) { + return ((ShortColumnVector) data).getShort(offset + ordinal); + } + + @Override + public int getInt(int ordinal) { + return ((IntColumnVector) data).getInt(offset + ordinal); + } + + @Override + public long getLong(int ordinal) { + return ((LongColumnVector) data).getLong(offset + ordinal); + } + + @Override + public float getFloat(int ordinal) { + return ((FloatColumnVector) data).getFloat(offset + ordinal); + } + + @Override + public double getDouble(int ordinal) { + return ((DoubleColumnVector) data).getDouble(offset + ordinal); + } + + @Override + public BinaryString getString(int ordinal) { + BytesColumnVector.Bytes byteArray = getByteArray(ordinal); + return BinaryString.fromBytes(byteArray.data, byteArray.offset, byteArray.len); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale
[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF URL: https://github.com/apache/flink/pull/11598#discussion_r402747024 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java ## Review comment: length -> numElements?
[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF URL: https://github.com/apache/flink/pull/11598#discussion_r402746856 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java ##
[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF URL: https://github.com/apache/flink/pull/11598#discussion_r402746022 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java ##
[jira] [Updated] (FLINK-16952) Parquet file system format support filter pushdown
[ https://issues.apache.org/jira/browse/FLINK-16952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-16952: - Description: We can create the conversion between Flink Expression (NOTE: it should be the new Expression instead of PlannerExpression) and the Parquet FilterPredicate, and apply it to the Parquet file system format. was: We can extract the conversion between Flink Expression and the Parquet FilterPredicate, and apply it to the Parquet file system format. > Parquet file system format support filter pushdown > --- > > Key: FLINK-16952 > URL: https://issues.apache.org/jira/browse/FLINK-16952 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) > Reporter: Jingsong Lee > Assignee: Jingsong Lee > Priority: Major > Fix For: 1.11.0 > > > We can create the conversion between Flink Expression (NOTE: it should be the new > Expression instead of PlannerExpression) and the Parquet FilterPredicate, > and apply it to the Parquet file system format. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16952) Parquet file system format support filter pushdown
Jingsong Lee created FLINK-16952: Summary: Parquet file system format support filter pushdown Key: FLINK-16952 URL: https://issues.apache.org/jira/browse/FLINK-16952 Project: Flink Issue Type: Sub-task Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Jingsong Lee Assignee: Jingsong Lee Fix For: 1.11.0 We can extract the conversion between Flink Expression and the Parquet FilterPredicate, and apply it to the Parquet file system format.
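The proposed work is essentially a tree translation from Flink expressions into Parquet `FilterPredicate`s (built node-by-node with Parquet's `FilterApi`, e.g. `FilterApi.eq`/`FilterApi.and`). As a rough dependency-free sketch of the shape of such a converter — the `Eq`/`And` node classes are hypothetical stand-ins for Flink's resolved expressions, and a plain `Predicate` stands in for the Parquet predicate:

```java
import java.util.Map;
import java.util.function.Predicate;

public class FilterPushdownSketch {
    // Hypothetical mini expression tree: a field-equals-literal leaf and an AND node.
    interface Expr {}

    static class Eq implements Expr {
        final String field; final Object literal;
        Eq(String field, Object literal) { this.field = field; this.literal = literal; }
    }

    static class And implements Expr {
        final Expr left; final Expr right;
        And(Expr left, Expr right) { this.left = left; this.right = right; }
    }

    // Recursively translate the tree into a row predicate, mirroring how each
    // expression node would map to a Parquet FilterPredicate for pushdown.
    static Predicate<Map<String, Object>> convert(Expr e) {
        if (e instanceof Eq) {
            Eq eq = (Eq) e;
            return row -> eq.literal.equals(row.get(eq.field));
        }
        if (e instanceof And) {
            And and = (And) e;
            return convert(and.left).and(convert(and.right));
        }
        throw new IllegalArgumentException("unsupported expression: " + e);
    }
}
```

Unsupported expression nodes are rejected rather than silently dropped; in a real pushdown, such nodes would simply not be pushed and would remain evaluated by Flink.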
[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433 ## CI report: * 2670a045bd9c9c4c75e7a22bf188fc673c85a062 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158132906) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7017) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813 ## CI report: * d8814c3aed36cea34b5069b2a270df8033d29956 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158126859) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7012)
[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task. URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084 ## CI report: * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015) * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
[jira] [Updated] (FLINK-16887) Refactor retraction rules to support inferring ChangelogMode
[ https://issues.apache.org/jira/browse/FLINK-16887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-16887: Description: The current retraction mechanism only supports two message kinds (+ and -). However, since FLIP-95, we will introduce more message kinds to users (insert/delete/update_before/update_after). In order to support that, we should first refactor the current retraction rules to support ChangelogMode inference. Previously, every node was attached with an AccMode trait after the retraction rule. In the proposed design, we will infer a ChangelogMode trait for every node. Design documentation: https://docs.google.com/document/d/1n_iXIQsKT3uiBqENR8j8RdjRhZfzMhhB66QZvx2rFjE/edit?ts=5e8419c1# was: The current retraction mechanism only supports two message kinds (+ and -). However, since FLIP-95, we will introduce more message kinds to users (insert/delete/update_before/update_after). In order to support that, we should first refactor the current retraction rules to support ChangelogMode inference. Previously, every node was attached with an AccMode trait after the retraction rule. In the proposed design, we will infer a ChangelogMode trait for every node. Detailed design documentation will be attached soon. > Refactor retraction rules to support inferring ChangelogMode > > > Key: FLINK-16887 > URL: https://issues.apache.org/jira/browse/FLINK-16887 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner > Reporter: Jark Wu > Assignee: Jark Wu > Priority: Major > Fix For: 1.11.0 > > > The current retraction mechanism only supports two message kinds (+ and -). However, > since FLIP-95, we will introduce more message kinds to users > (insert/delete/update_before/update_after). > In order to support that, we should first refactor the current retraction rules > to support ChangelogMode inference. Previously, every node was attached > with an AccMode trait after the retraction rule. In the proposed design, we will > infer a ChangelogMode trait for every node. > Design documentation: > https://docs.google.com/document/d/1n_iXIQsKT3uiBqENR8j8RdjRhZfzMhhB66QZvx2rFjE/edit?ts=5e8419c1#
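The four message kinds named in the issue, and a changelog mode as a set of them, can be sketched as follows; this is a simplified stand-in for the eventual Flink API, not its actual definition:

```java
import java.util.EnumSet;

public class ChangelogSketch {
    // The four message kinds exposed to users since FLIP-95.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // A changelog mode is the set of kinds a node may emit or consume.
    static final EnumSet<Kind> INSERT_ONLY = EnumSet.of(Kind.INSERT);
    static final EnumSet<Kind> RETRACT =
        EnumSet.of(Kind.INSERT, Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER, Kind.DELETE);

    // The old two-kind (+/-) model collapses the four kinds:
    // "+" covers INSERT/UPDATE_AFTER, "-" covers DELETE/UPDATE_BEFORE.
    static char toLegacySign(Kind k) {
        return (k == Kind.INSERT || k == Kind.UPDATE_AFTER) ? '+' : '-';
    }
}
```

Inferring a ChangelogMode per node then amounts to propagating such kind-sets through the plan, rather than the single boolean-like AccMode trait.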
[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#discussion_r402739873 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/ExecutorWrapper.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExecutorUtils; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * The help class to get the executor to run asynchronous operations on. It could create a dedicated thread pool or + * reuse an existing one. 
+ */ +public class ExecutorWrapper { + + private ExecutorService internalExecutorService; + private Executor executor; + + private ExecutorWrapper(Configuration flinkConfig) { + internalExecutorService = Executors.newFixedThreadPool( + flinkConfig.getInteger(KubernetesConfigOptions.CLIENT_ASYNC_THREAD_POOL_SIZE), + new ExecutorThreadFactory("FlinkKubeClient-IO")); + } + + private ExecutorWrapper(Executor executor) { Review comment: The reason why I use `ExecutorWrapper` is to provide a factory method to create a dedicated thread pool or reuse an existing one. However, I think this could also be done by making the executor argument of `Fabric8FlinkKubeClient` `Nullable`. I will go this way.
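The "nullable argument instead of a wrapper" idea from the comment above could look roughly like this; a sketch under assumed names (`resolve`, `DEFAULT_POOL_SIZE`), not the merged code:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ExecutorChoiceSketch {
    static final int DEFAULT_POOL_SIZE = 2; // assumed default from the discussion

    // If the caller passes an executor, reuse it; otherwise (null) create a
    // dedicated pool — replacing the ExecutorWrapper's two private constructors.
    static Executor resolve(Executor provided) {
        return provided != null ? provided : Executors.newFixedThreadPool(DEFAULT_POOL_SIZE);
    }
}
```

This keeps the client's constructor flat: one `@Nullable Executor` parameter instead of a wrapper type with factory methods.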
[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433 ## CI report: * 9e82da175780aa6a6ef19272a41c9a4796621e1a Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158126828) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7011) * 2670a045bd9c9c4c75e7a22bf188fc673c85a062 UNKNOWN
[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#discussion_r402738678 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java ## @@ -183,6 +183,14 @@ .withDescription("The user-specified annotations that are set to the TaskManager pod. The value could be " + "in the form of a1:v1,a2:v2"); + public static final ConfigOption CLIENT_ASYNC_THREAD_POOL_SIZE = Review comment: Yes, this is exactly what I want too. Making `RpcEndpoint#getRpcService#getExecutor` return a custom thread pool for I/O-related execution is a very good idea. So I will remove this config option now and set the default thread pool size to 2.
[jira] [Commented] (FLINK-16794) ClassNotFoundException caused by ClassLoader.getSystemClassLoader using impertinently
[ https://issues.apache.org/jira/browse/FLINK-16794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074280#comment-17074280 ] victor.jiang commented on FLINK-16794: -- Hi Till Rohrmann, in our application scenario for flink-client usage, we create a customized ClassLoader for loading Flink's dependent jars in order to avoid conflicts with our app's jars. Please fix this as an improvement. > ClassNotFoundException caused by ClassLoader.getSystemClassLoader using > impertinently > --- > > Key: FLINK-16794 > URL: https://issues.apache.org/jira/browse/FLINK-16794 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission, Runtime / REST > Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.8.3 > Reporter: victor.jiang > Priority: Major > > In some containerized environments, the context classloader is not the > SystemClassLoader; a customized classloader is usually used for class > isolation, so a ClassNotFoundException may be thrown. We recommend using the > ClassLoader of getClass()/the caller/the current thread context. > The related sources are below: > 1. flink-clients\src\main\java\org\apache\flink\client\program\ClusterClient.java (690,33): > return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); > 2. flink-clients\src\main\java\org\apache\flink\client\program\MiniClusterClient.java (148,33): > return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); > 3. flink-runtime\src\main\java\org\apache\flink\runtime\blob\BlobUtils.java (348,66): > return (Throwable) InstantiationUtil.deserializeObject(bytes, > ClassLoader.getSystemClassLoader()); > 4. flink-runtime\src\main\java\org\apache\flink\runtime\rest\messages\json\SerializedThrowableDeserializer.java (52,68): > return InstantiationUtil.deserializeObject(serializedException, > ClassLoader.getSystemClassLoader()); > 5. flink-runtime\src\main\java\org\apache\flink\runtime\rpc\messages\RemoteRpcInvocation.java (118,67): > methodInvocation = > serializedMethodInvocation.deserializeValue(ClassLoader.getSystemClassLoader());
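The recommendation in the issue above — prefer the thread context classloader (or the class's own loader) over a hard-coded `ClassLoader.getSystemClassLoader()` — can be sketched as:

```java
public class ClassLoaderSketch {
    // Prefer the context classloader, which containerized environments set to
    // their customized loader; fall back to this class's own loader.
    static ClassLoader pickClassLoader() {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return cl != null ? cl : ClassLoaderSketch.class.getClassLoader();
    }
}
```

Call sites such as `getAccumulators(jobID, ...)` would then pass `pickClassLoader()` instead of the system classloader, so classes from the app's isolated loader remain resolvable.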
[GitHub] [flink] zhijiangW commented on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint
zhijiangW commented on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint URL: https://github.com/apache/flink/pull/11534#issuecomment-608230638 Thanks for the review @AHeise @rkhachatryan, and I have addressed all your comments.
[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433 ## CI report: * 9e82da175780aa6a6ef19272a41c9a4796621e1a Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158126828) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7011)
[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task. URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084 ## CI report: * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015) * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task. URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084 ## CI report: * 5ad66720b750455263df2ccb345f46b298d62faa Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/157442617) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6911) * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015) * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner
docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner URL: https://github.com/apache/flink/pull/11276#discussion_r402726288 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala ## @@ -39,34 +36,23 @@ class TableSourceTest extends TableTestBase { @Before def setup(): Unit = { -util.tableEnv.registerTableSource("FilterableTable", TestFilterableTableSource(false)) -TestPartitionableSourceFactory.registerTableSource(util.tableEnv, "PartitionableTable", false) +TestFilterableTableSource.createTemporaryTable( + util.tableEnv, + TestFilterableTableSource.defaultSchema, + "FilterableTable") + +TestPartitionableSourceFactory.createTemporaryTable(util.tableEnv, "PartitionableTable", false) } @Test def testBoundedStreamTableSource(): Unit = { -util.tableEnv.registerTableSource("MyTable", new TestTableSource(true, tableSchema)) +TestTableSource.createTemporaryTable(util.tableEnv, isBounded = true, tableSchema, "MyTable") util.verifyPlan("SELECT * FROM MyTable") } @Test def testUnboundedStreamTableSource(): Unit = { -util.tableEnv.registerTableSource("MyTable", new TestTableSource(false, tableSchema)) -util.verifyPlan("SELECT * FROM MyTable") - } - - @Test - def testNonStreamTableSource(): Unit = { -val tableSource = new TableSource[Row]() { - - override def getProducedDataType: DataType = tableSchema.toRowDataType - - override def getTableSchema: TableSchema = tableSchema -} -util.tableEnv.registerTableSource("MyTable", tableSource) -thrown.expect(classOf[ValidationException]) -thrown.expectMessage( - "Only StreamTableSource and LookupableTableSource can be used in Blink planner.") Review comment: It's only used in `convertSourceTable`, which converts `ConnectorCatalogTable` to `TableSourceTable`. When there is no registerTableSource/Sink, there is no `ConnectorCatalogTable` either. 
[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner
docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner URL: https://github.com/apache/flink/pull/11276#discussion_r402725215 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala ## @@ -112,29 +78,28 @@ object MemoryTableSourceSinkUtil { } } - final class UnsafeMemoryOutputFormatTableSink extends OutputFormatTableSink[Row] { - -var fieldNames: Array[String] = _ -var fieldTypes: Array[TypeInformation[_]] = _ - -override def getOutputType: TypeInformation[Row] = { - new RowTypeInfo(getTableSchema.getFieldTypes, getTableSchema.getFieldNames) + final class LegacyUnsafeMemoryAppendTableFactory extends StreamTableSinkFactory[Row] { +override def createStreamTableSink( +properties: util.Map[String, String]): StreamTableSink[Row] = { + val dp = new DescriptorProperties + dp.putProperties(properties) + val tableSchema = dp.getTableSchema(SCHEMA) + val sink = new UnsafeMemoryAppendTableSink + sink.configure(tableSchema.getFieldNames, tableSchema.getFieldTypes) +.asInstanceOf[StreamTableSink[Row]] } -override def getOutputFormat: OutputFormat[Row] = new MemoryCollectionOutputFormat - -override def configure( -fieldNames: Array[String], -fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = { - val newSink = new UnsafeMemoryOutputFormatTableSink - newSink.fieldNames = fieldNames - newSink.fieldTypes = fieldTypes - newSink +override def requiredContext(): util.Map[String, String] = { + val context = new util.HashMap[String, String]() + context.put(CONNECTOR_TYPE, "LegacyUnsafeMemoryAppendTable") Review comment: It doesn't need to be a constant field; it's only used in this test case.
[GitHub] [flink] wuchong merged pull request #11357: [FLINK-16476] [table-planner-blink] Remove PowerMockito to avoid LinkageError in SelectivityEstimatorTest
wuchong merged pull request #11357: [FLINK-16476] [table-planner-blink] Remove PowerMockito to avoid LinkageError in SelectivityEstimatorTest URL: https://github.com/apache/flink/pull/11357
[jira] [Resolved] (FLINK-16476) SelectivityEstimatorTest logs LinkageErrors
[ https://issues.apache.org/jira/browse/FLINK-16476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu resolved FLINK-16476. - Resolution: Fixed Fixed in master (1.11.0) with 00526eba0f37f8869e62f41f43a40906e4169790 and b93c75b31e8ce17f57c8311f591a70316180df04 > SelectivityEstimatorTest logs LinkageErrors > --- > > Key: FLINK-16476 > URL: https://issues.apache.org/jira/browse/FLINK-16476 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Assignee: godfrey he >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This is the test run > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6038&view=logs&j=d47ab8d2-10c7-5d9e-8178-ef06a797a0d8&t=9a1abf5f-7cf4-58c3-bb2a-282a64aebb1f > Log output > {code} > 2020-03-07T00:35:20.1270791Z [INFO] Running > org.apache.flink.table.planner.plan.metadata.SelectivityEstimatorTest > 2020-03-07T00:35:21.6473057Z [INFO] Tests run: 3, Failures: 0, Errors: 0, > Skipped: 0, Time elapsed: 3.408 s - in > org.apache.flink.table.planner.plan.utils.FlinkRexUtilTest > 2020-03-07T00:35:21.6541713Z [INFO] Running > org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCostTest > 2020-03-07T00:35:21.7294613Z [INFO] Tests run: 2, Failures: 0, Errors: 0, > Skipped: 0, Time elapsed: 0.073 s - in > org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCostTest > 2020-03-07T00:35:21.7309958Z [INFO] Running > org.apache.flink.table.planner.plan.metadata.AggCallSelectivityEstimatorTest > 2020-03-07T00:35:23.7443246Z ScriptEngineManager providers.next(): > javax.script.ScriptEngineFactory: Provider > jdk.nashorn.api.scripting.NashornScriptEngineFactory not a subtype > 2020-03-07T00:35:23.8260013Z 2020-03-07 00:35:23,819 main ERROR Could not > reconfigure JMX java.lang.LinkageError: loader constraint violation: loader > (instance of > 
org/powermock/core/classloader/javassist/JavassistMockClassLoader) previously > initiated loading for a different type with name > "javax/management/MBeanServer" > 2020-03-07T00:35:23.8262329Z at java.lang.ClassLoader.defineClass1(Native > Method) > 2020-03-07T00:35:23.8263241Z at > java.lang.ClassLoader.defineClass(ClassLoader.java:757) > 2020-03-07T00:35:23.8264629Z at > org.powermock.core.classloader.javassist.JavassistMockClassLoader.loadUnmockedClass(JavassistMockClassLoader.java:90) > 2020-03-07T00:35:23.8266241Z at > org.powermock.core.classloader.MockClassLoader.loadClassByThisClassLoader(MockClassLoader.java:104) > 2020-03-07T00:35:23.8267808Z at > org.powermock.core.classloader.DeferSupportingClassLoader.loadClass1(DeferSupportingClassLoader.java:147) > 2020-03-07T00:35:23.8269485Z at > org.powermock.core.classloader.DeferSupportingClassLoader.loadClass(DeferSupportingClassLoader.java:98) > 2020-03-07T00:35:23.8270900Z at > java.lang.ClassLoader.loadClass(ClassLoader.java:352) > 2020-03-07T00:35:23.8272000Z at > org.apache.logging.log4j.core.jmx.Server.unregisterAllMatching(Server.java:337) > 2020-03-07T00:35:23.8273779Z at > org.apache.logging.log4j.core.jmx.Server.unregisterLoggerContext(Server.java:261) > 2020-03-07T00:35:23.8275087Z at > org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:165) > 2020-03-07T00:35:23.8276515Z at > org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:141) > 2020-03-07T00:35:23.8278036Z at > org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:590) > 2020-03-07T00:35:23.8279741Z at > org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) > 2020-03-07T00:35:23.8281190Z at > org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) > 2020-03-07T00:35:23.8282440Z at > org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) > 2020-03-07T00:35:23.8283717Z at > 
org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) > 2020-03-07T00:35:23.8285186Z at > org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) > 2020-03-07T00:35:23.8286575Z at > org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) > 2020-03-07T00:35:23.8287933Z at > org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) > 2020-03-07T00:35:23.8289393Z at > org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) > 2020-03-07T00:35:23.8290816Z at > org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) > 2020-03-07T00:35:23.8292179Z at
[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner
docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner URL: https://github.com/apache/flink/pull/11276#discussion_r402722415 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala ## @@ -26,54 +26,19 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{DataTypes, TableException, TableSchema, Tumble, Types} import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5} import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink} -import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.DataTypeAppendStreamTableSink import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, TableTestUtil} import org.apache.flink.table.sinks._ import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils} import org.apache.flink.types.Row - import org.junit.Assert._ import org.junit.Test - import java.io.File import java.util.TimeZone import scala.collection.JavaConverters._ class TableSinkITCase extends AbstractTestBase { - @Test - def testInsertIntoRegisteredTableSink(): Unit = { Review comment: There is no registered table sink any more.
[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task. URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084 ## CI report: * 5ad66720b750455263df2ccb345f46b298d62faa Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/157442617) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6911) * b709e1952df66cba5a316f9a46902538bf8cf245 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813 ## CI report: * d20feb41f2e2940f6d6c8766586876addd3e2e92 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157491477) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6932) * d8814c3aed36cea34b5069b2a270df8033d29956 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158126859) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7012)
[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433 ## CI report: * d0d0b6fef21a118932e878255aa40f10f17fe753 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/155574041) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6686) * 9e82da175780aa6a6ef19272a41c9a4796621e1a Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158126828) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7011)
[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516 ## CI report: * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010)
[GitHub] [flink] flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable
flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable URL: https://github.com/apache/flink/pull/11619#issuecomment-607793023 ## CI report: * 68c82eef3469c403b1a0f8a64c13f04d989bf145 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/157750104) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6989) * 6bd37e40b9333d646bbf1bb1fbe286ad21d1e709 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158126927) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7013)
[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner
docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner URL: https://github.com/apache/flink/pull/11276#discussion_r402721935 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala ## @@ -54,8 +55,67 @@ import scala.collection.JavaConverters._ import scala.collection.mutable object TestTableSources { + def createPersonCsvTemporaryTable(tEnv: TableEnvironment, tableName: String): Unit = { Review comment: ditto
[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner
docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner URL: https://github.com/apache/flink/pull/11276#discussion_r402721904 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala ## @@ -73,121 +79,72 @@ class InMemoryLookupableTableSource( map.toMap } + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { +null + } + override def isAsyncEnabled: Boolean = asyncEnabled - override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) + override def getProducedDataType: DataType = schema.toRowDataType + + override def getTableSchema: TableSchema = schema - override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes) + + override def isBounded: Boolean = bounded @VisibleForTesting def getResourceCounter: Int = resourceCounter.get() - } -object InMemoryLookupableTableSource { - - /** -* Return a new builder that builds a [[InMemoryLookupableTableSource]]. 
-* -* For example: -* -* {{{ -* val data = ( -* (11, 1L, "Julian"), -* (22, 2L, "Jark"), -* (33, 3L, "Fabian")) -* -* val source = InMemoryLookupableTableSource.builder() -* .data(data) -* .field("age", Types.INT) -* .field("id", Types.LONG) -* .field("name", Types.STRING) -* .enableAsync() -* .build() -* }}} -* -* @return a new builder to build a [[InMemoryLookupableTableSource]] -*/ - def builder(): Builder = new Builder +class InMemoryLookupableTableFactory extends TableSourceFactory[Row] { + override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = { +val dp = new DescriptorProperties +dp.putProperties(properties) +val tableSchema = dp.getTableSchema(SCHEMA) +val serializedData = dp.getString("data") +val data = EncodingUtils.decodeStringToObject(serializedData, classOf[List[Product]]) - /** -* A builder for creating [[InMemoryLookupableTableSource]] instances. -* -* For example: -* -* {{{ -* val data = ( -* (11, 1L, "Julian"), -* (22, 2L, "Jark"), -* (33, 3L, "Fabian")) -* -* val source = InMemoryLookupableTableSource.builder() -* .data(data) -* .field("age", Types.INT) -* .field("id", Types.LONG) -* .field("name", Types.STRING) -* .enableAsync() -* .build() -* }}} -*/ - class Builder { -private val schema = new mutable.LinkedHashMap[String, TypeInformation[_]]() -private var data: List[Product] = _ -private var asyncEnabled: Boolean = false - -/** - * Sets table data for the table source. - */ -def data(data: List[Product]): Builder = { - this.data = data - this +val rowData = data.map { entry => + Row.of((0 until entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*) } -/** - * Adds a field with the field name and the type information. Required. - * This method can be called multiple times. The call order of this method defines - * also the order of the fields in a row. 
- * - * @param fieldName the field name - * @param fieldType the type information of the field - */ -def field(fieldName: String, fieldType: TypeInformation[_]): Builder = { - if (schema.contains(fieldName)) { -throw new IllegalArgumentException(s"Duplicate field name $fieldName.") - } - schema += (fieldName -> fieldType) - this -} +val asyncEnabled = dp.getOptionalBoolean("is-async").orElse(false) -/** - * Enables async lookup for the table source - */ -def enableAsync(): Builder = { - asyncEnabled = true - this -} +val bounded = dp.getOptionalBoolean("is-bounded").orElse(false) -/** - * Apply the current values and constructs a newly-created [[InMemoryLookupableTableSource]]. - * - * @return a newly-created [[InMemoryLookupableTableSource]]. - */ -def build(): InMemoryLookupableTableSource = { - val fieldNames = schema.keys.toArray - val fieldTypes = schema.values.toArray - Preconditions.checkNotNull(data) - // convert - val rowData = data.map { entry => -Row.of((0 until entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*) - } - new InMemoryLookupableTableSource( -fieldNames, -fieldTypes, -rowData, -asyncEnabled - ) -} +new InMemoryLookupableTableSource(tableSchema, rowData, asyncEnabled, bounded) + } + + override def requiredContext(): util.Map[String, String] = { +val co
[GitHub] [flink] wenlong88 commented on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.
wenlong88 commented on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task. URL: https://github.com/apache/flink/pull/11564#issuecomment-608213838 @flinkbot run travis
[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions URL: https://github.com/apache/flink/pull/11567#discussion_r402721375 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ## @@ -351,6 +352,94 @@ private void testReleaseMemory(final ResultPartitionType resultPartitionType) th } } + /** +* Tests {@link ResultPartition#getAvailableFuture()}. +*/ + @Test + public void testIsAvailableOrNot() throws IOException, InterruptedException { + final int numAllBuffers = 10; + final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() + .setNumNetworkBuffers(numAllBuffers).build(); + final ResultPartition resultPartition = createPartition(network, ResultPartitionType.PIPELINED, 1); + + try { + resultPartition.setup(); + + resultPartition.getBufferPool().setNumBuffers(2); + + assertTrue(resultPartition.getAvailableFuture().isDone()); + + resultPartition.getBufferBuilder(); + resultPartition.getBufferBuilder(); + assertFalse(resultPartition.getAvailableFuture().isDone()); + } finally { + resultPartition.release(); + network.close(); + } + } + + /** +* Tests {@link ResultPartition#getAvailableFuture()} with configured max backlogs. 
+*/ + @Test + public void testMaxBuffersPerChannelAndAvailability() throws IOException, InterruptedException { + final int numAllBuffers = 10; + final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() + .setNumNetworkBuffers(numAllBuffers).build(); + final ResultPartition resultPartition = new ResultPartitionBuilder() + .setResultPartitionManager(network.getResultPartitionManager()) + .setupBufferPoolFactoryFromNettyShuffleEnvironment(network) + .setResultPartitionType(ResultPartitionType.PIPELINED) + .setNumberOfSubpartitions(2) + .setMaxBuffersPerChannel(1) + .build(); + + try { + resultPartition.setup(); + + assertTrue(resultPartition.getAvailableFuture().isDone()); + + BufferAvailabilityListener listener0 = mock(BufferAvailabilityListener.class); Review comment: it's better to avoid the use of mock
[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions URL: https://github.com/apache/flink/pull/11567#discussion_r402720894 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java ## @@ -74,6 +74,8 @@ private final String compressionCodec; + private final int maxBuffersPerChannel; Review comment: it's better to update the ```hashCode```, ```equals``` and ```toString``` methods
[jira] [Commented] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp
[ https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074255#comment-17074255 ] YufeiLiu commented on FLINK-16938: -- It wasn't that I wanted to use SqlTimestamp; the blink planner's internal conversion casts the input type to its internal type, which is SqlTimestamp, and casts it back to the output type in the result. It's out of my control. > SqlTimestamp has lag when convert long to Timestamp > --- > > Key: FLINK-16938 > URL: https://issues.apache.org/jira/browse/FLINK-16938 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: YufeiLiu >Priority: Major > > When I set the rowtime attribute using the expression 'column.rowtime', and the > result type is sql.Timestamp, the result has a lag equal to the > default timezone offset. > {code:java} > tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data"); > {code} > Looking into the conversion logic, the field goes through a 'long -> > SqlTimestamp -> Timestamp' conversion. > {code:java} > long from = System.currentTimeMillis(); > long to = SqlTimestamp > .fromEpochMillis(from) > .toTimestamp() > .getTime(); > {code} > The result is {{from!=to}}. {{SqlTimestamp.toTimestamp()}} uses > {{Timestamp.valueOf(LocalDateTime dateTime)}}, which involves timezone > information and causes the time lag. > Converting from Timestamp to Timestamp does not have this issue, but converting > a DataStream to a Table uses StreamRecord.timestamp as the rowtime field.
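The reported lag can be reproduced with plain JDK classes, because `Timestamp.valueOf(LocalDateTime)` interprets the zone-less `LocalDateTime` in the JVM's default time zone. A minimal sketch, with a hypothetical class name, a fixed GMT+8 default zone for determinism, and a `roundTrip` method that mirrors the suspected 'long -> LocalDateTime -> Timestamp' path:

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.TimeZone;

class TimestampLagDemo {

    // Mirrors the suspected conversion path: the epoch millis are rendered as
    // UTC wall-clock fields, then Timestamp.valueOf() re-interprets those
    // fields in the JVM default zone, shifting the instant by the zone offset.
    static long roundTrip(long epochMillis) {
        LocalDateTime utcWallClock =
            LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        return Timestamp.valueOf(utcWallClock).getTime();
    }

    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+8")); // fixed offset, no DST
        long from = 1_585_900_800_000L; // 2020-04-03T08:00:00Z
        long to = roundTrip(from);
        System.out.println(from - to);  // prints 28800000: the 8-hour zone offset
    }
}
```

Converting through `Instant` (e.g. `Timestamp.from(Instant.ofEpochMilli(...))`) instead of a `LocalDateTime` avoids the default-zone re-interpretation entirely.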
[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions URL: https://github.com/apache/flink/pull/11567#discussion_r402719676 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -375,4 +397,32 @@ void onConsumedSubpartition(int subpartitionIndex) { private void checkInProduceState() throws IllegalStateException { checkState(!isFinished, "Partition already finished."); } + + /** +* Check whether all subpartitions' backlogs are less than the limitation of max backlogs, and make this partition +* available again if yes. +*/ + public void notifyDecreaseBacklog(int buffersInBacklog) { + if (buffersInBacklog == maxBuffersPerChannel) { + unavailableSubpartitionsCount--; + if (unavailableSubpartitionsCount == 0) { + CompletableFuture toNotify = availabilityHelper.getUnavailableToResetAvailable(); + toNotify.complete(null); + int[] a = new int[1024]; + } + } + } + + /** +* Check whether any subpartition's backlog exceeds the limitation of max backlogs, and make this partition +* unavailabe if yes. +*/ + public void notifyIncreaseBacklog(int buffersInBacklog) { + if (buffersInBacklog == maxBuffersPerChannel + 1) { + unavailableSubpartitionsCount++; + if (unavailableSubpartitionsCount == 1) { Review comment: the above two lines can be replaced with: ```if (++unavailableSubpartitionsCount == 1) {```
[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions URL: https://github.com/apache/flink/pull/11567#discussion_r402718999 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -375,4 +397,32 @@ void onConsumedSubpartition(int subpartitionIndex) { private void checkInProduceState() throws IllegalStateException { checkState(!isFinished, "Partition already finished."); } + + /** +* Check whether all subpartitions' backlogs are less than the limitation of max backlogs, and make this partition +* available again if yes. +*/ + public void notifyDecreaseBacklog(int buffersInBacklog) { + if (buffersInBacklog == maxBuffersPerChannel) { + unavailableSubpartitionsCount--; + if (unavailableSubpartitionsCount == 0) { + CompletableFuture toNotify = availabilityHelper.getUnavailableToResetAvailable(); + toNotify.complete(null); + int[] a = new int[1024]; Review comment: Why do we need this?
[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions URL: https://github.com/apache/flink/pull/11567#discussion_r402719144 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -375,4 +397,32 @@ void onConsumedSubpartition(int subpartitionIndex) { private void checkInProduceState() throws IllegalStateException { checkState(!isFinished, "Partition already finished."); } + + /** +* Check whether all subpartitions' backlogs are less than the limitation of max backlogs, and make this partition +* available again if yes. +*/ + public void notifyDecreaseBacklog(int buffersInBacklog) { + if (buffersInBacklog == maxBuffersPerChannel) { + unavailableSubpartitionsCount--; + if (unavailableSubpartitionsCount == 0) { Review comment: the above two lines can be replaced with: ```if (--unavailableSubpartitionsCount == 0) {```
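The hunk under review gates partition availability on a counter of subpartitions whose backlog exceeds the configured limit, completing a future once the last over-limit subpartition drains. A standalone sketch of that pattern (a hypothetical `BacklogGate` class; the names mirror the PR, but the future handling is simplified compared to Flink's actual `AvailabilityHelper`):

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the gating pattern: the partition becomes unavailable when any
// subpartition's backlog exceeds maxBuffersPerChannel, and becomes available
// again (completing a future) once all of them drain back below the limit.
public class BacklogGate {
    private final int maxBuffersPerChannel;
    private int unavailableSubpartitionsCount;
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    public BacklogGate(int maxBuffersPerChannel) {
        this.maxBuffersPerChannel = maxBuffersPerChannel;
    }

    /** Called after a subpartition's backlog grew to {@code buffersInBacklog}. */
    public void notifyIncreaseBacklog(int buffersInBacklog) {
        // Just crossed the limit: this subpartition counts as "full" now.
        if (buffersInBacklog == maxBuffersPerChannel + 1 && ++unavailableSubpartitionsCount == 1) {
            availableFuture = new CompletableFuture<>(); // gate closes
        }
    }

    /** Called after a subpartition's backlog shrank to {@code buffersInBacklog}. */
    public void notifyDecreaseBacklog(int buffersInBacklog) {
        // Dropped back to the limit: this subpartition is no longer "full".
        if (buffersInBacklog == maxBuffersPerChannel && --unavailableSubpartitionsCount == 0) {
            availableFuture.complete(null); // gate opens, waiters resume
        }
    }

    public boolean isAvailable() {
        return availableFuture.isDone();
    }
}
```

The edge-triggered conditions (`== limit + 1` on increase, `== limit` on decrease) ensure the counter moves at most once per crossing, which is what makes the reviewer's combined pre-increment/pre-decrement one-liners safe.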
[jira] [Resolved] (FLINK-16885) SQL hive-connector wilcard excludes don't work on maven 3.1.X
[ https://issues.apache.org/jira/browse/FLINK-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee resolved FLINK-16885. -- Resolution: Fixed master: 2b94ca60ca437df98d0efeb7b9a43ecdf216825b > SQL hive-connector wilcard excludes don't work on maven 3.1.X > - > > Key: FLINK-16885 > URL: https://issues.apache.org/jira/browse/FLINK-16885 > Project: Flink > Issue Type: Bug > Components: Build System, Connectors / Hive >Affects Versions: 1.11.0 >Reporter: Chesnay Schepler >Assignee: Jingsong Lee >Priority: Blocker > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The sql-connector-hive modules added in FLINK-16455 use wildcard imports to > exclude all transitive dependencies from Hive. > This is a maven 3.2.1+ feature, which may imply that Flink cannot be properly > built with maven 3.1 anymore.
[GitHub] [flink] JingsongLi merged pull request #11620: [FLINK-16885][hive] Remove wilcard excludes that don't work on maven 3.1.X
JingsongLi merged pull request #11620: [FLINK-16885][hive] Remove wilcard excludes that don't work on maven 3.1.X URL: https://github.com/apache/flink/pull/11620
[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions URL: https://github.com/apache/flink/pull/11567#discussion_r402718422 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -342,6 +342,7 @@ private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) { assert Thread.holdsLock(buffers); if (isBuffer) { buffersInBacklog--; + parent.notifyDecreaseBacklog(buffersInBacklog); Review comment: the above two lines can be replaced with: ```parent.notifyDecreaseBacklog(--buffersInBacklog);```
[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516 ## CI report: * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable
flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable URL: https://github.com/apache/flink/pull/11619#issuecomment-607793023 ## CI report: * 68c82eef3469c403b1a0f8a64c13f04d989bf145 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/157750104) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6989) * 6bd37e40b9333d646bbf1bb1fbe286ad21d1e709 UNKNOWN
[GitHub] [flink] JingsongLi commented on issue #11620: [FLINK-16885][hive] Remove wilcard excludes that don't work on maven 3.1.X
JingsongLi commented on issue #11620: [FLINK-16885][hive] Remove wilcard excludes that don't work on maven 3.1.X URL: https://github.com/apache/flink/pull/11620#issuecomment-608210790 Merged #11620 to master. Thanks @lirui-apache for the review.
[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions URL: https://github.com/apache/flink/pull/11567#discussion_r402718528 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -355,6 +356,7 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) { if (buffer != null && buffer.isBuffer()) { buffersInBacklog++; + parent.notifyIncreaseBacklog(buffersInBacklog); Review comment: the above two lines can be replaced with: ```parent.notifyIncreaseBacklog(++buffersInBacklog);```
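The one-line replacements suggested in these review comments rely on pre-increment/pre-decrement yielding the value after the update, which makes them behaviorally identical to the two-line originals. A tiny standalone illustration (the `notifyBacklog` callback is hypothetical, standing in for the partition notification):

```java
// Pre-increment (++x) evaluates to the value AFTER the update, so passing
// ++counter to a callback matches incrementing first and then passing counter.
public class IncrementDemo {
    static int lastSeen;

    static void notifyBacklog(int backlog) {
        lastSeen = backlog;
    }

    public static void main(String[] args) {
        int a = 0;
        a++;                // two-line form from the PR
        notifyBacklog(a);
        int twoLine = lastSeen;

        int b = 0;
        notifyBacklog(++b); // reviewer's suggested one-liner
        int oneLine = lastSeen;

        System.out.println(twoLine == oneLine && oneLine == 1); // true
    }
}
```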
[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433 ## CI report: * d0d0b6fef21a118932e878255aa40f10f17fe753 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/155574041) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6686) * 9e82da175780aa6a6ef19272a41c9a4796621e1a UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813 ## CI report: * d20feb41f2e2940f6d6c8766586876addd3e2e92 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157491477) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6932) * d8814c3aed36cea34b5069b2a270df8033d29956 UNKNOWN
[jira] [Commented] (FLINK-16949) Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
[ https://issues.apache.org/jira/browse/FLINK-16949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074248#comment-17074248 ] Jark Wu commented on FLINK-16949: - Thanks [~yunta], this approach sounds good to me. > Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider > --- > > Key: FLINK-16949 > URL: https://issues.apache.org/jira/browse/FLINK-16949 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Enhance specific UTs to use customized TtlTimeProvider to simulate changed > current time. This would introduce some changes to > {{AbstractStreamOperatorTestHarness}} and add new > {{StreamTaskStateInitializerTestImpl}} for different state backends.
[jira] [Commented] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp
[ https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074243#comment-17074243 ] Jark Wu commented on FLINK-16938: - Regarding the time attribute: yes, currently only TIMESTAMP(3) is supported. TIMESTAMP(3) WITH LOCAL TIME ZONE is in the future plan. > SqlTimestamp has lag when convert long to Timestamp > --- > > Key: FLINK-16938 > URL: https://issues.apache.org/jira/browse/FLINK-16938 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: YufeiLiu >Priority: Major > > When I set the rowtime attribute using the expression 'column.rowtime', and > the result type is sql.Timestamp, the result has a lag equal to the > default timezone offset. > {code:java} > tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data"); > {code} > Looking into the conversion logic, the field goes through a 'long -> > SqlTimestamp -> Timestamp' conversion. > {code:java} > long from = System.currentTimeMillis(); > long to = SqlTimestamp > .fromEpochMillis(from) > .toTimestamp() > .getTime(); > {code} > The result is {{from!=to}}. {{SqlTimestamp.toTimestamp()}} uses > {{Timestamp.valueOf(LocalDateTime dateTime)}}, which involves timezone > information and causes the time lag. > Converting from Timestamp to Timestamp does not have this issue, but converting a DataStream to > a Table uses StreamRecord.timestamp as the rowtime field.
[GitHub] [flink] wangyang0918 commented on a change in pull request #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable
wangyang0918 commented on a change in pull request #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable URL: https://github.com/apache/flink/pull/11619#discussion_r402715315 ## File path: flink-end-to-end-tests/test-scripts/common_kubernetes.sh ## @@ -88,22 +86,43 @@ function start_kubernetes_if_not_running { } function start_kubernetes { -[[ "${OS_TYPE}" = "linux" ]] && setup_kubernetes_for_linux -if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} start_kubernetes_if_not_running; then -echo "Could not start minikube. Aborting..." -exit 1 +if [[ "${OS_TYPE}" != "linux" ]]; then +if ! check_kubernetes_status; then +echo "$NON_LINUX_ENV_NOTE" +exit 1 +fi +else +setup_kubernetes_for_linux +if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} start_kubernetes_if_not_running; then +echo "Could not start minikube. Aborting..." +exit 1 +fi fi eval $(minikube docker-env) } function stop_kubernetes { -echo "Stopping minikube ..." -stop_command="minikube stop" -[[ "${OS_TYPE}" = "linux" ]] && stop_command="sudo ${stop_command}" -if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} "${stop_command}"; then -echo "Could not stop minikube. Aborting..." -exit 1 +if [[ "${OS_TYPE}" != "linux" ]]; then +echo "$NON_LINUX_ENV_NOTE" +else +echo "Stopping minikube ..." +stop_command="sudo minikube stop" +if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} "${stop_command}"; then +echo "Could not stop minikube. Aborting..." +exit 1 +fi fi } +function debug_copy_and_show_logs { +echo "Debugging failed Kubernetes test:" +echo "Currently exiting Kubernetes resources" Review comment: Nice catch. I will fix it.
[GitHub] [flink] xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args
xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args URL: https://github.com/apache/flink/pull/11545#issuecomment-608203175 @azagrebin Thanks for the review. Comments addressed.
[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions URL: https://github.com/apache/flink/pull/11568#discussion_r402701406 ## File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/TestRelDataTypeFactory.java ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.flink.table.calcite.RawRelDataTypeFactory; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeImpl; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; + +/** + * {@link RelDataTypeFactory} for testing purposes. + */ +public final class TestRelDataTypeFactory extends SqlTypeFactoryImpl implements RawRelDataTypeFactory { + + TestRelDataTypeFactory(RelDataTypeSystem typeSystem) { Review comment: -1 for the idea of implements a `RawRelDataTypeFactory` interface. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions URL: https://github.com/apache/flink/pull/11568#discussion_r402701101 ## File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java ## @@ -28,7 +27,18 @@ * Type used during tests. */ public class Fixture { - private final RelDataTypeFactory typeFactory; + private final TestRelDataTypeFactory typeFactory; + + static final String RAW_TYPE_INT_CLASS = "java.lang.Integer"; + static final String RAW_TYPE_INT_SERIALIZER_STRING = Review comment: Agree with @dawidwys, the current string would be a problem for different SQL text encodings, e.g. going from `UTF-8` to `UTF-16`.
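One common way to sidestep the text-encoding concern is to restrict the serialized-serializer literal to an ASCII-safe alphabet such as Base64, which survives any SQL source encoding. A sketch under that assumption (Base64 is illustrative here; the actual encoding used by Flink's RAW type serializer strings may differ):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RawTypeStringDemo {
    // Encode arbitrary serializer-snapshot bytes into an ASCII-only literal
    // that is unaffected by the SQL text's character encoding.
    static String encode(byte[] snapshotBytes) {
        return Base64.getEncoder().encodeToString(snapshotBytes);
    }

    static byte[] decode(String literal) {
        return Base64.getDecoder().decode(literal);
    }

    public static void main(String[] args) {
        byte[] snapshot = "pretend-serializer-snapshot".getBytes(StandardCharsets.UTF_8);
        String literal = encode(snapshot);
        // The literal contains only [A-Za-z0-9+/=], safe in UTF-8 and UTF-16 SQL text.
        System.out.println(new String(decode(literal), StandardCharsets.UTF_8));
        // prints "pretend-serializer-snapshot"
    }
}
```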
[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions URL: https://github.com/apache/flink/pull/11568#discussion_r402703105 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala ## @@ -484,6 +493,9 @@ object FlinkTypeFactory { // CURSOR for UDTF case, whose type info will never be used, just a placeholder case CURSOR => new TypeInformationRawType[Nothing](new NothingTypeInfo) + case null if relDataType.isInstanceOf[RawRelDataType] => Review comment: I would suggest using `SqlTypeName.ANY`, which indicates the type comes from a Java object.
[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions URL: https://github.com/apache/flink/pull/11568#discussion_r402698067 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/table/calcite/RawRelDataTypeFactory.java ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.calcite; + +import org.apache.calcite.rel.type.RelDataType; + +/** + * A factory for creating a {@link RelDataType} that describes a RAW type such as + * {@code RAW('org.my.Class', 'sW3Djsds...')}. + */ +public interface RawRelDataTypeFactory { + + RelDataType createRawType(String className, String serializerString); Review comment: Can we reuse the `FlinkRelDataTypeFactory`, we didn't make factory for specific rel type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions URL: https://github.com/apache/flink/pull/11568#discussion_r402692226 ## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl ## @@ -889,6 +888,30 @@ SqlTypeNameSpec SqlMapTypeName() : } } +/** Parses a SQL raw type such as {@code RAW('org.my.Class', 'sW3Djsds...')}. */ +SqlTypeNameSpec SqlRawTypeName() : +{ +SqlCharStringLiteral className; +SqlCharStringLiteral serializerString; +} +{ + + + { +String cn = SqlParserUtil.parseString(token.image); +className = SqlLiteral.createCharString(cn, getPos()); Review comment: I would suggest reusing the `StringLiteral()` parser instead, for these reasons: 1. It handles multi-segment/charset/unescaped characters correctly. 2. We will have to check the string format sooner or later, either in the parser or in the planner.
[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516 ## CI report: * 95f0deb44dfae1be800260732e47bf8ceaf9ef67 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/157722695) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6976) * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010)
[jira] [Created] (FLINK-16951) Integrate parquet to file system connector
Jingsong Lee created FLINK-16951: Summary: Integrate parquet to file system connector Key: FLINK-16951 URL: https://issues.apache.org/jira/browse/FLINK-16951 Project: Flink Issue Type: Sub-task Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.0 Reporter: Jingsong Lee Assignee: Jingsong Lee Implement ParquetFileSystemFormatFactory
[jira] [Comment Edited] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp
[ https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074220#comment-17074220 ] Jark Wu edited comment on FLINK-16938 at 4/3/20, 2:34 AM: -- Hi [~liuyufei], I'm interested in why you want to use {{SqlTimestamp}}? {{SqlTimestamp}} is an internal data structure representing the TIMESTAMP type, which is equivalent to Java's LocalDateTime. The conversion from LocalDateTime to Timestamp is delegated to the JDK's implementation {{Timestamp.valueOf(LocalDateTime)}}, which keeps the string representation equal, not the underlying milliseconds. cc [~docete] was (Author: jark): Hi [~liuyufei], I'm interested why you want to use {{SqlTimestamp}} ? {{SqlTimestamp}} is an internal data structure to represent TIMESTAMP type, which is equal to Java's LocalDateTime. The converntion from LocalDateTime to Timestamp as delegate to JDK's implementation {{Timestamp.valueOf(LocalDateTime)}} which keeps the equal string representation, not the underlying millisecond. cc [~docete] > SqlTimestamp has lag when convert long to Timestamp > --- > > Key: FLINK-16938 > URL: https://issues.apache.org/jira/browse/FLINK-16938 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: YufeiLiu >Priority: Major > > When I set the rowtime attribute using the expression 'column.rowtime', and > the result type is sql.Timestamp, the result has a lag equal to the > default timezone offset. > {code:java} > tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data"); > {code} > Looking into the conversion logic, the field goes through a 'long -> > SqlTimestamp -> Timestamp' conversion. > {code:java} > long from = System.currentTimeMillis(); > long to = SqlTimestamp > .fromEpochMillis(from) > .toTimestamp() > .getTime(); > {code} > The result is {{from!=to}}. {{SqlTimestamp.toTimestamp()}} uses > {{Timestamp.valueOf(LocalDateTime dateTime)}}, which involves timezone > information and causes the time lag. > Converting from Timestamp to Timestamp does not have this issue, but converting a DataStream to > a Table uses StreamRecord.timestamp as the rowtime field.