[jira] [Commented] (FLINK-11499) Extend StreamingFileSink BulkFormats to support arbitrary roll policies

2020-04-02 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074322#comment-17074322
 ] 

Piotr Nowojski commented on FLINK-11499:


{quote}
Have you consider using checkpoint stream? I think the checkpoint state backend 
is the closest storage for job.
{quote}
[~lzljs3620320]: Not directly, but I'm hoping the solution be general enough, 
that one could pass the same target FileSystem for the WAL stream as for 
checkpointing. Wouldn't that achieve the same result but on a lower level 
(writing to a file directly vs to state)?
{quote}
Some writers which come with buffer size/capacity criteria may get full even 
before the checkpoint is triggered. We have to think about this scenario as 
well, right? 
{quote}
[~zenfenan]: maybe you are right. I was already thinking about it as I suspect 
it's even currently supported that bulk format file will roll in the middle of 
checkpoint? If that's the case, it's a matter of keeping this feature and it 
shouldn't be that difficult, as we should already have the code to support it.

> Extend StreamingFileSink BulkFormats to support arbitrary roll policies
> ---
>
> Key: FLINK-11499
> URL: https://issues.apache.org/jira/browse/FLINK-11499
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Seth Wiesman
>Priority: Major
>  Labels: usability
> Fix For: 1.11.0
>
>
> Currently when using the StreamingFilleSink Bulk-encoding formats can only be 
> combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress 
> part file on every checkpoint.
> However, many bulk formats such as parquet are most efficient when written as 
> large files; this is not possible when frequent checkpointing is enabled. 
> Currently the only work-around is to have long checkpoint intervals which is 
> not ideal.
>  
> The StreamingFileSink should be enhanced to support arbitrary roll policy's 
> so users may write large bulk files while retaining frequent checkpoints.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-16945) Execute CheckpointFailureManager.FailJobCallback directly in main thread executor

2020-04-02 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-16945:
--

Assignee: Biao Liu

> Execute CheckpointFailureManager.FailJobCallback directly in main thread 
> executor
> -
>
> Key: FLINK-16945
> URL: https://issues.apache.org/jira/browse/FLINK-16945
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Critical
> Fix For: 1.11.0
>
>
> Since we have put all non-IO operations of {{CheckpointCoordinator}} into 
> main thread executor, the {{CheckpointFailureManager.FailJobCallback}} could 
> be executed directly now. In this way execution graph would fail immediately 
> when {{CheckpointFailureManager}} invokes the callback. We could avoid the 
> inconsistent scenario of FLINK-13497.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] Introduce file system table factory

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] 
Introduce file system table factory
URL: https://github.com/apache/flink/pull/11574#issuecomment-606453374
 
 
   
   ## CI report:
   
   * 415f37b7a4c03d5591c9352f593eaecb23bc1cf3 UNKNOWN
   * ad9d0b168f308a5205430b2dcf85e680f20449b3 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157813487) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6994)
 
   * 756a68346b01c34e41fcb0b8d122e1a94207491e Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158140230) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7018)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce 
table row write support for parquet writer
URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516
 
 
   
   ## CI report:
   
   * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010)
 
   * 39d9b1a9c461fc940a660b95a4764f9982ba1599 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16954) 1.9 branch was not compile

2020-04-02 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074317#comment-17074317
 ] 

Chesnay Schepler commented on FLINK-16954:
--

ping [~aljoscha] [~igal]

> 1.9 branch was not compile
> --
>
> Key: FLINK-16954
> URL: https://issues.apache.org/jira/browse/FLINK-16954
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.3
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.3
>
>
> Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3.
> {code:java}
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassReader
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassWriter
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32]
>  cannot find symbol
> 06:35:03.334 [ERROR] symbol:   class ClassReader
> 06:35:03.334 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16954) 1.9 branch was not compile

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-16954:
-
Description: 
Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3.

{code:java}
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
 package jdk.internal.org.objectweb.asm does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
 package jdk.internal.org.objectweb.asm does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46]
 package jdk.internal.org.objectweb.asm.commons does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46]
 package jdk.internal.org.objectweb.asm.commons does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81]
 cannot find symbol
06:35:03.332 [ERROR] symbol:   class ClassReader
06:35:03.332 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32]
 cannot find symbol
06:35:03.332 [ERROR] symbol:   class ClassWriter
06:35:03.332 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32]
 cannot find symbol
06:35:03.334 [ERROR] symbol:   class ClassReader
06:35:03.334 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
{code}

  was:
Usage of JDK 11 APIs.

{code:java}
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
 package jdk.internal.org.objectweb.asm does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
 package jdk.internal.org.objectweb.asm does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46]
 package jdk.internal.org.objectweb.asm.commons does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46]
 package jdk.internal.org.objectweb.asm.commons does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81]
 cannot find symbol
06:35:03.332 [ERROR] symbol:   class ClassReader
06:35:03.332 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32]
 cannot find symbol
06:35:03.332 [ERROR] symbol:   class ClassWriter
06:35:03.332 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32]
 cannot find symbol
06:35:03.334 [ERROR] symbol:   class ClassReader
06:35:03.334 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
{code}


> 1.9 branch was not compile
> --
>
> Key: FLINK-16954
> URL: https://issues.apache.org/jira/browse/FLINK-16954
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.3
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.3
>
>
> Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3.
> {code:java}
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/tra

[jira] [Updated] (FLINK-16954) 1.9 branch does not compile

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-16954:
-
Summary: 1.9 branch does not compile  (was: 1.9 branch was not compile)

> 1.9 branch does not compile
> ---
>
> Key: FLINK-16954
> URL: https://issues.apache.org/jira/browse/FLINK-16954
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.3
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.3
>
>
> Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3.
> {code:java}
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassReader
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassWriter
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32]
>  cannot find symbol
> 06:35:03.334 [ERROR] symbol:   class ClassReader
> 06:35:03.334 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16954) ClassRelocator uses JDK 11 APIs

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-16954:
-
Summary: ClassRelocator uses JDK 11 APIs  (was: 1.9 branch does not compile)

> ClassRelocator uses JDK 11 APIs
> ---
>
> Key: FLINK-16954
> URL: https://issues.apache.org/jira/browse/FLINK-16954
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.3
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.3
>
>
> Usage of JDK 11 APIs in 0bc3d69e036ede36d11daa65bf6541e5c3a209c3.
> {code:java}
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassReader
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassWriter
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32]
>  cannot find symbol
> 06:35:03.334 [ERROR] symbol:   class ClassReader
> 06:35:03.334 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16954) 1.9 branch was not compile

2020-04-02 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-16954:


 Summary: 1.9 branch was not compile
 Key: FLINK-16954
 URL: https://issues.apache.org/jira/browse/FLINK-16954
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Affects Versions: 1.9.3
Reporter: Chesnay Schepler
 Fix For: 1.9.3


Usage of JDK 11 APIs.

{code:java}
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
 package jdk.internal.org.objectweb.asm does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
 package jdk.internal.org.objectweb.asm does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46]
 package jdk.internal.org.objectweb.asm.commons does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46]
 package jdk.internal.org.objectweb.asm.commons does not exist
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81]
 cannot find symbol
06:35:03.332 [ERROR] symbol:   class ClassReader
06:35:03.332 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32]
 cannot find symbol
06:35:03.332 [ERROR] symbol:   class ClassWriter
06:35:03.332 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
06:35:03.332 [ERROR] 
/home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32]
 cannot find symbol
06:35:03.334 [ERROR] symbol:   class ClassReader
06:35:03.334 [ERROR] location: class 
org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16954) 1.9 branch was not compile

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-16954:
-
Issue Type: Bug  (was: Improvement)

> 1.9 branch was not compile
> --
>
> Key: FLINK-16954
> URL: https://issues.apache.org/jira/browse/FLINK-16954
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.3
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.3
>
>
> Usage of JDK 11 APIs.
> {code:java}
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[21,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[22,38]
>  package jdk.internal.org.objectweb.asm does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[23,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[24,46]
>  package jdk.internal.org.objectweb.asm.commons does not exist
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,81]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassReader
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[158,32]
>  cannot find symbol
> 06:35:03.332 [ERROR] symbol:   class ClassWriter
> 06:35:03.332 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> 06:35:03.332 [ERROR] 
> /home/travis/build/flink-ci/flink-mirror/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ClassRelocator.java:[165,32]
>  cannot find symbol
> 06:35:03.334 [ERROR] symbol:   class ClassReader
> 06:35:03.334 [ERROR] location: class 
> org.apache.flink.api.common.typeutils.ClassRelocator.ClassRemapper
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zentol commented on a change in pull request #11583: [FLINK-16883] Scan FLINK_CONF directory for available log4j configura…

2020-04-02 Thread GitBox
zentol commented on a change in pull request #11583: [FLINK-16883] Scan 
FLINK_CONF directory for available log4j configura…
URL: https://github.com/apache/flink/pull/11583#discussion_r402767124
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/flink-console.sh
 ##
 @@ -58,7 +58,9 @@ esac
 
 FLINK_TM_CLASSPATH=`constructFlinkClassPath`
 
-log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"
 "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" 
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
+log4j_config=`findLog4jConfiguration ${FLINK_CONF_DIR}`
+
+log_setting=("-Dlog4j.configuration=file:${log4j_config}" 
"-Dlog4j.configurationFile=file:${log4j_config}" 
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
 
 Review comment:
   The goal was to make the scripts less strict when it comes to the file type 
(.properties vs .xml). This is more of a concern for log4j since we use a 
properties file, whereas xml appears to be the "preferred" way with log4j2.
   I haven't heard similar problems existing with logback, but yes, ultimately 
it could be done for both frameworks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp

2020-04-02 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074311#comment-17074311
 ] 

Jark Wu commented on FLINK-16938:
-

Hi [~liuyufei], I think I get what's your problem. 

You are using {{System.currentTimeMillis()}} as the data of rowtime field. 
However, Flink recognize millisecond as UTC time zone, not local zone. 
If you want to express it in +8:00, you should plus the time offset. 

> SqlTimestamp has lag when convert long to Timestamp
> ---
>
> Key: FLINK-16938
> URL: https://issues.apache.org/jira/browse/FLINK-16938
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: YufeiLiu
>Priority: Major
>
> When I set rowtime attribute by using expression 'column.rowtime' , and 
> result type is sql.Timestamp, the result will have lag which is equals with 
> default timezone offset.
> {code:java}
> tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
> {code}
> I look into the conversion logic, the field was go through 'long -> 
> SqlTimestamp -> Timestamp' conversion. 
> {code:java}
> long from = System.currentTimeMillis();
> long to = SqlTimestamp
>   .fromEpochMillis(from)
>   .toTimestamp()
>   .getTime();
> {code}
> The result is {{from!=to}}.  In {{SqlTimestamp.toTimestamp()}} using 
> {{Timestamp.valueOf(LocalDateTime dateTime)}} which is contain timezone 
> infomation, will casue time lag.
> From Timestamp to Timestamp not have this issue, but convert Datastream to 
> Table is use StreamRecord.timestamp as rowtime field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zentol commented on a change in pull request #11583: [FLINK-16883] Scan FLINK_CONF directory for available log4j configura…

2020-04-02 Thread GitBox
zentol commented on a change in pull request #11583: [FLINK-16883] Scan 
FLINK_CONF directory for available log4j configura…
URL: https://github.com/apache/flink/pull/11583#discussion_r402766083
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/flink-console.sh
 ##
 @@ -58,7 +58,9 @@ esac
 
 FLINK_TM_CLASSPATH=`constructFlinkClassPath`
 
-log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"
 "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" 
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
+log4j_config=`findLog4jConfiguration ${FLINK_CONF_DIR}`
 
 Review comment:
   The knowledge of it being in Flink and "dedicated" to Flink could be useful 
in the future; e.g., we could think about it being modified at runtime via the 
WebUI to change the log level. You couldn't do this if the log file is at an 
arbitrary location (for security reasons).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF

2020-04-02 Thread GitBox
dianfu commented on a change in pull request #11598: [FLINK-16914][python] 
Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402765167
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.ArrayColumnVector;
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+import org.apache.flink.table.dataformat.vector.TimestampColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+/**
+ * Columnar array to support access to vector column data.
+ */
+public final class ColumnarArray implements BaseArray {
+
+   private final DataType elementType;
+   private final ColumnVector data;
+   private final int offset;
+   private final int length;
+
+   public ColumnarArray(DataType elementType, ColumnVector data, int 
offset, int length) {
+   this.elementType = elementType;
+   this.data = data;
+   this.offset = offset;
+   this.length = length;
+   }
+
+   @Override
+   public int numElements() {
+   return length;
+   }
+
+   @Override
+   public boolean isNullAt(int pos) {
+   return data.isNullAt(offset + pos);
+   }
+
+   @Override
+   public void setNullAt(int pos) {
+   throw new UnsupportedOperationException("Not support the 
operation!");
+   }
+
+   @Override
+   public boolean getBoolean(int ordinal) {
+   return ((BooleanColumnVector) data).getBoolean(offset + 
ordinal);
+   }
+
+   @Override
+   public byte getByte(int ordinal) {
+   return ((ByteColumnVector) data).getByte(offset + ordinal);
+   }
+
+   @Override
+   public short getShort(int ordinal) {
+   return ((ShortColumnVector) data).getShort(offset + ordinal);
+   }
+
+   @Override
+   public int getInt(int ordinal) {
+   return ((IntColumnVector) data).getInt(offset + ordinal);
+   }
+
+   @Override
+   public long getLong(int ordinal) {
+   return ((LongColumnVector) data).getLong(offset + ordinal);
+   }
+
+   @Override
+   public float getFloat(int ordinal) {
+   return ((FloatColumnVector) data).getFloat(offset + ordinal);
+   }
+
+   @Override
+   public double getDouble(int ordinal) {
+   return ((DoubleColumnVector) data).getDouble(offset + ordinal);
+   }
+
+   @Override
+   public BinaryString getString(int ordinal) {
+   BytesColumnVector.Bytes byteArray = getByteArray(ordinal);
+   return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+   }
+
+   @Override
+   public Decimal getDecimal(int ordinal, int precision, int scale) {

[GitHub] [flink] flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes 
e2e test stable
URL: https://github.com/apache/flink/pull/11619#issuecomment-607793023
 
 
   
   ## CI report:
   
   * 6bd37e40b9333d646bbf1bb1fbe286ad21d1e709 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158126927) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7013)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] Introduce file system table factory

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11574: [FLINK-16859][table-runtime] 
Introduce file system table factory
URL: https://github.com/apache/flink/pull/11574#issuecomment-606453374
 
 
   
   ## CI report:
   
   * 415f37b7a4c03d5591c9352f593eaecb23bc1cf3 UNKNOWN
   * ad9d0b168f308a5205430b2dcf85e680f20449b3 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157813487) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6994)
 
   * 756a68346b01c34e41fcb0b8d122e1a94207491e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-16942) ES 5 sink should allows users to select netty transport client

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-16942.

Resolution: Fixed

master: 0629d169358e1b8e89545a0977f8995d298db398
1.10: 16b2ea41b094026ff7da13c3ce34a804c25aaffb 
1.9: 2d29e3ef807efc7dbe819f59a390c3d090a43654 

> ES 5 sink should allows users to select netty transport client
> --
>
> Key: FLINK-16942
> URL: https://issues.apache.org/jira/browse/FLINK-16942
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.9.0, 1.10.0, 1.11.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When assembling the settings for the {{PreBuiltTransportClient}} the 
> {{Elasticsearch5ApiCallBridge}} first adds the user-provided client and then 
> overrides http/transport types with netty 3.
> This means  that users are forced to use netty 3, despite the connector being 
> able to work with a more recent and secure version.
> {code:java}
> Settings settings = Settings.builder().put(clientConfig)
>   .put(NetworkModule.HTTP_TYPE_KEY, 
> Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
>   .put(NetworkModule.TRANSPORT_TYPE_KEY, 
> Netty3Plugin.NETTY_TRANSPORT_NAME)
>   .build();
> {code}
> We should invert the order in which the settings are added.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-16389:
-
Fix Version/s: 1.10.1
   1.9.3

> Bump Kafka 0.10 to 0.10.2.2
> ---
>
> Key: FLINK-16389
> URL: https://issues.apache.org/jira/browse/FLINK-16389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory

2020-04-02 Thread GitBox
leonardBang commented on a change in pull request #11574: 
[FLINK-16859][table-runtime] Introduce file system table factory
URL: https://github.com/apache/flink/pull/11574#discussion_r402761566
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * File system {@link TableFactory}.
+ *
+ * 1.The partition information should be in the file system path, whether 
it's a temporary
+ * table or a catalog table.
+ * 2.Support insert into (append) and insert overwrite.
+ * 3.Support static and dynamic partition inserting.
+ *
+ * Migrate to new source/sink interface after FLIP-95 is ready.
+ */
+public class FileSystemTableFactory implements
+   TableSourceFactory,
+   TableSinkFactory {
+
+   public static final String CONNECTOR_VALUE = "filesystem";
+
+   /**
+* Not use "connector.path" because:
+* 1.Using "connector.path" will conflict with current batch csv source 
and batch csv sink.
+* 2.This is compatible with FLIP-122.
+*/
+   public static final String PATH = "path";
+
+   /**
+* Move these properties to validator after FLINK-16904.
+*/
+   public static final ConfigOption PARTITION_DEFAULT_NAME = 
key("partition.default-name")
+   .stringType()
+   .defaultValue("__DEFAULT_PARTITION__")
+   .withDescription("The default partition name in case 
the dynamic partition" +
+   " column value is null/empty string");
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR, CONNECTOR_VALUE);
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // path
+   properties.add(PATH);
+
+   // schema
+   properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
+   properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+
+   properties.add(PARTITION_DEFAULT_NAME.key());
+
+   // format
+   properties.add(FORMAT);
+   properties.add(FORMAT + ".*");
+
+   return properties;
+   }
+
+   @Override
+   public TableSource 
createTableSource(TableSourceFactory.Context context) {
+   DescriptorProperties properties = new DescriptorProperties();
+   properties.putProperties(context.getTable().getProperties());
+
+   return new FileSystemTableSource(
+   context.getTable().getSchema(),
+   new Path(properties.getString(PATH)),
+   context.getTable().getPartitionKeys(),
+   getPartitionDefaultName(properties),
+   
getFormatProperties(context.getTable().getProperties()));
+   }
+
+ 

[jira] [Comment Edited] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2

2020-04-02 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074302#comment-17074302
 ] 

Chesnay Schepler edited comment on FLINK-16389 at 4/3/20, 6:27 AM:
---

master: 70a320b010910bf01ec342afe8371dbbf04c7ec0
1.10: 0be5d107a80c22f5761e2770a7a50c226adc8180 
1.9: af79f680edcc30906955e83eee04b39981a837f7 


was (Author: zentol):
master: 70a320b010910bf01ec342afe8371dbbf04c7ec0

> Bump Kafka 0.10 to 0.10.2.2
> ---
>
> Key: FLINK-16389
> URL: https://issues.apache.org/jira/browse/FLINK-16389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zentol merged pull request #11616: [FLINK-16942][es] Allow users to override http/transport type

2020-04-02 Thread GitBox
zentol merged pull request #11616: [FLINK-16942][es] Allow users to override 
http/transport type
URL: https://github.com/apache/flink/pull/11616
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer

2020-04-02 Thread GitBox
leonardBang commented on a change in pull request #11602: 
[FLINK-16912][parquet] Introduce table row write support for parquet writer
URL: https://github.com/apache/flink/pull/11602#discussion_r402758054
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.row;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.ParquetWriterFactory;
+import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader;
+import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+/**
+ * Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}.
+ */
+public class ParquetRowDataWriterTest {
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static final RowType ROW_TYPE = RowType.of(
+   new VarCharType(VarCharType.MAX_LENGTH),
+   new VarBinaryType(VarBinaryType.MAX_LENGTH),
+   new BooleanType(),
+   new TinyIntType(),
+   new SmallIntType(),
+   new IntType(),
+   new BigIntType(),
+   new FloatType(),
+   new DoubleType(),
+   new TimestampType(9),
+   new DecimalType(5, 0),
+   new DecimalType(15, 0),
+   new DecimalType(20, 0));
+
+   @SuppressWarnings("unchecked")
+   private static final DataFormatConverters.DataFormatConverter CONVERTER =
+   DataFormatConverters.getConverterForDataType(
+   
TypeConversions.fromLogicalToDataType(ROW_TYPE));
+
+   @Test
+   public void testTypes() throws IOException {
+   Configuration conf = new Configuration();
+   innerTest(conf, true);
+   innerTest(conf, false);
+   }
+
+   @Test
+   public void testCompression() throws IOException {
+   Configuration conf = new Configuration();
+   conf.set(ParquetOutputFormat.COMPRESSION, "GZIP");
+   innerTest(conf, true);
+   innerTest(conf, false);
+   }
+
+   private void innerTest(
+   Configuration conf,
+   boolean utcTimestamp) throws IOException {
+   Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), 
UUID.randomUU

[GitHub] [flink] leonardBang commented on a change in pull request #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer

2020-04-02 Thread GitBox
leonardBang commented on a change in pull request #11602: 
[FLINK-16912][parquet] Introduce table row write support for parquet writer
URL: https://github.com/apache/flink/pull/11602#discussion_r402756699
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
 ##
 @@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.row;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataformat.SqlTimestamp;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+import static 
org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND;
+
+/**
+ * Writes a record to the Parquet API with the expected schema in order to be 
written to a file.
+ */
+public class ParquetRowDataWriter {
+
+   private final RecordConsumer recordConsumer;
+   private final boolean utcTimestamp;
+
+   private final FieldWriter[] filedWriters;
+   private final String[] fieldNames;
+
+   public ParquetRowDataWriter(
+   RecordConsumer recordConsumer,
+   RowType rowType,
+   GroupType schema,
+   boolean utcTimestamp) {
+   this.recordConsumer = recordConsumer;
+   this.utcTimestamp = utcTimestamp;
+
+   this.filedWriters = new FieldWriter[rowType.getFieldCount()];
+   this.fieldNames = rowType.getFieldNames().toArray(new 
String[0]);
+   for (int i = 0; i < rowType.getFieldCount(); i++) {
+   this.filedWriters[i] = 
createWriter(rowType.getTypeAt(i), schema.getType(i));
+   }
+   }
+
+   /**
+* It writes a record to Parquet.
+*
+* @param record Contains the record that is going to be written.
+*/
+   public void write(final BaseRow record) {
+   recordConsumer.startMessage();
+   for (int i = 0; i < filedWriters.length; i++) {
+   if (!record.isNullAt(i)) {
+   String fieldName = fieldNames[i];
+   FieldWriter writer = filedWriters[i];
+
+   recordConsumer.startField(fieldName, i);
+   writer.write(record, i);
+   recordConsumer.endField(fieldName, i);
+   }
+   }
+   recordConsumer.endMessage();
+   }
+
+   private FieldWriter createWriter(LogicalType t, Type type) {
+   if (type.isPrimitive()) {
+   switch (t.getTypeRoot()) {
+   case CHAR:
+   case VARCHAR:
+   return new StringWriter();
+   case BOOLEAN:
+   return new BooleanWriter();
+   case BINARY:
+   case VARBINARY:
+   return new BinaryW

[jira] [Updated] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-16389:
-
Affects Version/s: 1.9.0

> Bump Kafka 0.10 to 0.10.2.2
> ---
>
> Key: FLINK-16389
> URL: https://issues.apache.org/jira/browse/FLINK-16389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16389) Bump Kafka 0.10 to 0.10.2.2

2020-04-02 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-16389.

Resolution: Fixed

master: 70a320b010910bf01ec342afe8371dbbf04c7ec0

> Bump Kafka 0.10 to 0.10.2.2
> ---
>
> Key: FLINK-16389
> URL: https://issues.apache.org/jira/browse/FLINK-16389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zentol merged pull request #11617: [FLINK-16389][kafka] Bump kafka version to 0.10.2.2

2020-04-02 Thread GitBox
zentol merged pull request #11617: [FLINK-16389][kafka] Bump kafka version to 
0.10.2.2
URL: https://github.com/apache/flink/pull/11617
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF

2020-04-02 Thread GitBox
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] 
Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402760160
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.ArrayColumnVector;
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+import org.apache.flink.table.dataformat.vector.TimestampColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+/**
+ * Columnar array to support access to vector column data.
+ */
+public final class ColumnarArray implements BaseArray {
+
+   private final DataType elementType;
+   private final ColumnVector data;
+   private final int offset;
+   private final int length;
+
+   public ColumnarArray(DataType elementType, ColumnVector data, int 
offset, int length) {
+   this.elementType = elementType;
+   this.data = data;
+   this.offset = offset;
+   this.length = length;
+   }
+
+   @Override
+   public int numElements() {
+   return length;
+   }
+
+   @Override
+   public boolean isNullAt(int pos) {
+   return data.isNullAt(offset + pos);
+   }
+
+   @Override
+   public void setNullAt(int pos) {
+   throw new UnsupportedOperationException("Not support the 
operation!");
+   }
+
+   @Override
+   public boolean getBoolean(int ordinal) {
+   return ((BooleanColumnVector) data).getBoolean(offset + 
ordinal);
+   }
+
+   @Override
+   public byte getByte(int ordinal) {
+   return ((ByteColumnVector) data).getByte(offset + ordinal);
+   }
+
+   @Override
+   public short getShort(int ordinal) {
+   return ((ShortColumnVector) data).getShort(offset + ordinal);
+   }
+
+   @Override
+   public int getInt(int ordinal) {
+   return ((IntColumnVector) data).getInt(offset + ordinal);
+   }
+
+   @Override
+   public long getLong(int ordinal) {
+   return ((LongColumnVector) data).getLong(offset + ordinal);
+   }
+
+   @Override
+   public float getFloat(int ordinal) {
+   return ((FloatColumnVector) data).getFloat(offset + ordinal);
+   }
+
+   @Override
+   public double getDouble(int ordinal) {
+   return ((DoubleColumnVector) data).getDouble(offset + ordinal);
+   }
+
+   @Override
+   public BinaryString getString(int ordinal) {
+   BytesColumnVector.Bytes byteArray = getByteArray(ordinal);
+   return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+   }
+
+   @Override
+   public Decimal getDecimal(int ordinal, int precision, int scale

[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-02 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r402757385
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##
 @@ -61,158 +61,180 @@
private final String clusterId;
private final String nameSpace;
 
-   public Fabric8FlinkKubeClient(Configuration flinkConfig, 
KubernetesClient client) {
+   private final ExecutorWrapper executorWrapper;
+
+   public Fabric8FlinkKubeClient(Configuration flinkConfig, 
KubernetesClient client, ExecutorWrapper executorWrapper) {
this.flinkConfig = checkNotNull(flinkConfig);
this.internalClient = checkNotNull(client);
this.clusterId = 
checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
 
this.nameSpace = 
flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
+
+   this.executorWrapper = executorWrapper;
}
 
@Override
-   public void createJobManagerComponent(KubernetesJobManagerSpecification 
kubernetesJMSpec) {
+   public CompletableFuture 
createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
final List accompanyingResources = 
kubernetesJMSpec.getAccompanyingResources();
 
// create Deployment
LOG.debug("Start to create deployment with spec {}", 
deployment.getSpec().toString());
-   final Deployment createdDeployment = this.internalClient
-   .apps()
-   .deployments()
-   .inNamespace(this.nameSpace)
-   .create(deployment);
-
-   // Note that we should use the uid of the created Deployment 
for the OwnerReference.
-   setOwnerReference(createdDeployment, accompanyingResources);
 
-   this.internalClient
-   .resourceList(accompanyingResources)
-   .inNamespace(this.nameSpace)
-   .createOrReplace();
+   return CompletableFuture.runAsync(() -> {
+   final Deployment createdDeployment = this.internalClient
+   .apps()
+   .deployments()
+   .inNamespace(this.nameSpace)
+   .create(deployment);
+
+   // Note that we should use the uid of the created 
Deployment for the OwnerReference.
+   setOwnerReference(createdDeployment, 
accompanyingResources);
+
+   this.internalClient
+   .resourceList(accompanyingResources)
+   .inNamespace(this.nameSpace)
+   .createOrReplace();
+   }, executorWrapper.getExecutor());
}
 
@Override
public void createTaskManagerPod(KubernetesPod kubernetesPod) {
-   final Deployment masterDeployment = this.internalClient
-   .apps()
-   .deployments()
-   .inNamespace(this.nameSpace)
-   .withName(KubernetesUtils.getDeploymentName(clusterId))
-   .get();
-
-   if (masterDeployment == null) {
-   throw new RuntimeException(
-   "Failed to find Deployment named " + clusterId 
+ " in namespace " + this.nameSpace);
-   }
+   CompletableFuture.runAsync(() -> {
+   final Deployment masterDeployment = this.internalClient
+   .apps()
+   .deployments()
+   .inNamespace(this.nameSpace)
+   
.withName(KubernetesUtils.getDeploymentName(clusterId))
+   .get();
+
+   if (masterDeployment == null) {
+   throw new RuntimeException(
+   "Failed to find Deployment named " + 
clusterId + " in namespace " + this.nameSpace);
+   }
 
-   // Note that we should use the uid of the master Deployment for 
the OwnerReference.
-   setOwnerReference(masterDeployment, 
Collections.singletonList(kubernetesPod.getInternalResource()));
+   // Note that we should use the uid of the master 
Deployment for the OwnerReference.
+   setOwnerReference(masterDeployment, 
Collections.singletonList(kubernetesPod.getInternalResource()));
 
-   LOG.debug("Start to create pod with metadata {}, spec {}",
-   kubern

[GitHub] [flink] JingsongLi commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory

2020-04-02 Thread GitBox
JingsongLi commented on a change in pull request #11574: 
[FLINK-16859][table-runtime] Introduce file system table factory
URL: https://github.com/apache/flink/pull/11574#discussion_r402753698
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * File system {@link TableFactory}.
+ *
+ * 1.The partition information should be in the file system path, whether 
it's a temporary
+ * table or a catalog table.
+ * 2.Support insert into (append) and insert overwrite.
+ * 3.Support static and dynamic partition inserting.
+ *
+ * Migrate to new source/sink interface after FLIP-95 is ready.
+ */
+public class FileSystemTableFactory implements
+   TableSourceFactory,
+   TableSinkFactory {
+
+   public static final String CONNECTOR_VALUE = "filesystem";
+
+   /**
+* Not use "connector.path" because:
+* 1.Using "connector.path" will conflict with current batch csv source 
and batch csv sink.
+* 2.This is compatible with FLIP-122.
+*/
+   public static final String PATH = "path";
+
+   /**
+* Move these properties to validator after FLINK-16904.
+*/
+   public static final ConfigOption PARTITION_DEFAULT_NAME = 
key("partition.default-name")
+   .stringType()
+   .defaultValue("__DEFAULT_PARTITION__")
+   .withDescription("The default partition name in case 
the dynamic partition" +
+   " column value is null/empty string");
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR, CONNECTOR_VALUE);
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // path
+   properties.add(PATH);
+
+   // schema
+   properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
+   properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+
+   properties.add(PARTITION_DEFAULT_NAME.key());
+
+   // format
+   properties.add(FORMAT);
+   properties.add(FORMAT + ".*");
+
+   return properties;
+   }
+
+   @Override
+   public TableSource 
createTableSource(TableSourceFactory.Context context) {
+   DescriptorProperties properties = new DescriptorProperties();
+   properties.putProperties(context.getTable().getProperties());
+
+   return new FileSystemTableSource(
+   context.getTable().getSchema(),
+   new Path(properties.getString(PATH)),
+   context.getTable().getPartitionKeys(),
+   getPartitionDefaultName(properties),
+   
getFormatProperties(context.getTable().getProperties()));
+   }
+
+  

[GitHub] [flink] carp84 commented on issue #11555: [FLINK-16576][state backends] Correct the logic of KeyGroupStateHandle#getIntersection

2020-04-02 Thread GitBox
carp84 commented on issue #11555: [FLINK-16576][state backends] Correct the 
logic of KeyGroupStateHandle#getIntersection
URL: https://github.com/apache/flink/pull/11555#issuecomment-608245577
 
 
   @StephanEwen could you also take a look here? Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16953) TableEnvHiveConnectorTest is unstable on travis.

2020-04-02 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-16953:
-
Description: 
[https://api.travis-ci.org/v3/job/670405441/log.txt]


{code:java}
[INFO] Running org.apache.flink.connectors.hive.TableEnvHiveConnectorTest
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 12.67 s 
<<< FAILURE! - in org.apache.flink.connectors.hive.TableEnvHiveConnectorTest
[ERROR] org.apache.flink.connectors.hive.TableEnvHiveConnectorTest  Time 
elapsed: 12.669 s  <<< ERROR!
java.lang.IllegalStateException: Failed to create HiveServer :Failed to get 
metastore connection
Caused by: java.lang.RuntimeException: Failed to get metastore connection
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Caused by: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Caused by: java.lang.reflect.InvocationTargetException
Caused by: org.apache.hadoop.hive.metastore.api.MetaException: 
Could not connect to meta store using any of the URIs provided. Most recent 
failure: org.apache.thrift.transport.TTransportException: 
java.net.ConnectException: Connection refused (Connection refused)
at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:480)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:247)
at 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:70)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1706)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:83)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at 
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632)
at 
org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3894)
at 
org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
at 
org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
at org.apache.hadoop.hive.ql.metadata.Hive.(Hive.java:388)
at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
at org.apache.hive.service.server.HiveServer2.init(HiveServer2.java:166)
at 
com.klarna.hiverunner.HiveServerContainer.init(HiveServerContainer.java:84)
at 
com.klarna.hiverunner.builder.HiveShellBase.start(HiveShellBase.java:165)
at 
org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.createHiveServerContainer(FlinkStandaloneHiveRunner.java:217)
at 
org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.access$600(FlinkStandaloneHiveRunner.java:92)
at 
org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner$2.before(FlinkStandaloneHiveRunner.java:131)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.s

[jira] [Created] (FLINK-16953) TableEnvHiveConnectorTest is unstable on travis.

2020-04-02 Thread Xintong Song (Jira)
Xintong Song created FLINK-16953:


 Summary: TableEnvHiveConnectorTest is unstable on travis.
 Key: FLINK-16953
 URL: https://issues.apache.org/jira/browse/FLINK-16953
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.11.0
Reporter: Xintong Song


[https://api.travis-ci.org/v3/job/670405441/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-02 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r402745707
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -793,16 +799,26 @@ private void 
fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
return Optional.empty();
}
 
-   private boolean isFulfillableByRegisteredSlots(ResourceProfile 
resourceProfile) {
+   private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile 
resourceProfile) {
for (TaskManagerSlot slot : slots.values()) {
if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
return true;
}
}
+
+   for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+   if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
+   return true;
+   }
+   }
+
return false;
}
 
private Optional 
allocateResource(ResourceProfile resourceProfile) throws 
ResourceManagerException {
+   if (getNumberPendingTaskManagerSlots() + 
getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) {
+   return Optional.empty();
+   }
 
 Review comment:
   In addition, we should add warning logs explicitly show that slots are not 
allocated because of reaching the max limit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-02 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r402744613
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -793,16 +799,26 @@ private void 
fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequ
return Optional.empty();
}
 
-   private boolean isFulfillableByRegisteredSlots(ResourceProfile 
resourceProfile) {
+   private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile 
resourceProfile) {
for (TaskManagerSlot slot : slots.values()) {
if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
return true;
}
}
+
+   for (PendingTaskManagerSlot slot : pendingSlots.values()) {
+   if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
+   return true;
+   }
+   }
+
return false;
}
 
private Optional 
allocateResource(ResourceProfile resourceProfile) throws 
ResourceManagerException {
+   if (getNumberPendingTaskManagerSlots() + 
getNumberRegisteredSlots() + numSlotsPerWorker > maxSlotNum) {
+   return Optional.empty();
+   }
 
 Review comment:
   I think this makes the SM/RM contract a bit unclear.
   
   Before this PR, SM requests individual slots from RM and is not aware of 
`numSlotsPerWorker`.
   
   With this change, SM is aware of slots per worker only for the max slots 
limit check, while for other logics (e.g., generating pending slots) it still 
pretend not aware of slots per worker.
   
   I would suggest to either move this check to the RM side, or base this PR on 
#11320, where we change the contract that SM request workers from RM and is 
aware of slots per worker in a consistent way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-02 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r402719726
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -131,13 +134,15 @@ public SlotManagerImpl(
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout,
+   int maxSlotNum,
boolean waitResultConsumedBeforeRelease) {
 
this.slotMatchingStrategy = 
Preconditions.checkNotNull(slotMatchingStrategy);
this.scheduledExecutor = 
Preconditions.checkNotNull(scheduledExecutor);
this.taskManagerRequestTimeout = 
Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout = 
Preconditions.checkNotNull(taskManagerTimeout);
+   this.maxSlotNum = maxSlotNum;
 
 Review comment:
   We should have a sanity check here to make sure `maxSlotNum > 0`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-02 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r402736479
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
 ##
 @@ -1293,7 +1293,18 @@ private SlotRequest createSlotRequest(JobID jobId, 
ResourceProfile resourceProfi
}
 
private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions) {
-   SlotManagerImpl slotManager = 
SlotManagerBuilder.newBuilder().build();
+   return createSlotManager(resourceManagerId, 
resourceManagerActions, 1);
+   }
+
+   private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions, int 
numSlotsPerWorker) {
+   return createSlotManager(resourceManagerId, 
resourceManagerActions, numSlotsPerWorker, Integer.MAX_VALUE);
+   }
+
+   private SlotManagerImpl createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions, int 
numSlotsPerWorker, int maxSlotNum) {
 
 Review comment:
   I think we should try to avoid such deep nested methods if possible.
   Since both `numSlotsPerWorker` and `maxSlotNum` can be read from a 
`Configuration`, I think keeping only two `createSlotManager` methods (one with 
`Configuration` and one without) should be enough.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-02 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r402718103
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
 ##
 @@ -69,4 +69,11 @@
.text("Enable the slot spread out allocation 
strategy. This strategy tries to spread out " +
"the slots evenly across all available 
%s.", code("TaskExecutors"))
.build());
+
+   @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+   public static final ConfigOption MAX_SLOT_NUM = ConfigOptions
+   .key("cluster.number-of-slots.max")
+   .intType()
+   .defaultValue(Integer.MAX_VALUE)
+   .withDescription("Defines the max limitation of the total 
number of slots.");
 
 Review comment:
   I think we should add this to `ResourceManagerOptions` rather than 
`ClusterOptions`, with key prefix `slotmanager.*`. The option is only used in 
`SlotManager`, and once FLINK-14106 is finished this might even become a 
plugin-specific config option.
   
   And I would also mention in the description that this is meant for batch 
scenarios and might cause problems on streaming jobs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-02 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r402740615
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -375,6 +375,12 @@ public void registerTaskManager(final 
TaskExecutorConnection taskExecutorConnect
if 
(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {

reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
} else {
+   if (getNumberRegisteredSlots() + 
Math.max(getNumberPendingTaskManagerSlots(), numSlotsPerWorker) > maxSlotNum) {
+   LOG.warn("The total number of slots exceeds the 
max limitation, release the excess resource.");
+   
resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new 
FlinkException("The total number of slots exceeds the max limitation."));
+   return;
+   }
 
 Review comment:
   Not sure about checking on task executors' registering.
   I think the max slot limit should not take effect for standalone clusters, 
because the number of task executors launched is not controlled by RM. We 
should also mention that in the config option description.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-02 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r402719311
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java
 ##
 @@ -37,6 +38,7 @@ private SlotManagerBuilder() {
this.taskManagerRequestTimeout = TestingUtils.infiniteTime();
this.slotRequestTimeout = TestingUtils.infiniteTime();
this.taskManagerTimeout = TestingUtils.infiniteTime();
+   this.maxSlotNum = Integer.MAX_VALUE;
 
 Review comment:
   nit: maybe `MAX_SLOT_NUM.defaultValue()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory

2020-04-02 Thread GitBox
leonardBang commented on a change in pull request #11574: 
[FLINK-16859][table-runtime] Introduce file system table factory
URL: https://github.com/apache/flink/pull/11574#discussion_r402748927
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
 ##
 @@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.OverwritableTableSink;
+import org.apache.flink.table.sinks.PartitionableTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory;
+
+/**
+ * File system {@link TableSink}.
+ */
+public class FileSystemTableSink implements
+   AppendStreamTableSink,
+   PartitionableTableSink,
+   OverwritableTableSink {
+
+   private final TableSchema schema;
+   private final List partitionKeys;
+   private final Path path;
+   private final String defaultPartName;
+   private final Map formatProperties;
+
+   private boolean overwrite = false;
+   private boolean dynamicGrouping = false;
+   private LinkedHashMap staticPartitions = new 
LinkedHashMap<>();
+
+   /**
+* Construct a file system table sink.
+*
+* @param schema schema of the table.
+* @param path directory path of the file system table.
+* @param partitionKeys partition keys of the table.
+* @param defaultPartName The default partition name in case the 
dynamic partition column value
+*is null/empty string.
+* @param formatProperties format properties.
+*/
+   public FileSystemTableSink(
+   TableSchema schema,
+   Path path,
+   List partitionKeys,
+   String defaultPartName,
+   Map formatProperties) {
+   this.schema = schema;
+   this.path = path;
+   this.defaultPartName = defaultPartName;
+   this.formatProperties = formatProperties;
+   this.partitionKeys = partitionKeys;
+   }
+
+   @Override
+   public final DataStreamSink 
consumeDataStream(DataStream dataStream) {
+   RowDataPartitionComputer computer = new 
RowDataPartitionComputer(
+   defaultPartName,
+   schema.getFieldNames(),
+   schema.getFieldDataTypes(),
+   partitionKeys.toArray(new String[0]));
+
+   FileSystemOutputFormat.Builder builder = new 
FileSystemOutputFormat.Builder<>();
+   builder.setPartitionComputer(computer);
+   builder.setDynamicGrouped(dynamicGrouping);
+   builder.setPartitionColumns(partitionKeys.toArray(new 
String[0]));
+   builder.setFormatFactory(createOutputFormatFactory());
+   builder.setMetaStoreFactory(createTableMetaStoreFactory(path));
+   builder.setOverwrite(overwrite);
+   builde

[GitHub] [flink] leonardBang commented on a change in pull request #11574: [FLINK-16859][table-runtime] Introduce file system table factory

2020-04-02 Thread GitBox
leonardBang commented on a change in pull request #11574: 
[FLINK-16859][table-runtime] Introduce file system table factory
URL: https://github.com/apache/flink/pull/11574#discussion_r402748612
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * File system {@link TableFactory}.
+ *
+ * 1.The partition information should be in the file system path, whether 
it's a temporary
+ * table or a catalog table.
+ * 2.Support insert into (append) and insert overwrite.
+ * 3.Support static and dynamic partition inserting.
+ *
+ * Migrate to new source/sink interface after FLIP-95 is ready.
+ */
+public class FileSystemTableFactory implements
+   TableSourceFactory,
+   TableSinkFactory {
+
+   public static final String CONNECTOR_VALUE = "filesystem";
+
+   /**
+* Not use "connector.path" because:
+* 1.Using "connector.path" will conflict with current batch csv source 
and batch csv sink.
+* 2.This is compatible with FLIP-122.
+*/
+   public static final String PATH = "path";
+
+   /**
+* Move these properties to validator after FLINK-16904.
+*/
+   public static final ConfigOption PARTITION_DEFAULT_NAME = 
key("partition.default-name")
+   .stringType()
+   .defaultValue("__DEFAULT_PARTITION__")
+   .withDescription("The default partition name in case 
the dynamic partition" +
+   " column value is null/empty string");
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR, CONNECTOR_VALUE);
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // path
+   properties.add(PATH);
+
+   // schema
+   properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
+   properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+
+   properties.add(PARTITION_DEFAULT_NAME.key());
+
+   // format
+   properties.add(FORMAT);
+   properties.add(FORMAT + ".*");
+
+   return properties;
+   }
+
+   @Override
+   public TableSource 
createTableSource(TableSourceFactory.Context context) {
+   DescriptorProperties properties = new DescriptorProperties();
+   properties.putProperties(context.getTable().getProperties());
+
+   return new FileSystemTableSource(
+   context.getTable().getSchema(),
+   new Path(properties.getString(PATH)),
+   context.getTable().getPartitionKeys(),
+   getPartitionDefaultName(properties),
+   
getFormatProperties(context.getTable().getProperties()));
+   }
+
+ 

[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-02 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r402749513
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##
 @@ -61,158 +61,180 @@
private final String clusterId;
private final String nameSpace;
 
-   public Fabric8FlinkKubeClient(Configuration flinkConfig, 
KubernetesClient client) {
+   private final ExecutorWrapper executorWrapper;
+
+   public Fabric8FlinkKubeClient(Configuration flinkConfig, 
KubernetesClient client, ExecutorWrapper executorWrapper) {
this.flinkConfig = checkNotNull(flinkConfig);
this.internalClient = checkNotNull(client);
this.clusterId = 
checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
 
this.nameSpace = 
flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
+
+   this.executorWrapper = executorWrapper;
}
 
@Override
-   public void createJobManagerComponent(KubernetesJobManagerSpecification 
kubernetesJMSpec) {
+   public CompletableFuture 
createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
final List accompanyingResources = 
kubernetesJMSpec.getAccompanyingResources();
 
// create Deployment
LOG.debug("Start to create deployment with spec {}", 
deployment.getSpec().toString());
-   final Deployment createdDeployment = this.internalClient
-   .apps()
-   .deployments()
-   .inNamespace(this.nameSpace)
-   .create(deployment);
-
-   // Note that we should use the uid of the created Deployment 
for the OwnerReference.
-   setOwnerReference(createdDeployment, accompanyingResources);
 
-   this.internalClient
-   .resourceList(accompanyingResources)
-   .inNamespace(this.nameSpace)
-   .createOrReplace();
+   return CompletableFuture.runAsync(() -> {
+   final Deployment createdDeployment = this.internalClient
+   .apps()
+   .deployments()
+   .inNamespace(this.nameSpace)
+   .create(deployment);
+
+   // Note that we should use the uid of the created 
Deployment for the OwnerReference.
+   setOwnerReference(createdDeployment, 
accompanyingResources);
+
+   this.internalClient
+   .resourceList(accompanyingResources)
+   .inNamespace(this.nameSpace)
+   .createOrReplace();
+   }, executorWrapper.getExecutor());
}
 
@Override
public void createTaskManagerPod(KubernetesPod kubernetesPod) {
-   final Deployment masterDeployment = this.internalClient
-   .apps()
-   .deployments()
-   .inNamespace(this.nameSpace)
-   .withName(KubernetesUtils.getDeploymentName(clusterId))
-   .get();
-
-   if (masterDeployment == null) {
-   throw new RuntimeException(
-   "Failed to find Deployment named " + clusterId 
+ " in namespace " + this.nameSpace);
-   }
+   CompletableFuture.runAsync(() -> {
 
 Review comment:
   Good point. I will add a `retry` logic here. When all the retry times(3) is 
exhausted, we will throw the exception here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF

2020-04-02 Thread GitBox
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] 
Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402746198
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.ArrayColumnVector;
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+import org.apache.flink.table.dataformat.vector.TimestampColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+/**
+ * Columnar array to support access to vector column data.
+ */
+public final class ColumnarArray implements BaseArray {
+
+   private final DataType elementType;
+   private final ColumnVector data;
+   private final int offset;
+   private final int length;
+
+   public ColumnarArray(DataType elementType, ColumnVector data, int 
offset, int length) {
+   this.elementType = elementType;
+   this.data = data;
+   this.offset = offset;
+   this.length = length;
+   }
+
+   @Override
+   public int numElements() {
+   return length;
+   }
+
+   @Override
+   public boolean isNullAt(int pos) {
+   return data.isNullAt(offset + pos);
+   }
+
+   @Override
+   public void setNullAt(int pos) {
+   throw new UnsupportedOperationException("Not support the 
operation!");
+   }
+
+   @Override
+   public boolean getBoolean(int ordinal) {
+   return ((BooleanColumnVector) data).getBoolean(offset + 
ordinal);
+   }
+
+   @Override
+   public byte getByte(int ordinal) {
+   return ((ByteColumnVector) data).getByte(offset + ordinal);
+   }
+
+   @Override
+   public short getShort(int ordinal) {
+   return ((ShortColumnVector) data).getShort(offset + ordinal);
+   }
+
+   @Override
+   public int getInt(int ordinal) {
+   return ((IntColumnVector) data).getInt(offset + ordinal);
+   }
+
+   @Override
+   public long getLong(int ordinal) {
+   return ((LongColumnVector) data).getLong(offset + ordinal);
+   }
+
+   @Override
+   public float getFloat(int ordinal) {
+   return ((FloatColumnVector) data).getFloat(offset + ordinal);
+   }
+
+   @Override
+   public double getDouble(int ordinal) {
+   return ((DoubleColumnVector) data).getDouble(offset + ordinal);
+   }
+
+   @Override
+   public BinaryString getString(int ordinal) {
+   BytesColumnVector.Bytes byteArray = getByteArray(ordinal);
+   return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+   }
+
+   @Override
+   public Decimal getDecimal(int ordinal, int precision, int scale

[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF

2020-04-02 Thread GitBox
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] 
Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402747024
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.ArrayColumnVector;
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+import org.apache.flink.table.dataformat.vector.TimestampColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+/**
+ * Columnar array to support access to vector column data.
+ */
+public final class ColumnarArray implements BaseArray {
+
+   private final DataType elementType;
+   private final ColumnVector data;
+   private final int offset;
+   private final int length;
+
+   public ColumnarArray(DataType elementType, ColumnVector data, int 
offset, int length) {
+   this.elementType = elementType;
+   this.data = data;
+   this.offset = offset;
+   this.length = length;
 
 Review comment:
   length -> numElements?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF

2020-04-02 Thread GitBox
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] 
Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402746856
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.ArrayColumnVector;
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+import org.apache.flink.table.dataformat.vector.TimestampColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+/**
+ * Columnar array to support access to vector column data.
+ */
+public final class ColumnarArray implements BaseArray {
+
+   private final DataType elementType;
+   private final ColumnVector data;
+   private final int offset;
+   private final int length;
+
+   public ColumnarArray(DataType elementType, ColumnVector data, int 
offset, int length) {
+   this.elementType = elementType;
+   this.data = data;
+   this.offset = offset;
+   this.length = length;
+   }
+
+   @Override
+   public int numElements() {
+   return length;
+   }
+
+   @Override
+   public boolean isNullAt(int pos) {
+   return data.isNullAt(offset + pos);
+   }
+
+   @Override
+   public void setNullAt(int pos) {
+   throw new UnsupportedOperationException("Not support the 
operation!");
+   }
+
+   @Override
+   public boolean getBoolean(int ordinal) {
+   return ((BooleanColumnVector) data).getBoolean(offset + 
ordinal);
+   }
+
+   @Override
+   public byte getByte(int ordinal) {
+   return ((ByteColumnVector) data).getByte(offset + ordinal);
+   }
+
+   @Override
+   public short getShort(int ordinal) {
+   return ((ShortColumnVector) data).getShort(offset + ordinal);
+   }
+
+   @Override
+   public int getInt(int ordinal) {
+   return ((IntColumnVector) data).getInt(offset + ordinal);
+   }
+
+   @Override
+   public long getLong(int ordinal) {
+   return ((LongColumnVector) data).getLong(offset + ordinal);
+   }
+
+   @Override
+   public float getFloat(int ordinal) {
+   return ((FloatColumnVector) data).getFloat(offset + ordinal);
+   }
+
+   @Override
+   public double getDouble(int ordinal) {
+   return ((DoubleColumnVector) data).getDouble(offset + ordinal);
+   }
+
+   @Override
+   public BinaryString getString(int ordinal) {
+   BytesColumnVector.Bytes byteArray = getByteArray(ordinal);
+   return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+   }
+
+   @Override
+   public Decimal getDecimal(int ordinal, int precision, int scale

[GitHub] [flink] JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] Support ArrayType in vectorized Python UDF

2020-04-02 Thread GitBox
JingsongLi commented on a change in pull request #11598: [FLINK-16914][python] 
Support ArrayType in vectorized Python UDF
URL: https://github.com/apache/flink/pull/11598#discussion_r402746022
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarArray.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.ArrayColumnVector;
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+import org.apache.flink.table.dataformat.vector.TimestampColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+/**
+ * Columnar array to support access to vector column data.
+ */
+public final class ColumnarArray implements BaseArray {
+
+   private final DataType elementType;
+   private final ColumnVector data;
+   private final int offset;
+   private final int length;
+
+   public ColumnarArray(DataType elementType, ColumnVector data, int 
offset, int length) {
+   this.elementType = elementType;
+   this.data = data;
+   this.offset = offset;
+   this.length = length;
+   }
+
+   @Override
+   public int numElements() {
+   return length;
+   }
+
+   @Override
+   public boolean isNullAt(int pos) {
+   return data.isNullAt(offset + pos);
+   }
+
+   @Override
+   public void setNullAt(int pos) {
+   throw new UnsupportedOperationException("Not support the 
operation!");
+   }
+
+   @Override
+   public boolean getBoolean(int ordinal) {
+   return ((BooleanColumnVector) data).getBoolean(offset + 
ordinal);
+   }
+
+   @Override
+   public byte getByte(int ordinal) {
+   return ((ByteColumnVector) data).getByte(offset + ordinal);
+   }
+
+   @Override
+   public short getShort(int ordinal) {
+   return ((ShortColumnVector) data).getShort(offset + ordinal);
+   }
+
+   @Override
+   public int getInt(int ordinal) {
+   return ((IntColumnVector) data).getInt(offset + ordinal);
+   }
+
+   @Override
+   public long getLong(int ordinal) {
+   return ((LongColumnVector) data).getLong(offset + ordinal);
+   }
+
+   @Override
+   public float getFloat(int ordinal) {
+   return ((FloatColumnVector) data).getFloat(offset + ordinal);
+   }
+
+   @Override
+   public double getDouble(int ordinal) {
+   return ((DoubleColumnVector) data).getDouble(offset + ordinal);
+   }
+
+   @Override
+   public BinaryString getString(int ordinal) {
+   BytesColumnVector.Bytes byteArray = getByteArray(ordinal);
+   return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+   }
+
+   @Override
+   public Decimal getDecimal(int ordinal, int precision, int scale

[jira] [Updated] (FLINK-16952) Parquet file system format support filter pushdown

2020-04-02 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-16952:
-
Description: 
We can create the conversion between Flink Expression(NOTE: should be new 
Expression instead of PlannerExpression) and parquet FilterPredicate.

And apply to Parquet file system format.

  was:
We can extract the conversion between Flink Expression and parquet 
FilterPredicate.

And apply to Parquet file system format.


> Parquet file system format support filter pushdown
> ---
>
> Key: FLINK-16952
> URL: https://issues.apache.org/jira/browse/FLINK-16952
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: 1.11.0
>
>
> We can create the conversion between Flink Expression(NOTE: should be new 
> Expression instead of PlannerExpression) and parquet FilterPredicate.
> And apply to Parquet file system format.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16952) Parquet file system format support filter pushdown

2020-04-02 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-16952:


 Summary: Parquet file system format support filter pushdown
 Key: FLINK-16952
 URL: https://issues.apache.org/jira/browse/FLINK-16952
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.11.0


We can extract the conversion between Flink Expression and parquet 
FilterPredicate.

And apply to Parquet file system format.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement 
ResultPartition state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433
 
 
   
   ## CI report:
   
   * 2670a045bd9c9c4c75e7a22bf188fc673c85a062 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158132906) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7017)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend 
and use BashJavaUtils to start JM JVM process and pass JVM memory args
URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813
 
 
   
   ## CI report:
   
   * d8814c3aed36cea34b5069b2a270df8033d29956 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158126859) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7012)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime 
metric for task.
URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084
 
 
   
   ## CI report:
   
   * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015)
 
   * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16887) Refactor retraction rules to support inferring ChangelogMode

2020-04-02 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-16887:

Description: 
Current retraction machanism only support 2 message kinds (+ and -). However, 
since FLIP-95, we will introduce more message kinds to users 
(insert/delete/update_before/update_after). 

In order to support that, we should first refactor current retraction rules to 
support ChangelogMode inference. In previous, every node will be attached with 
a AccMode trait after retraction rule. In the proposed design, we will infer 
ChangelogMode trait for every node. 

Design documentation: 
https://docs.google.com/document/d/1n_iXIQsKT3uiBqENR8j8RdjRhZfzMhhB66QZvx2rFjE/edit?ts=5e8419c1#

  was:
Current retraction machanism only support 2 message kinds (+ and -). However, 
since FLIP-95, we will introduce more message kinds to users 
(insert/delete/update_before/update_after). 

In order to support that, we should first refactor current retraction rules to 
support ChangelogMode inference. In previous, every node will be attached with 
a AccMode trait after retraction rule. In the proposed design, we will infer 
ChangelogMode trait for every node. 

A detailed design documentation will be attached soon. 


> Refactor retraction rules to support inferring ChangelogMode
> 
>
> Key: FLINK-16887
> URL: https://issues.apache.org/jira/browse/FLINK-16887
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.11.0
>
>
> Current retraction machanism only support 2 message kinds (+ and -). However, 
> since FLIP-95, we will introduce more message kinds to users 
> (insert/delete/update_before/update_after). 
> In order to support that, we should first refactor current retraction rules 
> to support ChangelogMode inference. In previous, every node will be attached 
> with a AccMode trait after retraction rule. In the proposed design, we will 
> infer ChangelogMode trait for every node. 
> Design documentation: 
> https://docs.google.com/document/d/1n_iXIQsKT3uiBqENR8j8RdjRhZfzMhhB66QZvx2rFjE/edit?ts=5e8419c1#



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-02 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r402739873
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/ExecutorWrapper.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExecutorUtils;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The help class to get the executor to run asynchronous operations on. It 
could create a dedicated thread pool or
+ * reuse an existing one.
+ */
+public class ExecutorWrapper {
+
+   private ExecutorService internalExecutorService;
+   private Executor executor;
+
+   private ExecutorWrapper(Configuration flinkConfig) {
+   internalExecutorService = Executors.newFixedThreadPool(
+   
flinkConfig.getInteger(KubernetesConfigOptions.CLIENT_ASYNC_THREAD_POOL_SIZE),
+   new ExecutorThreadFactory("FlinkKubeClient-IO"));
+   }
+
+   private ExecutorWrapper(Executor executor) {
 
 Review comment:
   The reason why i use `ExecutorWrapper` is to provide factory method to 
create a dedicated thread pool or reuse an existing one. However, i think this 
could also be done to make the argument of `Fabric8FlinkKubeClient` `Nullable`. 
I will go in this way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement 
ResultPartition state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433
 
 
   
   ## CI report:
   
   * 9e82da175780aa6a6ef19272a41c9a4796621e1a Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158126828) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7011)
 
   * 2670a045bd9c9c4c75e7a22bf188fc673c85a062 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-02 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r402738678
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
 ##
 @@ -183,6 +183,14 @@
.withDescription("The user-specified annotations that are set 
to the TaskManager pod. The value could be " +
"in the form of a1:v1,a2:v2");
 
+   public static final ConfigOption CLIENT_ASYNC_THREAD_POOL_SIZE 
=
 
 Review comment:
   Yes, this is exactly what i want too. Making 
`RpcEndpoint#getRpcService#getExecutor` return custom thread pool for I/O 
related execution is a very good idea. So i will remove this config option now 
and set the default thread pool size to 2.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16794) ClassNotFoundException caused by ClassLoader.getSystemClassLoader using impertinently

2020-04-02 Thread victor.jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074280#comment-17074280
 ] 

victor.jiang commented on FLINK-16794:
--

Hi Till Rohrmann,

In our application scenario for flink-client usage, we create a customized 
ClassLoader for loading flink dependent jars, in order to avoid the conflict 
with our app's jars.

For the improvement , pls fix it.

> ClassNotFoundException caused by ClassLoader.getSystemClassLoader using 
> impertinently  
> ---
>
> Key: FLINK-16794
> URL: https://issues.apache.org/jira/browse/FLINK-16794
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.8.3
>Reporter: victor.jiang
>Priority: Major
>
> In some containerization environment,the context classloader is not the 
> SystemClassLoader,it uses the customized classloader usually for the classes 
> isolation ,so the ClassNotFoundException may be caused。recommends using 
> getClass/Caller/ThreadCurrentContext 's ClassLoader。
> The related sources below:
> 1.flink-clients\src\main\java\org\apache\flink\client\program\ClusterClient.java"(690,33):
>  return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
>  
> 2.flink-clients\src\main\java\org\apache\flink\client\program\MiniClusterClient.java"(148,33):
>  return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
>  
> 3.flink-runtime\src\main\java\org\apache\flink\runtime\blob\BlobUtils.java"(348,66):
>  return (Throwable) InstantiationUtil.deserializeObject(bytes, 
> ClassLoader.getSystemClassLoader());
>  
> 4.flink-runtime\src\main\java\org\apache\flink\runtime\rest\messages\json\SerializedThrowableDeserializer.java"(52,68):
>  return InstantiationUtil.deserializeObject(serializedException, 
> ClassLoader.getSystemClassLoader());
>  
> 5.flink-runtime\src\main\java\org\apache\flink\runtime\rpc\messages\RemoteRpcInvocation.java"(118,67):
>  methodInvocation = 
> serializedMethodInvocation.deserializeValue(ClassLoader.getSystemClassLoader());



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint

2020-04-02 Thread GitBox
zhijiangW commented on issue #11534: [FLINK-16537][network] Implement 
ResultPartition state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11534#issuecomment-608230638
 
 
   Thanks for the review @AHeise @rkhachatryan , and  I have addressed all your 
comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement 
ResultPartition state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433
 
 
   
   ## CI report:
   
   * 9e82da175780aa6a6ef19272a41c9a4796621e1a Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158126828) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7011)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime 
metric for task.
URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084
 
 
   
   ## CI report:
   
   * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015)
 
   * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime 
metric for task.
URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084
 
 
   
   ## CI report:
   
   * 5ad66720b750455263df2ccb345f46b298d62faa Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/157442617) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6911)
 
   * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015)
 
   * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner

2020-04-02 Thread GitBox
docete commented on a change in pull request #11276: 
[FLINK-16029][table-planner-blink] Remove register source and sink in test 
cases of blink planner
URL: https://github.com/apache/flink/pull/11276#discussion_r402726288
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
 ##
 @@ -39,34 +36,23 @@ class TableSourceTest extends TableTestBase {
 
   @Before
   def setup(): Unit = {
-util.tableEnv.registerTableSource("FilterableTable", 
TestFilterableTableSource(false))
-TestPartitionableSourceFactory.registerTableSource(util.tableEnv, 
"PartitionableTable", false)
+TestFilterableTableSource.createTemporaryTable(
+  util.tableEnv,
+  TestFilterableTableSource.defaultSchema,
+  "FilterableTable")
+
+TestPartitionableSourceFactory.createTemporaryTable(util.tableEnv, 
"PartitionableTable", false)
   }
 
   @Test
   def testBoundedStreamTableSource(): Unit = {
-util.tableEnv.registerTableSource("MyTable", new TestTableSource(true, 
tableSchema))
+TestTableSource.createTemporaryTable(util.tableEnv, isBounded = true, 
tableSchema, "MyTable")
 util.verifyPlan("SELECT * FROM MyTable")
   }
 
   @Test
   def testUnboundedStreamTableSource(): Unit = {
-util.tableEnv.registerTableSource("MyTable", new TestTableSource(false, 
tableSchema))
-util.verifyPlan("SELECT * FROM MyTable")
-  }
-
-  @Test
-  def testNonStreamTableSource(): Unit = {
-val tableSource = new TableSource[Row]() {
-
-  override def getProducedDataType: DataType = tableSchema.toRowDataType
-
-  override def getTableSchema: TableSchema = tableSchema
-}
-util.tableEnv.registerTableSource("MyTable", tableSource)
-thrown.expect(classOf[ValidationException])
-thrown.expectMessage(
-  "Only StreamTableSource and LookupableTableSource can be used in Blink 
planner.")
 
 Review comment:
   It's only used in `convertSourceTable` which convert `ConnectorCatalogTable` 
to `TableSourceTable`. When there is no registerTableSource/Sink, there is no 
`ConnectorCatalogTable` either. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner

2020-04-02 Thread GitBox
docete commented on a change in pull request #11276: 
[FLINK-16029][table-planner-blink] Remove register source and sink in test 
cases of blink planner
URL: https://github.com/apache/flink/pull/11276#discussion_r402725215
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
 ##
 @@ -112,29 +78,28 @@ object MemoryTableSourceSinkUtil {
 }
   }
 
-  final class UnsafeMemoryOutputFormatTableSink extends 
OutputFormatTableSink[Row] {
-
-var fieldNames: Array[String] = _
-var fieldTypes: Array[TypeInformation[_]] = _
-
-override def getOutputType: TypeInformation[Row] = {
-  new RowTypeInfo(getTableSchema.getFieldTypes, 
getTableSchema.getFieldNames)
+  final class LegacyUnsafeMemoryAppendTableFactory extends 
StreamTableSinkFactory[Row] {
+override def createStreamTableSink(
+properties: util.Map[String, String]): StreamTableSink[Row] = {
+  val dp = new DescriptorProperties
+  dp.putProperties(properties)
+  val tableSchema = dp.getTableSchema(SCHEMA)
+  val sink = new UnsafeMemoryAppendTableSink
+  sink.configure(tableSchema.getFieldNames, tableSchema.getFieldTypes)
+.asInstanceOf[StreamTableSink[Row]]
 }
 
-override def getOutputFormat: OutputFormat[Row] = new 
MemoryCollectionOutputFormat
-
-override def configure(
-fieldNames: Array[String],
-fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
-  val newSink = new UnsafeMemoryOutputFormatTableSink
-  newSink.fieldNames = fieldNames
-  newSink.fieldTypes = fieldTypes
-  newSink
+override def requiredContext(): util.Map[String, String] = {
+  val context = new util.HashMap[String, String]()
+  context.put(CONNECTOR_TYPE, "LegacyUnsafeMemoryAppendTable")
 
 Review comment:
   It's not necessary to be a constant field, it's only used in this test case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong merged pull request #11357: [FLINK-16476] [table-planner-blink] Remove PowerMockito to avoid LinkageError in SelectivityEstimatorTest

2020-04-02 Thread GitBox
wuchong merged pull request #11357: [FLINK-16476] [table-planner-blink] Remove 
PowerMockito to avoid LinkageError in SelectivityEstimatorTest
URL: https://github.com/apache/flink/pull/11357
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-16476) SelectivityEstimatorTest logs LinkageErrors

2020-04-02 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu resolved FLINK-16476.
-
Resolution: Fixed

Fixed in mater (1.11.0) with 00526eba0f37f8869e62f41f43a40906e4169790 and 
b93c75b31e8ce17f57c8311f591a70316180df04

> SelectivityEstimatorTest logs LinkageErrors
> ---
>
> Key: FLINK-16476
> URL: https://issues.apache.org/jira/browse/FLINK-16476
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: godfrey he
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is the test run 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6038&view=logs&j=d47ab8d2-10c7-5d9e-8178-ef06a797a0d8&t=9a1abf5f-7cf4-58c3-bb2a-282a64aebb1f
> Log output
> {code}
> 2020-03-07T00:35:20.1270791Z [INFO] Running 
> org.apache.flink.table.planner.plan.metadata.SelectivityEstimatorTest
> 2020-03-07T00:35:21.6473057Z [INFO] Tests run: 3, Failures: 0, Errors: 0, 
> Skipped: 0, Time elapsed: 3.408 s - in 
> org.apache.flink.table.planner.plan.utils.FlinkRexUtilTest
> 2020-03-07T00:35:21.6541713Z [INFO] Running 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCostTest
> 2020-03-07T00:35:21.7294613Z [INFO] Tests run: 2, Failures: 0, Errors: 0, 
> Skipped: 0, Time elapsed: 0.073 s - in 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCostTest
> 2020-03-07T00:35:21.7309958Z [INFO] Running 
> org.apache.flink.table.planner.plan.metadata.AggCallSelectivityEstimatorTest
> 2020-03-07T00:35:23.7443246Z ScriptEngineManager providers.next(): 
> javax.script.ScriptEngineFactory: Provider 
> jdk.nashorn.api.scripting.NashornScriptEngineFactory not a subtype
> 2020-03-07T00:35:23.8260013Z 2020-03-07 00:35:23,819 main ERROR Could not 
> reconfigure JMX java.lang.LinkageError: loader constraint violation: loader 
> (instance of 
> org/powermock/core/classloader/javassist/JavassistMockClassLoader) previously 
> initiated loading for a different type with name 
> "javax/management/MBeanServer"
> 2020-03-07T00:35:23.8262329Z  at java.lang.ClassLoader.defineClass1(Native 
> Method)
> 2020-03-07T00:35:23.8263241Z  at 
> java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> 2020-03-07T00:35:23.8264629Z  at 
> org.powermock.core.classloader.javassist.JavassistMockClassLoader.loadUnmockedClass(JavassistMockClassLoader.java:90)
> 2020-03-07T00:35:23.8266241Z  at 
> org.powermock.core.classloader.MockClassLoader.loadClassByThisClassLoader(MockClassLoader.java:104)
> 2020-03-07T00:35:23.8267808Z  at 
> org.powermock.core.classloader.DeferSupportingClassLoader.loadClass1(DeferSupportingClassLoader.java:147)
> 2020-03-07T00:35:23.8269485Z  at 
> org.powermock.core.classloader.DeferSupportingClassLoader.loadClass(DeferSupportingClassLoader.java:98)
> 2020-03-07T00:35:23.8270900Z  at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> 2020-03-07T00:35:23.8272000Z  at 
> org.apache.logging.log4j.core.jmx.Server.unregisterAllMatching(Server.java:337)
> 2020-03-07T00:35:23.8273779Z  at 
> org.apache.logging.log4j.core.jmx.Server.unregisterLoggerContext(Server.java:261)
> 2020-03-07T00:35:23.8275087Z  at 
> org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:165)
> 2020-03-07T00:35:23.8276515Z  at 
> org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:141)
> 2020-03-07T00:35:23.8278036Z  at 
> org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:590)
> 2020-03-07T00:35:23.8279741Z  at 
> org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651)
> 2020-03-07T00:35:23.8281190Z  at 
> org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668)
> 2020-03-07T00:35:23.8282440Z  at 
> org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253)
> 2020-03-07T00:35:23.8283717Z  at 
> org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153)
> 2020-03-07T00:35:23.8285186Z  at 
> org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45)
> 2020-03-07T00:35:23.8286575Z  at 
> org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
> 2020-03-07T00:35:23.8287933Z  at 
> org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138)
> 2020-03-07T00:35:23.8289393Z  at 
> org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45)
> 2020-03-07T00:35:23.8290816Z  at 
> org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48)
> 2020-03-07T00:35:23.8292179Z  at 

[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner

2020-04-02 Thread GitBox
docete commented on a change in pull request #11276: 
[FLINK-16029][table-planner-blink] Remove register source and sink in test 
cases of blink planner
URL: https://github.com/apache/flink/pull/11276#discussion_r402722415
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
 ##
 @@ -26,54 +26,19 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{DataTypes, TableException, TableSchema, 
Tumble, Types}
 import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, 
tupleData3, tupleData5}
 import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, 
TestingRetractTableSink, TestingUpsertTableSink}
-import 
org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.DataTypeAppendStreamTableSink
 import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, 
TableTestUtil}
 import org.apache.flink.table.sinks._
 import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
 import org.junit.Test
-
 import java.io.File
 import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 
 class TableSinkITCase extends AbstractTestBase {
 
-  @Test
-  def testInsertIntoRegisteredTableSink(): Unit = {
 
 Review comment:
   There is no registered table sink any more.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime 
metric for task.
URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084
 
 
   
   ## CI report:
   
   * 5ad66720b750455263df2ccb345f46b298d62faa Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/157442617) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6911)
 
   * b709e1952df66cba5a316f9a46902538bf8cf245 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend 
and use BashJavaUtils to start JM JVM process and pass JVM memory args
URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813
 
 
   
   ## CI report:
   
   * d20feb41f2e2940f6d6c8766586876addd3e2e92 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157491477) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6932)
 
   * d8814c3aed36cea34b5069b2a270df8033d29956 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158126859) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7012)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement 
ResultPartition state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433
 
 
   
   ## CI report:
   
   * d0d0b6fef21a118932e878255aa40f10f17fe753 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/155574041) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6686)
 
   * 9e82da175780aa6a6ef19272a41c9a4796621e1a Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158126828) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7011)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce 
table row write support for parquet writer
URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516
 
 
   
   ## CI report:
   
   * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes 
e2e test stable
URL: https://github.com/apache/flink/pull/11619#issuecomment-607793023
 
 
   
   ## CI report:
   
   * 68c82eef3469c403b1a0f8a64c13f04d989bf145 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/157750104) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6989)
 
   * 6bd37e40b9333d646bbf1bb1fbe286ad21d1e709 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158126927) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7013)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner

2020-04-02 Thread GitBox
docete commented on a change in pull request #11276: 
[FLINK-16029][table-planner-blink] Remove register source and sink in test 
cases of blink planner
URL: https://github.com/apache/flink/pull/11276#discussion_r402721935
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
 ##
 @@ -54,8 +55,67 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 object TestTableSources {
+  def createPersonCsvTemporaryTable(tEnv: TableEnvironment, tableName: 
String): Unit = {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner

2020-04-02 Thread GitBox
docete commented on a change in pull request #11276: 
[FLINK-16029][table-planner-blink] Remove register source and sink in test 
cases of blink planner
URL: https://github.com/apache/flink/pull/11276#discussion_r402721904
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
 ##
 @@ -73,121 +79,72 @@ class InMemoryLookupableTableSource(
 map.toMap
   }
 
+  override def getDataStream(execEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
+null
+  }
+
   override def isAsyncEnabled: Boolean = asyncEnabled
 
-  override def getReturnType: TypeInformation[Row] = new 
RowTypeInfo(fieldTypes, fieldNames)
+  override def getProducedDataType: DataType = schema.toRowDataType
+
+  override def getTableSchema: TableSchema = schema
 
-  override def getTableSchema: TableSchema = new TableSchema(fieldNames, 
fieldTypes)
+
+  override def isBounded: Boolean = bounded
 
   @VisibleForTesting
   def getResourceCounter: Int = resourceCounter.get()
-
 }
 
-object InMemoryLookupableTableSource {
-
-  /**
-* Return a new builder that builds a [[InMemoryLookupableTableSource]].
-*
-* For example:
-*
-* {{{
-* val data = (
-*   (11, 1L, "Julian"),
-*   (22, 2L, "Jark"),
-*   (33, 3L, "Fabian"))
-*
-* val source = InMemoryLookupableTableSource.builder()
-*   .data(data)
-*   .field("age", Types.INT)
-*   .field("id", Types.LONG)
-*   .field("name", Types.STRING)
-*   .enableAsync()
-*   .build()
-* }}}
-*
-* @return a new builder to build a [[InMemoryLookupableTableSource]]
-*/
-  def builder(): Builder = new Builder
+class InMemoryLookupableTableFactory extends TableSourceFactory[Row] {
+  override def createTableSource(properties: util.Map[String, String]): 
TableSource[Row] = {
+val dp = new DescriptorProperties
+dp.putProperties(properties)
+val tableSchema = dp.getTableSchema(SCHEMA)
 
+val serializedData = dp.getString("data")
+val data = EncodingUtils.decodeStringToObject(serializedData, 
classOf[List[Product]])
 
-  /**
-* A builder for creating [[InMemoryLookupableTableSource]] instances.
-*
-* For example:
-*
-* {{{
-* val data = (
-*   (11, 1L, "Julian"),
-*   (22, 2L, "Jark"),
-*   (33, 3L, "Fabian"))
-*
-* val source = InMemoryLookupableTableSource.builder()
-*   .data(data)
-*   .field("age", Types.INT)
-*   .field("id", Types.LONG)
-*   .field("name", Types.STRING)
-*   .enableAsync()
-*   .build()
-* }}}
-*/
-  class Builder {
-private val schema = new mutable.LinkedHashMap[String, 
TypeInformation[_]]()
-private var data: List[Product] = _
-private var asyncEnabled: Boolean = false
-
-/**
-  * Sets table data for the table source.
-  */
-def data(data: List[Product]): Builder = {
-  this.data = data
-  this
+val rowData = data.map { entry =>
+  Row.of((0 until 
entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*)
 }
 
-/**
-  * Adds a field with the field name and the type information. Required.
-  * This method can be called multiple times. The call order of this 
method defines
-  * also the order of the fields in a row.
-  *
-  * @param fieldName the field name
-  * @param fieldType the type information of the field
-  */
-def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
-  if (schema.contains(fieldName)) {
-throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
-  }
-  schema += (fieldName -> fieldType)
-  this
-}
+val asyncEnabled = dp.getOptionalBoolean("is-async").orElse(false)
 
-/**
-  * Enables async lookup for the table source
-  */
-def enableAsync(): Builder = {
-  asyncEnabled = true
-  this
-}
+val bounded = dp.getOptionalBoolean("is-bounded").orElse(false)
 
-/**
-  * Apply the current values and constructs a newly-created 
[[InMemoryLookupableTableSource]].
-  *
-  * @return a newly-created [[InMemoryLookupableTableSource]].
-  */
-def build(): InMemoryLookupableTableSource = {
-  val fieldNames = schema.keys.toArray
-  val fieldTypes = schema.values.toArray
-  Preconditions.checkNotNull(data)
-  // convert
-  val rowData = data.map { entry =>
-Row.of((0 until 
entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*)
-  }
-  new InMemoryLookupableTableSource(
-fieldNames,
-fieldTypes,
-rowData,
-asyncEnabled
-  )
-}
+new InMemoryLookupableTableSource(tableSchema, rowData, asyncEnabled, 
bounded)
+  }
+
+  override def requiredContext(): util.Map[String, String] = {
+val co

[GitHub] [flink] wenlong88 commented on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.

2020-04-02 Thread GitBox
wenlong88 commented on issue #11564: [FLINK-16864][metrics] Add IdleTime metric 
for task.
URL: https://github.com/apache/flink/pull/11564#issuecomment-608213838
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions

2020-04-02 Thread GitBox
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r402721375
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##
 @@ -351,6 +352,94 @@ private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) th
}
}
 
+   /**
+* Tests {@link ResultPartition#getAvailableFuture()}.
+*/
+   @Test
+   public void testIsAvailableOrNot() throws IOException, 
InterruptedException {
+   final int numAllBuffers = 10;
+   final NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNumNetworkBuffers(numAllBuffers).build();
+   final ResultPartition resultPartition = 
createPartition(network, ResultPartitionType.PIPELINED, 1);
+
+   try {
+   resultPartition.setup();
+
+   resultPartition.getBufferPool().setNumBuffers(2);
+
+   
assertTrue(resultPartition.getAvailableFuture().isDone());
+
+   resultPartition.getBufferBuilder();
+   resultPartition.getBufferBuilder();
+   
assertFalse(resultPartition.getAvailableFuture().isDone());
+   } finally {
+   resultPartition.release();
+   network.close();
+   }
+   }
+
+   /**
+* Tests {@link ResultPartition#getAvailableFuture()} with configured 
max backlogs.
+*/
+   @Test
+   public void testMaxBuffersPerChannelAndAvailability() throws 
IOException, InterruptedException {
+   final int numAllBuffers = 10;
+   final NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNumNetworkBuffers(numAllBuffers).build();
+   final ResultPartition resultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   
.setResultPartitionType(ResultPartitionType.PIPELINED)
+   .setNumberOfSubpartitions(2)
+   .setMaxBuffersPerChannel(1)
+   .build();
+
+   try {
+   resultPartition.setup();
+
+   
assertTrue(resultPartition.getAvailableFuture().isDone());
+
+   BufferAvailabilityListener listener0 = 
mock(BufferAvailabilityListener.class);
 
 Review comment:
   it's better to avoid the use of mock


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions

2020-04-02 Thread GitBox
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r402720894
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 ##
 @@ -74,6 +74,8 @@
 
private final String compressionCodec;
 
+   private final int maxBuffersPerChannel;
 
 Review comment:
   it's better to update the ```hashCode```, ```equals``` and ```toString``` 
method


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp

2020-04-02 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074255#comment-17074255
 ] 

YufeiLiu commented on FLINK-16938:
--

It wasn't I want to use SqlTimestamp, because of blink-planner intenal 
conversion, it cast input type to intenal type which is SqlTimestamp, and cast 
to output type in result. It's out of my control.

> SqlTimestamp has lag when convert long to Timestamp
> ---
>
> Key: FLINK-16938
> URL: https://issues.apache.org/jira/browse/FLINK-16938
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: YufeiLiu
>Priority: Major
>
> When I set rowtime attribute by using expression 'column.rowtime' , and 
> result type is sql.Timestamp, the result will have lag which is equals with 
> default timezone offset.
> {code:java}
> tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
> {code}
> I look into the conversion logic, the field was go through 'long -> 
> SqlTimestamp -> Timestamp' conversion. 
> {code:java}
> long from = System.currentTimeMillis();
> long to = SqlTimestamp
>   .fromEpochMillis(from)
>   .toTimestamp()
>   .getTime();
> {code}
> The result is {{from!=to}}.  In {{SqlTimestamp.toTimestamp()}} using 
> {{Timestamp.valueOf(LocalDateTime dateTime)}} which is contain timezone 
> infomation, will casue time lag.
> From Timestamp to Timestamp not have this issue, but convert Datastream to 
> Table is use StreamRecord.timestamp as rowtime field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions

2020-04-02 Thread GitBox
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r402719676
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -375,4 +397,32 @@ void onConsumedSubpartition(int subpartitionIndex) {
private void checkInProduceState() throws IllegalStateException {
checkState(!isFinished, "Partition already finished.");
}
+
+   /**
+* Check whether all subpartitions' backlogs are less than the 
limitation of max backlogs, and make this partition
+* available again if yes.
+*/
+   public void notifyDecreaseBacklog(int buffersInBacklog) {
+   if (buffersInBacklog == maxBuffersPerChannel) {
+   unavailableSubpartitionsCount--;
+   if (unavailableSubpartitionsCount == 0) {
+   CompletableFuture toNotify = 
availabilityHelper.getUnavailableToResetAvailable();
+   toNotify.complete(null);
+   int[] a = new int[1024];
+   }
+   }
+   }
+
+   /**
+* Check whether any subpartition's backlog exceeds the limitation of 
max backlogs, and make this partition
+* unavailabe if yes.
+*/
+   public void notifyIncreaseBacklog(int buffersInBacklog) {
+   if (buffersInBacklog == maxBuffersPerChannel + 1) {
+   unavailableSubpartitionsCount++;
+   if (unavailableSubpartitionsCount == 1) {
 
 Review comment:
   the above two lines can be replaced with:
   ```if (++unavailableSubpartitionsCount == 1) {```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions

2020-04-02 Thread GitBox
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r402718999
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -375,4 +397,32 @@ void onConsumedSubpartition(int subpartitionIndex) {
private void checkInProduceState() throws IllegalStateException {
checkState(!isFinished, "Partition already finished.");
}
+
+   /**
+* Check whether all subpartitions' backlogs are less than the 
limitation of max backlogs, and make this partition
+* available again if yes.
+*/
+   public void notifyDecreaseBacklog(int buffersInBacklog) {
+   if (buffersInBacklog == maxBuffersPerChannel) {
+   unavailableSubpartitionsCount--;
+   if (unavailableSubpartitionsCount == 0) {
+   CompletableFuture toNotify = 
availabilityHelper.getUnavailableToResetAvailable();
+   toNotify.complete(null);
+   int[] a = new int[1024];
 
 Review comment:
   why we need this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions

2020-04-02 Thread GitBox
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r402719144
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -375,4 +397,32 @@ void onConsumedSubpartition(int subpartitionIndex) {
private void checkInProduceState() throws IllegalStateException {
checkState(!isFinished, "Partition already finished.");
}
+
+   /**
+* Check whether all subpartitions' backlogs are less than the 
limitation of max backlogs, and make this partition
+* available again if yes.
+*/
+   public void notifyDecreaseBacklog(int buffersInBacklog) {
+   if (buffersInBacklog == maxBuffersPerChannel) {
+   unavailableSubpartitionsCount--;
+   if (unavailableSubpartitionsCount == 0) {
 
 Review comment:
   the above two lines can be replaced with:
   ```if (--unavailableSubpartitionsCount == 0) {```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-16885) SQL hive-connector wilcard excludes don't work on maven 3.1.X

2020-04-02 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee resolved FLINK-16885.
--
Resolution: Fixed

master: 2b94ca60ca437df98d0efeb7b9a43ecdf216825b

> SQL hive-connector wilcard excludes don't work on maven 3.1.X
> -
>
> Key: FLINK-16885
> URL: https://issues.apache.org/jira/browse/FLINK-16885
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Connectors / Hive
>Affects Versions: 1.11.0
>Reporter: Chesnay Schepler
>Assignee: Jingsong Lee
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The sql-connector-hive modules added in FLINK-16455 use wildcards imports to 
> exclude all transitive dependencies from hive.
> This is a maven 3.2.1+ feature. This may imply that Flink cannot be properly 
> built anymore with maven 3.1 .



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] JingsongLi merged pull request #11620: [FLINK-16885][hive] Remove wilcard excludes that don't work on maven 3.1.X

2020-04-02 Thread GitBox
JingsongLi merged pull request #11620: [FLINK-16885][hive] Remove wilcard 
excludes that don't work on maven 3.1.X
URL: https://github.com/apache/flink/pull/11620
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions

2020-04-02 Thread GitBox
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r402718422
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -342,6 +342,7 @@ private void decreaseBuffersInBacklogUnsafe(boolean 
isBuffer) {
assert Thread.holdsLock(buffers);
if (isBuffer) {
buffersInBacklog--;
+   parent.notifyDecreaseBacklog(buffersInBacklog);
 
 Review comment:
   the above two lines can be replaced with:
   ```parent.notifyDecreaseBacklog(--buffersInBacklog);```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce 
table row write support for parquet writer
URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516
 
 
   
   ## CI report:
   
   * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11619: [FLINK-16921][e2e] Make Kubernetes 
e2e test stable
URL: https://github.com/apache/flink/pull/11619#issuecomment-607793023
 
 
   
   ## CI report:
   
   * 68c82eef3469c403b1a0f8a64c13f04d989bf145 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/157750104) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6989)
 
   * 6bd37e40b9333d646bbf1bb1fbe286ad21d1e709 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #11620: [FLINK-16885][hive] Remove wilcard excludes that don't work on maven 3.1.X

2020-04-02 Thread GitBox
JingsongLi commented on issue #11620: [FLINK-16885][hive] Remove wilcard 
excludes that don't work on maven 3.1.X
URL: https://github.com/apache/flink/pull/11620#issuecomment-608210790
 
 
   Merged #11620 to master.
   Thanks @lirui-apache for the review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #11567: [FLINK-16645] Limit the maximum backlogs in subpartitions

2020-04-02 Thread GitBox
wsry commented on a change in pull request #11567: [FLINK-16645] Limit the 
maximum backlogs in subpartitions
URL: https://github.com/apache/flink/pull/11567#discussion_r402718528
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -355,6 +356,7 @@ private void increaseBuffersInBacklog(BufferConsumer 
buffer) {
 
if (buffer != null && buffer.isBuffer()) {
buffersInBacklog++;
+   parent.notifyIncreaseBacklog(buffersInBacklog);
 
 Review comment:
   the above two lines can be replaced with:
   ```parent.notifyDecreaseBacklog(++buffersInBacklog);```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement ResultPartition state recovery for unaligned checkpoint

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11534: [FLINK-16537][network] Implement 
ResultPartition state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11534#issuecomment-604493433
 
 
   
   ## CI report:
   
   * d0d0b6fef21a118932e878255aa40f10f17fe753 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/155574041) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6686)
 
   * 9e82da175780aa6a6ef19272a41c9a4796621e1a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend 
and use BashJavaUtils to start JM JVM process and pass JVM memory args
URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813
 
 
   
   ## CI report:
   
   * d20feb41f2e2940f6d6c8766586876addd3e2e92 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157491477) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6932)
 
   * d8814c3aed36cea34b5069b2a270df8033d29956 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16949) Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider

2020-04-02 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074248#comment-17074248
 ] 

Jark Wu commented on FLINK-16949:
-

Thanks [~yunta], this approach sounds good to me. 

> Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
> ---
>
> Key: FLINK-16949
> URL: https://issues.apache.org/jira/browse/FLINK-16949
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Enhance specific UTs to use customized TtlTimeProvider to simulate changed 
> current time. This would introduce some changes to 
> {{AbstractStreamOperatorTestHarness}} and add new 
> {{StreamTaskStateInitializerTestImpl}} for different state backends.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp

2020-04-02 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074243#comment-17074243
 ] 

Jark Wu commented on FLINK-16938:
-

Regarding to the time attribute, yes, currently only TIMESTAMP(3) is supported. 
TIMESTAMP(3) WITH LOCAL TIME ZONE is in the future plan. 

> SqlTimestamp has lag when convert long to Timestamp
> ---
>
> Key: FLINK-16938
> URL: https://issues.apache.org/jira/browse/FLINK-16938
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: YufeiLiu
>Priority: Major
>
> When I set rowtime attribute by using expression 'column.rowtime' , and 
> result type is sql.Timestamp, the result will have lag which is equals with 
> default timezone offset.
> {code:java}
> tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
> {code}
> I look into the conversion logic, the field was go through 'long -> 
> SqlTimestamp -> Timestamp' conversion. 
> {code:java}
> long from = System.currentTimeMillis();
> long to = SqlTimestamp
>   .fromEpochMillis(from)
>   .toTimestamp()
>   .getTime();
> {code}
> The result is {{from!=to}}.  In {{SqlTimestamp.toTimestamp()}} using 
> {{Timestamp.valueOf(LocalDateTime dateTime)}} which is contain timezone 
> infomation, will casue time lag.
> From Timestamp to Timestamp not have this issue, but convert Datastream to 
> Table is use StreamRecord.timestamp as rowtime field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangyang0918 commented on a change in pull request #11619: [FLINK-16921][e2e] Make Kubernetes e2e test stable

2020-04-02 Thread GitBox
wangyang0918 commented on a change in pull request #11619: [FLINK-16921][e2e] 
Make Kubernetes e2e test stable
URL: https://github.com/apache/flink/pull/11619#discussion_r402715315
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/common_kubernetes.sh
 ##
 @@ -88,22 +86,43 @@ function start_kubernetes_if_not_running {
 }
 
 function start_kubernetes {
-[[ "${OS_TYPE}" = "linux" ]] && setup_kubernetes_for_linux
-if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} 
start_kubernetes_if_not_running; then
-echo "Could not start minikube. Aborting..."
-exit 1
+if [[ "${OS_TYPE}" != "linux" ]]; then
+if ! check_kubernetes_status; then
+echo "$NON_LINUX_ENV_NOTE"
+exit 1
+fi
+else
+setup_kubernetes_for_linux
+if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} 
start_kubernetes_if_not_running; then
+echo "Could not start minikube. Aborting..."
+exit 1
+fi
 fi
 eval $(minikube docker-env)
 }
 
 function stop_kubernetes {
-echo "Stopping minikube ..."
-stop_command="minikube stop"
-[[ "${OS_TYPE}" = "linux" ]] && stop_command="sudo ${stop_command}"
-if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} 
"${stop_command}"; then
-echo "Could not stop minikube. Aborting..."
-exit 1
+if [[ "${OS_TYPE}" != "linux" ]]; then
+echo "$NON_LINUX_ENV_NOTE"
+else
+echo "Stopping minikube ..."
+stop_command="sudo minikube stop"
+if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} 
"${stop_command}"; then
+echo "Could not stop minikube. Aborting..."
+exit 1
+fi
 fi
 }
 
+function debug_copy_and_show_logs {
+echo "Debugging failed Kubernetes test:"
+echo "Currently exiting Kubernetes resources"
 
 Review comment:
   Nice catch. I will fix it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args

2020-04-02 Thread GitBox
xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and 
use BashJavaUtils to start JM JVM process and pass JVM memory args
URL: https://github.com/apache/flink/pull/11545#issuecomment-608203175
 
 
   @azagrebin Thanks for the review. Comments addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions

2020-04-02 Thread GitBox
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] 
Add RAW type support in DDL and functions
URL: https://github.com/apache/flink/pull/11568#discussion_r402701406
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/TestRelDataTypeFactory.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.table.calcite.RawRelDataTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+
+/**
+ * {@link RelDataTypeFactory} for testing purposes.
+ */
+public final class TestRelDataTypeFactory extends SqlTypeFactoryImpl 
implements RawRelDataTypeFactory {
+
+   TestRelDataTypeFactory(RelDataTypeSystem typeSystem) {
 
 Review comment:
   -1 for the idea of implements a `RawRelDataTypeFactory` interface.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions

2020-04-02 Thread GitBox
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] 
Add RAW type support in DDL and functions
URL: https://github.com/apache/flink/pull/11568#discussion_r402701101
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
 ##
 @@ -28,7 +27,18 @@
  * Type used during tests.
  */
 public class Fixture {
-   private final RelDataTypeFactory typeFactory;
+   private final TestRelDataTypeFactory typeFactory;
+
+   static final String RAW_TYPE_INT_CLASS = "java.lang.Integer";
+   static final String RAW_TYPE_INT_SERIALIZER_STRING =
 
 Review comment:
   Agree with @dawidwys, the current string would have problem for different 
sql text encoding. i.e. from `UTF-8` to `UTF-16`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions

2020-04-02 Thread GitBox
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] 
Add RAW type support in DDL and functions
URL: https://github.com/apache/flink/pull/11568#discussion_r402703105
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
 ##
 @@ -484,6 +493,9 @@ object FlinkTypeFactory {
   // CURSOR for UDTF case, whose type info will never be used, just a 
placeholder
   case CURSOR => new TypeInformationRawType[Nothing](new NothingTypeInfo)
 
+  case null if relDataType.isInstanceOf[RawRelDataType] =>
 
 Review comment:
   I would suggest to use `SqlTypeName.ANY` which indicates the type comes from 
a Java object.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions

2020-04-02 Thread GitBox
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] 
Add RAW type support in DDL and functions
URL: https://github.com/apache/flink/pull/11568#discussion_r402698067
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/table/calcite/RawRelDataTypeFactory.java
 ##
 @@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+/**
+ * A factory for creating a {@link RelDataType} that describes a RAW type such 
as
+ * {@code RAW('org.my.Class', 'sW3Djsds...')}.
+ */
+public interface RawRelDataTypeFactory {
+
+   RelDataType createRawType(String className, String serializerString);
 
 Review comment:
   Can we reuse the `FlinkRelDataTypeFactory`, we didn't make factory for 
specific rel type.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #11568: [FLINK-16779][table] Add RAW type support in DDL and functions

2020-04-02 Thread GitBox
danny0405 commented on a change in pull request #11568: [FLINK-16779][table] 
Add RAW type support in DDL and functions
URL: https://github.com/apache/flink/pull/11568#discussion_r402692226
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
 ##
 @@ -889,6 +888,30 @@ SqlTypeNameSpec SqlMapTypeName() :
 }
 }
 
+/** Parses a SQL raw type such as {@code RAW('org.my.Class', 'sW3Djsds...')}. 
*/
+SqlTypeNameSpec SqlRawTypeName() :
+{
+SqlCharStringLiteral className;
+SqlCharStringLiteral serializerString;
+}
+{
+
+
+ {
+String cn = SqlParserUtil.parseString(token.image);
+className = SqlLiteral.createCharString(cn, getPos());
 
 Review comment:
   I would suggest to reuse `StringLiteral()` parse instead for these reasons:
   1. Handle the multi-segment/charset/unescape character correctly
   2. We would check the string format sooner or later, either in the parser 
and planner


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce table row write support for parquet writer

2020-04-02 Thread GitBox
flinkbot edited a comment on issue #11602: [FLINK-16912][parquet] Introduce 
table row write support for parquet writer
URL: https://github.com/apache/flink/pull/11602#issuecomment-607241516
 
 
   
   ## CI report:
   
   * 95f0deb44dfae1be800260732e47bf8ceaf9ef67 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/157722695) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6976)
 
   * 5dce0e57680a2e3b34f27ec2fac8ed7a59d00dbe Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158122639) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7010)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-16951) Integrate parquet to file system connector

2020-04-02 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-16951:


 Summary: Integrate parquet to file system connector
 Key: FLINK-16951
 URL: https://issues.apache.org/jira/browse/FLINK-16951
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.0
Reporter: Jingsong Lee
Assignee: Jingsong Lee


Implement ParquetFileSystemFormatFactory



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16938) SqlTimestamp has lag when convert long to Timestamp

2020-04-02 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074220#comment-17074220
 ] 

Jark Wu edited comment on FLINK-16938 at 4/3/20, 2:34 AM:
--

Hi [~liuyufei], I'm interested why you want to use {{SqlTimestamp}} ?

{{SqlTimestamp}} is an internal data structure to represent TIMESTAMP type, 
which is equal to Java's LocalDateTime. 
The converntion from LocalDateTime to Timestamp is delegated to JDK's 
implementation {{Timestamp.valueOf(LocalDateTime)}} which keeps the equal 
string representation, not the underlying millisecond. 

cc [~docete]


was (Author: jark):
Hi [~liuyufei], I'm interested why you want to use {{SqlTimestamp}} ?

{{SqlTimestamp}} is an internal data structure to represent TIMESTAMP type, 
which is equal to Java's LocalDateTime. 
The converntion from LocalDateTime to Timestamp as delegate to JDK's 
implementation {{Timestamp.valueOf(LocalDateTime)}} which keeps the equal 
string representation, not the underlying millisecond. 

cc [~docete]

> SqlTimestamp has lag when convert long to Timestamp
> ---
>
> Key: FLINK-16938
> URL: https://issues.apache.org/jira/browse/FLINK-16938
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: YufeiLiu
>Priority: Major
>
> When I set rowtime attribute by using expression 'column.rowtime' , and 
> result type is sql.Timestamp, the result will have lag which is equals with 
> default timezone offset.
> {code:java}
> tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
> {code}
> I look into the conversion logic, the field was go through 'long -> 
> SqlTimestamp -> Timestamp' conversion. 
> {code:java}
> long from = System.currentTimeMillis();
> long to = SqlTimestamp
>   .fromEpochMillis(from)
>   .toTimestamp()
>   .getTime();
> {code}
> The result is {{from!=to}}.  In {{SqlTimestamp.toTimestamp()}} using 
> {{Timestamp.valueOf(LocalDateTime dateTime)}} which is contain timezone 
> infomation, will casue time lag.
> From Timestamp to Timestamp not have this issue, but convert Datastream to 
> Table is use StreamRecord.timestamp as rowtime field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   4   5   6   7   8   >