[GitHub] [flink] Myasuka commented on a change in pull request #9023: [FLINK-13154][docs] Fix broken links of web docs

2019-07-11 Thread GitBox
Myasuka commented on a change in pull request #9023: [FLINK-13154][docs] Fix 
broken links of web docs
URL: https://github.com/apache/flink/pull/9023#discussion_r302837692
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -456,8 +456,6 @@ catalogs:
 
 Currently Flink supports two types of catalog - `FlinkInMemoryCatalog` and 
`HiveCatalog`.
 
-For more information about catalog, see [Catalogs]({{ site.baseurl 
}}/dev/table/catalog.html).
 
 Review comment:
  As discussed with @bowenli86 offline, we would let this PR be merged first.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13083) Various improvements PubSub Connector

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883549#comment-16883549
 ] 

Jiangjie Qin commented on FLINK-13083:
--

[~Xeli] Thanks for the quick reply. Just a reminder that the rate limiter task 
involves a public interface. It would be great if we could get it done before 
the Flink 1.9 release.

> Various improvements PubSub Connector
> -
>
> Key: FLINK-13083
> URL: https://issues.apache.org/jira/browse/FLINK-13083
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Google Cloud PubSub
>Affects Versions: 1.9.0
>Reporter: Richard Deurwaarder
>Assignee: Richard Deurwaarder
>Priority: Minor
>
> Umbrella task to keep track of issues remaining when FLINK-9311 was merged.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] danny0405 closed pull request #9092: [FLINK-13211] Add drop table support for flink planner

2019-07-11 Thread GitBox
danny0405 closed pull request #9092: [FLINK-13211] Add drop table support for 
flink planner
URL: https://github.com/apache/flink/pull/9092
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9098: [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer

2019-07-11 Thread GitBox
flinkbot commented on issue #9098: [FLINK-13236][table-runtime-blink] Fix bug 
and improve performance in TopNBuffer
URL: https://github.com/apache/flink/pull/9098#issuecomment-510755324
 
 
   ## CI report:
   
   * 19d462121bdf06eb9982a9890dd5ce53cc9499b4 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118883116)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13224) TupleTypeInfoBase equals function can't distinguish different struct type

2019-07-11 Thread forideal (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883545#comment-16883545
 ] 

forideal commented on FLINK-13224:
--

If the user type equals the device type, the following code will infer a wrong 
type:
{code:java}
public final RelDataType validateOperands(SqlValidator validator,
        SqlValidatorScope scope, SqlCall call) {
    this.preValidateCall(validator, scope, call);
    this.checkOperandCount(validator, this.operandTypeChecker, call);
    SqlCallBinding opBinding = new SqlCallBinding(validator, scope, call);
    this.checkOperandTypes(opBinding, true);
    RelDataType ret = this.inferReturnType(opBinding);
    ((SqlValidatorImpl) validator).setValidatedNodeType(call, ret);
    return ret;
}
{code}
specifically this line:
{code:java}
RelDataType ret = this.inferReturnType(opBinding);
{code}
 

> TupleTypeInfoBase equals function can't distinguish different struct type
> -
>
> Key: FLINK-13224
> URL: https://issues.apache.org/jira/browse/FLINK-13224
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.0
> Environment: flink 1.8.0
>Reporter: forideal
>Priority: Major
>
> Hi,
> I have two struct types.
> One is
> {code:java}
> Types.ROW_NAMED(
>          new String[]{"device"},
>          Types.PRIMITIVE_ARRAY(Types.BYTE)
> )
> {code}
> and the other is
> {code:java}
> Types.ROW_NAMED(
>          new String[]{"app"},
>          Types.PRIMITIVE_ARRAY(Types.BYTE)
> )
> {code}
> When I compare these two types, the equals function returns true.
> TupleTypeInfoBase contains this code:
> {code:java}
> return other.canEqual(this) &&
>         super.equals(other) &&
>         Arrays.equals(types, other.types) &&
>         totalFields == other.totalFields;
> {code}
> I think the equals function should also compare field names, e.g.:
> {code:java}
> if (totalFields == other.totalFields) {
>     String[] otherFieldNames = other.getFieldNames();
>     String[] fieldNames = this.getFieldNames();
>     for (int i = 0; i < totalFields; i++) {
>         if (!otherFieldNames[i].equals(fieldNames[i])) {
>             return false;
>         }
>     }
> } else {
>     return false;
> }
> return other.canEqual(this) &&
>         super.equals(other) &&
>         Arrays.equals(types, other.types);
> {code}
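
A minimal standalone sketch of the reported comparison (an illustration assuming Flink's `Types` factory; untested):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class RowTypeEqualsDemo {
    public static void main(String[] args) {
        TypeInformation<Row> deviceType = Types.ROW_NAMED(
                new String[]{"device"}, Types.PRIMITIVE_ARRAY(Types.BYTE));
        TypeInformation<Row> appType = Types.ROW_NAMED(
                new String[]{"app"}, Types.PRIMITIVE_ARRAY(Types.BYTE));
        // According to the report, this prints "true" even though
        // the field names differ.
        System.out.println(deviceType.equals(appType));
    }
}
{code}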



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] TsReaper commented on issue #9098: [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer

2019-07-11 Thread GitBox
TsReaper commented on issue #9098: [FLINK-13236][table-runtime-blink] Fix bug 
and improve performance in TopNBuffer
URL: https://github.com/apache/flink/pull/9098#issuecomment-510752945
 
 
   @beyond1920 @wuchong @JingsongLi please take a look if you have time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID

2019-07-11 Thread GitBox
flinkbot commented on issue #7262: [FLINK-10478] Kafka Producer wrongly formats 
% for transaction ID
URL: https://github.com/apache/flink/pull/7262#issuecomment-510752916
 
 
   ## CI report:
   
   * 15f05f4c5791d9d42610099324e59057d26bd3ff : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118882595)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9021: [FLINK-13205][runtime] Make checkpoints injection ordered with stop-with-savepoint

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9021: [FLINK-13205][runtime] Make 
checkpoints injection ordered with stop-with-savepoint
URL: https://github.com/apache/flink/pull/9021#issuecomment-510405873
 
 
   ## CI report:
   
   * abed4b5678a2f09b3bb729bd62b5264e56b55b9f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118731856)
   * 505ec154b21e0340e112f16fcfcfb1eeb52fa345 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118827980)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9098: [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer

2019-07-11 Thread GitBox
flinkbot commented on issue #9098: [FLINK-13236][table-runtime-blink] Fix bug 
and improve performance in TopNBuffer
URL: https://github.com/apache/flink/pull/9098#issuecomment-510753038
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13236) Fix bug and improve performance in TopNBuffer

2019-07-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13236:
---
Labels: pull-request-available  (was: )

> Fix bug and improve performance in TopNBuffer
> -
>
> Key: FLINK-13236
> URL: https://issues.apache.org/jira/browse/FLINK-13236
> Project: Flink
>  Issue Type: Improvement
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Minor
>  Labels: pull-request-available
>
> In {{TopNBuffer}} we have the following method:
> {code:java}
> /**
>  * Puts a record list into the buffer under the sortKey.
>  * Note: if buffer already contains sortKey, putAll will overwrite the 
> previous value
>  *
>  * @param sortKey sort key with which the specified values are to be 
> associated
>  * @param values record lists to be associated with the specified key
>  */
> void putAll(BaseRow sortKey, Collection<BaseRow> values) {
>   treeMap.put(sortKey, values);
>   currentTopNum += values.size();
> }
> {code}
> When {{sortKey}} already exists in {{treeMap}}, {{currentTopNum}} should 
> first be reduced by the size of the old value stored in {{treeMap}} and only 
> then increased by {{values.size()}} (it cannot simply be increased). As 
> currently only {{AppendOnlyTopNFunction}} uses this method in its init 
> procedure, this bug is not triggered.
> {code:java}
> /**
>  * Gets record which rank is given value.
>  *
>  * @param rank rank value to search
>  * @return the record which rank is given value
>  */
> BaseRow getElement(int rank) {
>   int curRank = 0;
>   Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = 
> treeMap.entrySet().iterator();
>   while (iter.hasNext()) {
>   Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
>   Collection<BaseRow> list = entry.getValue();
>   Iterator<BaseRow> listIter = list.iterator();
>   while (listIter.hasNext()) {
>   BaseRow elem = listIter.next();
>   curRank += 1;
>   if (curRank == rank) {
>   return elem;
>   }
>   }
>   }
>   return null;
> }
> {code}
> We can remove the inner loop by increasing {{curRank}} by {{list.size()}} 
> each time.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] TsReaper opened a new pull request #9098: [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer

2019-07-11 Thread GitBox
TsReaper opened a new pull request #9098: [FLINK-13236][table-runtime-blink] 
Fix bug and improve performance in TopNBuffer
URL: https://github.com/apache/flink/pull/9098
 
 
   ## What is the purpose of the change
   
   The value of `currentTopNum` will be incorrectly calculated in the `putAll` 
method of `TopNBuffer` when the given `sortKey` exists in `treeMap`.
   
   Also, the inner loop of `getElement` method can be removed to improve 
performance.
   
   ## Brief change log
   
- Fix `putAll` method in `TopNBuffer`
- Improve performance of `getElement` method in `TopNBuffer`
   
   ## Verifying this change
   
   This change is a trivial bug fix / improvement without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13220) Add create/drop table support for blink planner

2019-07-11 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-13220:
---
Summary: Add create/drop table support for blink planner  (was: Add DDL 
support for blink planner)

> Add create/drop table support for blink planner
> ---
>
> Key: FLINK-13220
> URL: https://issues.apache.org/jira/browse/FLINK-13220
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (FLINK-13220) Add create/drop table support for blink planner

2019-07-11 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-13220.
--
Resolution: Fixed

merged in 1.9.0: db488c0d19dbb96466b9b765cecc06784b844926

> Add create/drop table support for blink planner
> ---
>
> Key: FLINK-13220
> URL: https://issues.apache.org/jira/browse/FLINK-13220
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13220) Add DDL support for blink planner

2019-07-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13220:
---
Labels: pull-request-available  (was: )

> Add DDL support for blink planner
> -
>
> Key: FLINK-13220
> URL: https://issues.apache.org/jira/browse/FLINK-13220
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] becketqin commented on a change in pull request #7262: [FLINK-10478] Kafka Producer wrongly formats % for transaction ID

2019-07-11 Thread GitBox
becketqin commented on a change in pull request #7262: [FLINK-10478] Kafka 
Producer wrongly formats % for transaction ID
URL: https://github.com/apache/flink/pull/7262#discussion_r302832450
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ##
 @@ -55,7 +55,7 @@ public TransactionalIdsGenerator(
checkArgument(safeScaleDownFactor > 0);
checkArgument(subtaskIndex >= 0);
 
-   this.prefix = checkNotNull(prefix);
+   this.prefix = checkNotNull(prefix).replaceAll("%", "%%");
 
 Review comment:
   Would it be better to keep the original String as is, but only replace `%` 
with `%%` when `String.format()` is called? This would ensure that all the 
variables have a consistent view of the String.
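
A minimal Java sketch of this suggestion (class and method names here are hypothetical, not from the PR): store the raw prefix untouched and escape `%` only at the formatting call site.

```java
// Hypothetical illustration of escaping '%' at format time only.
public class TransactionalIdFormatting {

    // The prefix is stored exactly as the user supplied it.
    private final String prefix;

    public TransactionalIdFormatting(String prefix) {
        this.prefix = prefix;
    }

    public String idFor(long subtaskIndex) {
        // Escape '%' right before formatting, so every other consumer
        // of 'prefix' still sees the original, unescaped string.
        String escaped = prefix.replace("%", "%%");
        return String.format(escaped + "-%d", subtaskIndex);
    }

    public static void main(String[] args) {
        // Prints "job-100%-3" without throwing a format exception.
        System.out.println(new TransactionalIdFormatting("job-100%").idFor(3));
    }
}
```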


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung closed pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung closed pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] 
Add drop table support for flink planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-13211) Add drop table support for flink planner

2019-07-11 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-13211.
--
Resolution: Fixed

merged in 1.9.0: 58280f44f5ff62d129580bb091671a52537536e3

> Add drop table support for flink planner
> 
>
> Key: FLINK-13211
> URL: https://issues.apache.org/jira/browse/FLINK-13211
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot edited a comment on issue #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9057: [FLINK-13121] [table-planner-blink] 
Set batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#issuecomment-510405733
 
 
   ## CI report:
   
   * b616282cb875778a7a5af22a2783eaaf48104908 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118709811)
   * 62a9ff2805a3c04e160a7f7d52f40acf049d9c4b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118818602)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10636) ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883533#comment-16883533
 ] 

Jiangjie Qin commented on FLINK-10636:
--

[~lsy] Is this issue on Kafka 0.8? Starting from Kafka 0.9, the clients no 
longer talk to ZooKeeper. It would therefore be good to upgrade to a later 
Kafka version.

> ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> -
>
> Key: FLINK-10636
> URL: https://issues.apache.org/jira/browse/FLINK-10636
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.1
>Reporter: dalongliu
>Priority: Major
>
> Hi all, when I upgraded the Flink version from 1.3.2 to 1.6.1 (the version of 
> FlinkKafkaConsumer is 0.8) and started my job, I encountered the following 
> problems:
> 2018-10-22 09:54:42,987 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/tmp/jaas-8615545579287424864.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it.
> 2018-10-22 09:54:42,993 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 10.208.75.87/10.208.75.87:2181
> 2018-10-22 09:54:42,993 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 10.208.75.85/10.208.75.85:2181
> 2018-10-22 09:54:42,993 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 10.208.75.85/10.208.75.85:2181
> 2018-10-22 09:54:42,994 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 10.208.75.87/10.208.75.87:2181
> 2018-10-22 09:54:42,994 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> 2018-10-22 09:54:42,997 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> 2018-10-22 09:54:42,994 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
> connection established to 10.208.75.85/10.208.75.85:2181, initiating session



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] tzulitai commented on issue #9094: [FLINK-13094][state-processor-api] Provide an easy way to read timers using the State Processor API

2019-07-11 Thread GitBox
tzulitai commented on issue #9094: [FLINK-13094][state-processor-api] Provide 
an easy way to read timers using the State Processor API
URL: https://github.com/apache/flink/pull/9094#issuecomment-510745686
 
 
   @flinkbot approve all
   
   The changes look good to me, thanks for the quick preparation of the PR.
   +1 to merge once Travis is green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9064: [FLINK-13188][Runtime / State Backends][Test] Fix MockStateBackend#resolveCheckpointStorageLocation return null cause intellij assertion dete

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9064: [FLINK-13188][Runtime / State 
Backends][Test] Fix MockStateBackend#resolveCheckpointStorageLocation return 
null cause intellij assertion detect test failed
URL: https://github.com/apache/flink/pull/9064#issuecomment-510409332
 
 
   ## CI report:
   
   * 77d92ad5b8be29975fe61014ea1445955fc7841d : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118880623)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9090: [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9090: [FLINK-13124] Don't forward 
exceptions when finishing SourceStreamTask
URL: https://github.com/apache/flink/pull/9090#issuecomment-510496617
 
 
   ## CI report:
   
   * 6c229740b17ae4e0df7c8ba9678ada086ae8bf47 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118880603)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-510405859
 
 
   ## CI report:
   
   * 4afedee15460ac0f1f2945ca657581c538ddfc06 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118723073)
   * f639acfa778cc8e31581107f27e3cf0139e3a98d : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118880117)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-13115) Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-13115.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

merged in 1.9.0: 9eb3ecab244f0b12f7cbdddbcef5585cb4bcea58

> Introduce planner rule to support partition pruning for 
> PartitionableTableSource
> 
>
> Key: FLINK-13115
> URL: https://issues.apache.org/jira/browse/FLINK-13115
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This issue aims to support partition pruning for {{PartitionableTableSource}}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] KurtYoung closed pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
KurtYoung closed pull request #9080: [FLINK-13115] [table-planner-blink] 
Introduce planner rule to support partition pruning for PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
KurtYoung commented on issue #9080: [FLINK-13115] [table-planner-blink] 
Introduce planner rule to support partition pruning for PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#issuecomment-510742027
 
 
   passed here: https://travis-ci.org/KurtYoung/flink/builds/557654396


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13236) Fix bug and improve performance in TopNBuffer

2019-07-11 Thread Caizhi Weng (JIRA)
Caizhi Weng created FLINK-13236:
---

 Summary: Fix bug and improve performance in TopNBuffer
 Key: FLINK-13236
 URL: https://issues.apache.org/jira/browse/FLINK-13236
 Project: Flink
  Issue Type: Improvement
Reporter: Caizhi Weng
Assignee: Caizhi Weng


In {{TopNBuffer}} we have the following method:

{code:java}
/**
 * Puts a record list into the buffer under the sortKey.
 * Note: if buffer already contains sortKey, putAll will overwrite the previous 
value
 *
 * @param sortKey sort key with which the specified values are to be associated
 * @param values record lists to be associated with the specified key
 */
void putAll(BaseRow sortKey, Collection<BaseRow> values) {
    treeMap.put(sortKey, values);
    currentTopNum += values.size();
}
{code}

When {{sortKey}} already exists in {{treeMap}}, {{currentTopNum}} should first 
be reduced by the size of the old value stored in {{treeMap}} and only then 
increased by {{values.size()}} (it cannot simply be increased). As currently 
only {{AppendOnlyTopNFunction}} uses this method in its init procedure, this 
bug is not triggered.
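
A hedged sketch of the fix described above (not necessarily the exact change in the PR): subtract the old list's contribution before adding the new one.

{code:java}
void putAll(BaseRow sortKey, Collection<BaseRow> values) {
    // TreeMap#put returns the value previously mapped to the key, or null.
    Collection<BaseRow> oldValues = treeMap.put(sortKey, values);
    if (oldValues != null) {
        // The key was already present: remove its old contribution first.
        currentTopNum -= oldValues.size();
    }
    currentTopNum += values.size();
}
{code}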


{code:java}
/**
 * Gets record which rank is given value.
 *
 * @param rank rank value to search
 * @return the record which rank is given value
 */
BaseRow getElement(int rank) {
    int curRank = 0;
    Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter =
        treeMap.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
        Collection<BaseRow> list = entry.getValue();

        Iterator<BaseRow> listIter = list.iterator();
        while (listIter.hasNext()) {
            BaseRow elem = listIter.next();
            curRank += 1;
            if (curRank == rank) {
                return elem;
            }
        }
    }
    return null;
}
{code}

We can remove the inner loop by increasing {{curRank}} by {{list.size()}} each time.
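
One possible shape of that optimization, as a hedged sketch (the actual patch may differ): skip every sublist that cannot contain the searched rank, and only iterate inside the one that does.

{code:java}
BaseRow getElement(int rank) {
    int curRank = 0;
    for (Map.Entry<BaseRow, Collection<BaseRow>> entry : treeMap.entrySet()) {
        Collection<BaseRow> list = entry.getValue();
        if (curRank + list.size() < rank) {
            // The searched rank lies beyond this sublist: skip it in O(1).
            curRank += list.size();
        } else {
            // The searched rank falls inside this sublist.
            for (BaseRow elem : list) {
                curRank += 1;
                if (curRank == rank) {
                    return elem;
                }
            }
        }
    }
    return null;
}
{code}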




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-510405859
 
 
   ## CI report:
   
   * 4afedee15460ac0f1f2945ca657581c538ddfc06 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118723073)
   * f639acfa778cc8e31581107f27e3cf0139e3a98d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118821315)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-07-11 Thread GitBox
flinkbot commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-510741229
 
 
   ## CI report:
   
   * e0f3d5b774c08e2154b31fdde4b2c193276723a3 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118879513)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9090: [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9090: [FLINK-13124] Don't forward 
exceptions when finishing SourceStreamTask
URL: https://github.com/apache/flink/pull/9090#issuecomment-510496617
 
 
   ## CI report:
   
   * 6c229740b17ae4e0df7c8ba9678ada086ae8bf47 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118851219)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9064: [FLINK-13188][Runtime / State Backends][Test] Fix MockStateBackend#resolveCheckpointStorageLocation return null cause intellij assertion dete

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9064: [FLINK-13188][Runtime / State 
Backends][Test] Fix MockStateBackend#resolveCheckpointStorageLocation return 
null cause intellij assertion detect test failed
URL: https://github.com/apache/flink/pull/9064#issuecomment-510409332
 
 
   ## CI report:
   
   * 77d92ad5b8be29975fe61014ea1445955fc7841d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118815826)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10806) Support multiple consuming offsets when discovering a new topic

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883520#comment-16883520
 ] 

Jiangjie Qin commented on FLINK-10806:
--

Would setting \{{auto.offset.reset}} for the KafkaConsumer meet your 
requirements? See [https://kafka.apache.org/documentation/] (search for 
"auto.offset.reset")

> Support multiple consuming offsets when discovering a new topic
> ---
>
> Key: FLINK-10806
> URL: https://issues.apache.org/jira/browse/FLINK-10806
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.6.2
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> In KafkaConsumerBase, we discover the TopicPartitions and compare them with 
> the restoredState. That is reasonable when a topic's partitions are scaled. 
> However, if we add a new topic which has a lot of data and restore the Flink 
> program, the data of the new topic will be consumed from the start, which may 
> not be what we want. I think this should be an option for developers.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-10806) Support multiple consuming offsets when discovering a new topic

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883520#comment-16883520
 ] 

Jiangjie Qin edited comment on FLINK-10806 at 7/12/19 4:31 AM:
---

[~wind_ljy] Would setting {{auto.offset.reset}} for the KafkaConsumer meet your 
requirements? See [https://kafka.apache.org/documentation/] (search for 
"auto.offset.reset")


was (Author: becket_qin):
Would setting \{{auto.offset.reset}} for the KafkaConsumer meet your 
requirements? See [https://kafka.apache.org/documentation/] (search for 
"auto.offset.reset")

> Support multiple consuming offsets when discovering a new topic
> ---
>
> Key: FLINK-10806
> URL: https://issues.apache.org/jira/browse/FLINK-10806
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.6.2
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> In KafkaConsumerBase, we discover the TopicPartitions and compare them with 
> the restoredState. That is reasonable when a topic's partitions are scaled. 
> However, if we add a new topic which has a lot of data and restore the Flink 
> program, the data of the new topic will be consumed from the start, which may 
> not be what we want. I think this should be an option for developers.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot commented on issue #7487: [FLINK-11321] Clarify NPE on fetching nonexistent topic

2019-07-11 Thread GitBox
flinkbot commented on issue #7487: [FLINK-11321] Clarify NPE on fetching 
nonexistent topic
URL: https://github.com/apache/flink/pull/7487#issuecomment-510739413
 
 
   ## CI report:
   
   * c746405bfc3264fec2e9a5a6551360e37aa5688f : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118879011)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-07-11 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-510738891
 
 
   Thanks for @zentol 's suggestions. PR updated accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10876) Deadlock if closing firstly pending transactions in FlinkKafkaProducer(011).close()

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883518#comment-16883518
 ] 

Jiangjie Qin commented on FLINK-10876:
--

Is this still an issue given FLINK-10455 is fixed? Do you have a stacktrace to 
show where the deadlock is?

> Deadlock if closing firstly pending transactions in 
> FlinkKafkaProducer(011).close()
> ---
>
> Key: FLINK-10876
> URL: https://issues.apache.org/jira/browse/FLINK-10876
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Andrey Zagrebin
>Priority: Major
>
> While working on FLINK-10455, I encountered a deadlock in 
> _FlinkKafkaProducer(011).close()_ if _pendingTransactions_ are closed before 
> _currentTransaction_. There is no deadlock other way around.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot edited a comment on issue #9073: [FLINK-13187] Introduce ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9073: [FLINK-13187] Introduce 
ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
URL: https://github.com/apache/flink/pull/9073#issuecomment-510405770
 
 
   ## CI report:
   
   * f588483906ba4c4459bf18671f84924a3286 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118731822)
   * 95b05f8b9ccc099f35c8c6001c0e82468f459e2a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118811562)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9083: [FLINK-13116] [table-planner-blink] Supports catalog statistics in blink planner

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9083: [FLINK-13116] [table-planner-blink] 
Supports catalog statistics in blink planner
URL: https://github.com/apache/flink/pull/9083#issuecomment-510435070
 
 
   ## CI report:
   
   * 402974d0ffb69d9244c108234c9837f2eacc8d37 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118878495)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] becketqin commented on issue #7487: [FLINK-11321] Clarify NPE on fetching nonexistent topic

2019-07-11 Thread GitBox
becketqin commented on issue #7487: [FLINK-11321] Clarify NPE on fetching 
nonexistent topic
URL: https://github.com/apache/flink/pull/7487#issuecomment-510736925
 
 
   @Fokko Any update on this? We should also fix the `KafkaPartitionDiscoverer` 
in the universal Kafka connector.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13206) modify 'use database' syntax in SQL CLI to be consistent with standard sql

2019-07-11 Thread zjuwangg (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjuwangg updated FLINK-13206:
-
Affects Version/s: 1.9.0

> modify 'use database' syntax in SQL CLI to be consistent with standard sql
> --
>
> Key: FLINK-13206
> URL: https://issues.apache.org/jira/browse/FLINK-13206
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.9.0
>Reporter: zjuwangg
>Assignee: zjuwangg
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (FLINK-13206) modify 'use database' syntax in SQL CLI to be consistent with standard sql

2019-07-11 Thread zjuwangg (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zjuwangg updated FLINK-13206:
-
Fix Version/s: 1.9.0

> modify 'use database' syntax in SQL CLI to be consistent with standard sql
> --
>
> Key: FLINK-13206
> URL: https://issues.apache.org/jira/browse/FLINK-13206
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.9.0
>Reporter: zjuwangg
>Assignee: zjuwangg
>Priority: Major
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] beyond1920 commented on a change in pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
beyond1920 commented on a change in pull request #9080: [FLINK-13115] 
[table-planner-blink] Introduce planner rule to support partition pruning for 
PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#discussion_r302807906
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/PartitionPruner.scala
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, 
RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils.DEFAULT_COLLECTOR_TERM
+import org.apache.flink.table.codegen.{ConstantCodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.{BinaryString, Decimal, GenericRow}
+import org.apache.flink.table.types.logical.LogicalTypeRoot._
+import org.apache.flink.table.types.logical.{BooleanType, DecimalType, 
LogicalType}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+
+import org.apache.calcite.rex.RexNode
+
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Utility class for partition pruning.
+  *
+  * Creates partition filter instance (a [[RichMapFunction]]) with partition 
predicates by code-gen,
+  * and then evaluates all partition values against the partition filter to 
get final partitions.
+  */
+object PartitionPruner {
+
+  // current supports partition field type
+  val supportedPartitionFieldTypes = Array(
+VARCHAR, CHAR, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+  )
+
+  /**
+* get pruned partitions from all partitions by partition filters
+*
+* @param partitionFieldNames Partition field names.
+* @param partitionFieldTypes Partition field types.
+* @param allPartitions   All partition values.
+* @param partitionPredicate  A predicate that will be applied against 
partition values.
+* @return Pruned partitions.
+*/
+  def prunePartitions(
+  config: TableConfig,
+  partitionFieldNames: Array[String],
+  partitionFieldTypes: Array[LogicalType],
+  allPartitions: JList[JMap[String, String]],
+  partitionPredicate: RexNode): JList[JMap[String, String]] = {
+
+if (allPartitions.isEmpty || partitionPredicate.isAlwaysTrue) {
+  return allPartitions
+}
+
+val inputType = new BaseRowTypeInfo(partitionFieldTypes, 
partitionFieldNames).toRowType
+val returnType: LogicalType = new BooleanType(false)
+
+val ctx = new ConstantCodeGeneratorContext(config)
+val collectorTerm = DEFAULT_COLLECTOR_TERM
+
+val exprGenerator = new ExprCodeGenerator(ctx, false)
+  .bindInput(inputType)
+
+val filterExpression = exprGenerator.generateExpression(partitionPredicate)
+
+val filterFunctionBody =
+  s"""
+ |${filterExpression.code}
+ |return ${filterExpression.resultTerm};
+ |""".stripMargin
+
+val genFunction = FunctionCodeGenerator.generateFunction(
+  ctx,
+  "PartitionPruner",
+  classOf[MapFunction[GenericRow, Boolean]],
+  filterFunctionBody,
+  returnType,
+  inputType,
+  collectorTerm = collectorTerm)
+
+val function = genFunction.newInstance(getClass.getClassLoader)
+val richMapFunction = function match {
+  case r: RichMapFunction[GenericRow, Boolean] => r
+  case _ => throw new TableException("RichMapFunction[GenericRow, Boolean] 
required here")
+}
+
+val results: JList[Boolean] = new JArrayList[Boolean](allPartitions.size)
+val collector = new ListCollector[Boolean](results)
+
+val parameters = if (config.getConfiguration != null) {
+  config.getConfiguration
+} else {
+  new Configuration()
+}
 
 Review comment:
   "According to the TableConfig a config can never be null. Introducing 
arbitrary null checks is not helpful."
   which I borrow Timo's comment on 

[GitHub] [flink] beyond1920 commented on a change in pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
beyond1920 commented on a change in pull request #9080: [FLINK-13115] 
[table-planner-blink] Introduce planner rule to support partition pruning for 
PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#discussion_r302806301
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, PartitionPruner, 
RexNodeExtractor}
+import org.apache.flink.table.sources.PartitionableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that tries to push partitions evaluated by filter condition 
into a
+  * [[PartitionableTableSource]].
+  */
+class PushPartitionIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[Filter],
+operand(classOf[LogicalTableScan], none)),
+  "PushPartitionIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val filter: Filter = call.rel(0)
+if (filter.getCondition == null) {
+  return false
+}
+
+val scan: LogicalTableScan = call.rel(1)
+scan.getTable.unwrap(classOf[TableSourceTable[_]]) match {
+  case table: TableSourceTable[_] =>
+table.tableSource match {
+  case p: PartitionableTableSource => p.getPartitionFieldNames.nonEmpty
+  case _ => false
+}
+  case _ => false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val filter: Filter = call.rel(0)
+val scan: LogicalTableScan = call.rel(1)
+val table: FlinkRelOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+pushPartitionIntoScan(call, filter, scan, table)
+  }
+
+  private def pushPartitionIntoScan(
+  call: RelOptRuleCall,
+  filter: Filter,
+  scan: LogicalTableScan,
+  relOptTable: FlinkRelOptTable): Unit = {
+
+val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
+val tableSource = 
tableSourceTable.tableSource.asInstanceOf[PartitionableTableSource]
+val partitionFieldNames = tableSource.getPartitionFieldNames.toList.toArray
+val inputFieldType = filter.getInput.getRowType
+
+val relBuilder = call.builder()
+val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan)
+val (partitionPredicate, nonPartitionPredicate) =
+  RexNodeExtractor.extractPartitionPredicates(
+filter.getCondition,
+maxCnfNodeCount,
+inputFieldType.getFieldNames.toList.toArray,
+relBuilder.getRexBuilder,
+partitionFieldNames
+  )
+
+if (partitionPredicate.isAlwaysTrue) {
+  // no partition predicates in filter
+  return
+}
+
+val finalPartitionPredicate = adjustPartitionPredicate(
+  inputFieldType.getFieldNames.toList.toArray,
 
 Review comment:
   toList.toArray => toArray?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] beyond1920 commented on a change in pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
beyond1920 commented on a change in pull request #9080: [FLINK-13115] 
[table-planner-blink] Introduce planner rule to support partition pruning for 
PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#discussion_r302807941
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/PartitionPruner.scala
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, 
RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils.DEFAULT_COLLECTOR_TERM
+import org.apache.flink.table.codegen.{ConstantCodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.{BinaryString, Decimal, GenericRow}
+import org.apache.flink.table.types.logical.LogicalTypeRoot._
+import org.apache.flink.table.types.logical.{BooleanType, DecimalType, 
LogicalType}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+
+import org.apache.calcite.rex.RexNode
+
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Utility class for partition pruning.
+  *
+  * Creates partition filter instance (a [[RichMapFunction]]) with partition 
predicates by code-gen,
+  * and then evaluates all partition values against the partition filter to 
get final partitions.
+  */
+object PartitionPruner {
+
+  // current supports partition field type
+  val supportedPartitionFieldTypes = Array(
+VARCHAR, CHAR, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, 
DECIMAL
+  )
+
+  /**
+* get pruned partitions from all partitions by partition filters
+*
+* @param partitionFieldNames Partition field names.
+* @param partitionFieldTypes Partition field types.
+* @param allPartitions   All partition values.
+* @param partitionPredicate  A predicate that will be applied against 
partition values.
+* @return Pruned partitions.
+*/
+  def prunePartitions(
+  config: TableConfig,
+  partitionFieldNames: Array[String],
+  partitionFieldTypes: Array[LogicalType],
+  allPartitions: JList[JMap[String, String]],
+  partitionPredicate: RexNode): JList[JMap[String, String]] = {
+
+if (allPartitions.isEmpty || partitionPredicate.isAlwaysTrue) {
+  return allPartitions
+}
+
+val inputType = new BaseRowTypeInfo(partitionFieldTypes, 
partitionFieldNames).toRowType
+val returnType: LogicalType = new BooleanType(false)
+
+val ctx = new ConstantCodeGeneratorContext(config)
+val collectorTerm = DEFAULT_COLLECTOR_TERM
+
+val exprGenerator = new ExprCodeGenerator(ctx, false)
+  .bindInput(inputType)
+
+val filterExpression = exprGenerator.generateExpression(partitionPredicate)
+
+val filterFunctionBody =
+  s"""
+ |${filterExpression.code}
+ |return ${filterExpression.resultTerm};
+ |""".stripMargin
+
+val genFunction = FunctionCodeGenerator.generateFunction(
+  ctx,
+  "PartitionPruner",
+  classOf[MapFunction[GenericRow, Boolean]],
+  filterFunctionBody,
+  returnType,
+  inputType,
+  collectorTerm = collectorTerm)
+
+val function = genFunction.newInstance(getClass.getClassLoader)
+val richMapFunction = function match {
+  case r: RichMapFunction[GenericRow, Boolean] => r
+  case _ => throw new TableException("RichMapFunction[GenericRow, Boolean] required here")
+}
+
+val results: JList[Boolean] = new JArrayList[Boolean](allPartitions.size)
+val collector = new ListCollector[Boolean](results)
+
+val parameters = if (config.getConfiguration != null) {
 
 Review comment:
   parameters => configuration 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above 

[GitHub] [flink] JingsongLi commented on issue #9095: [FLINK-13232][table-planner-blink] Remove CURRENT_DATE test to avoid timezone error

2019-07-11 Thread GitBox
JingsongLi commented on issue #9095: [FLINK-13232][table-planner-blink] Remove 
CURRENT_DATE test to avoid timezone error
URL: https://github.com/apache/flink/pull/9095#issuecomment-510736379
 
 
Travis build: https://travis-ci.org/JingsongLi/flink/builds/557625626


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9083: [FLINK-13116] [table-planner-blink] Supports catalog statistics in blink planner

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9083: [FLINK-13116] [table-planner-blink] 
Supports catalog statistics in blink planner
URL: https://github.com/apache/flink/pull/9083#issuecomment-510435070
 
 
   ## CI report:
   
   * 402974d0ffb69d9244c108234c9837f2eacc8d37 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118819918)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9094: [FLINK-13094][state-processor-api] Provide an easy way to read timers using the State Processor API

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9094: [FLINK-13094][state-processor-api] 
Provide an easy way to read timers using the State Processor API
URL: https://github.com/apache/flink/pull/9094#issuecomment-510560219
 
 
   ## CI report:
   
   * 68c63247fc6c3b60b0cf7afb483cc04ee8f558d0 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118877885)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9062: [FLINK-13100][network] Fix the bug of throwing IOException while FileChannelBoundedData#nextBuffer

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9062: [FLINK-13100][network] Fix the bug of 
throwing IOException while FileChannelBoundedData#nextBuffer
URL: https://github.com/apache/flink/pull/9062#issuecomment-510405737
 
 
   ## CI report:
   
   * 106228aa31ad7065acf116029879a00fd998662b : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118725251)
   * 2b189a8483bbd019ffee22d9746d052855ef6142 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118877929)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #9091: [FLINK-13229][table-planner-blink] ExpressionReducer with udf bug in blink

2019-07-11 Thread GitBox
JingsongLi commented on issue #9091: [FLINK-13229][table-planner-blink] 
ExpressionReducer with udf bug in blink
URL: https://github.com/apache/flink/pull/9091#issuecomment-510735657
 
 
Tests pass in: https://travis-ci.org/JingsongLi/flink/builds/557348029


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883503#comment-16883503
 ] 

Jiangjie Qin commented on FLINK-11654:
--

[~pnowojski] [~aljoscha] [~dawidwys] [~vicTTim] Do you think using JobName is a 
viable solution here?
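
For illustration, a hedged sketch of what a job-scoped transactional-id prefix could 
look like, building on the snippet quoted in the issue description below. The 
{{jobName}} variable is hypothetical and would still need to be threaded into the sink:

{code:java}
// Prefixing with the job name would keep identically named sinks in different
// jobs from generating clashing transactional IDs. Everything except jobName
// comes from the snippet in the issue description.
String transactionalIdPrefix = jobName
    + "-" + getRuntimeContext().getTaskName()
    + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
{code}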

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot edited a comment on issue #9078: [FLINK-13186] Remove dispatcherRetrievalService and dispatcherLeaderRetriever from RestClusterClient

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9078: [FLINK-13186] Remove 
dispatcherRetrievalService and dispatcherLeaderRetriever from RestClusterClient
URL: https://github.com/apache/flink/pull/9078#issuecomment-510405833
 
 
   ## CI report:
   
   * b73e7985607cee8e9a8abc5b8031e4ee149db514 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118877390)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9094: [FLINK-13094][state-processor-api] Provide an easy way to read timers using the State Processor API

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9094: [FLINK-13094][state-processor-api] 
Provide an easy way to read timers using the State Processor API
URL: https://github.com/apache/flink/pull/9094#issuecomment-510560219
 
 
   ## CI report:
   
   * 68c63247fc6c3b60b0cf7afb483cc04ee8f558d0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118811503)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9062: [FLINK-13100][network] Fix the bug of throwing IOException while FileChannelBoundedData#nextBuffer

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9062: [FLINK-13100][network] Fix the bug of 
throwing IOException while FileChannelBoundedData#nextBuffer
URL: https://github.com/apache/flink/pull/9062#issuecomment-510405737
 
 
   ## CI report:
   
   * 106228aa31ad7065acf116029879a00fd998662b : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118725251)
   * 2b189a8483bbd019ffee22d9746d052855ef6142 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118818573)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11736) flink kafka producer failed with NOT_LEADER_FOR_PARTITION

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883502#comment-16883502
 ] 

Jiangjie Qin commented on FLINK-11736:
--

[~debugman] NOT_LEADER_FOR_PARTITION indicates that there was a leader 
migration in the Kafka cluster for some partition. The information about the new 
leader is propagated to all the brokers in the cluster asynchronously. It 
may take some time, typically within 1 second, for all the brokers in the Kafka 
cluster to receive the updated leader information.

From the timestamp in the log, it seems the producer was retrying with a 
backoff of around 3 ms. This may exhaust the 10 retries in 30 ms, which may be 
too aggressive. The default value is actually 100ms. Did you happen to 
change that?
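
For illustration, a minimal sketch of the producer settings involved, assuming the 
official Kafka Java client (broker address and values are illustrative, not a 
recommendation):

{code:java}
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// number of times a failed send (e.g. NOT_LEADER_FOR_PARTITION) is retried
props.put("retries", 10);
// wait between retries; the Kafka default of 100 ms usually gives the
// brokers enough time to learn about the new partition leader
props.put("retry.backoff.ms", 100);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
{code}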

> flink kafka producer failed with NOT_LEADER_FOR_PARTITION
> -
>
> Key: FLINK-11736
> URL: https://issues.apache.org/jira/browse/FLINK-11736
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Kaicheng Sun
>Priority: Major
>
> my flink program connect kafka as its sinker using "FlinkKafkaProducer011" 
> library.  But sometimes the flink will exit abnormally with this kind of 
> error:
> (2019-02-23 11:55:11,656 WARN  
> org.apache.kafka.clients.producer.internals.Sender- Got error 
> produce response with correlation id 55711 on topic-partition 
> tmp_sink_redis-17, retrying (8 attempts left). Error: NOT_LEADER_FOR_PARTITION
> 2019-02-23 11:55:11,658 WARN  
> org.apache.kafka.clients.producer.internals.Sender- Got error 
> produce response with correlation id 55712 on topic-partition 
> tmp_sink_redis-17, retrying (8 attempts left). Error: 
> NOT_LEADER_FOR_PARTITION)
>  
> The kafka cluster works properly, so I have no idea why this error will happen



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-11736) flink kafka producer failed with NOT_LEADER_FOR_PARTITION

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883502#comment-16883502
 ] 

Jiangjie Qin edited comment on FLINK-11736 at 7/12/19 3:50 AM:
---

[~debugman] NOT_LEADER_FOR_PARTITION indicates that there was a leader 
migration in the Kafka cluster for some partition. The information about the new 
leader is propagated to all the brokers in the cluster asynchronously. It 
may take some time, typically within 1 second, for all the brokers in the Kafka 
cluster to receive the updated leader information.

From the timestamp in the log, it seems the producer was retrying with a 
backoff of around 3 ms. This may exhaust the 10 retries in 30 ms, which may be 
too aggressive. The default value of {{retry.backoff.ms}} is actually 100ms. 
Did you happen to change that?


was (Author: becket_qin):
[~debugman] NOT_LEADER_FOR_PARTITION indicates that there was a leader 
migration in the Kafka cluster for some partition. The information about the new 
leader is propagated to all the brokers in the cluster asynchronously. It 
may take some time, typically within 1 second, for all the brokers in the Kafka 
cluster to receive the updated leader information.

From the timestamp in the log, it seems the producer was retrying with a 
backoff of around 3 ms. This may exhaust the 10 retries in 30 ms, which may be 
too aggressive. The default value is actually 100ms. Did you happen to 
change that?

> flink kafka producer failed with NOT_LEADER_FOR_PARTITION
> -
>
> Key: FLINK-11736
> URL: https://issues.apache.org/jira/browse/FLINK-11736
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Kaicheng Sun
>Priority: Major
>
> my flink program connect kafka as its sinker using "FlinkKafkaProducer011" 
> library.  But sometimes the flink will exit abnormally with this kind of 
> error:
> (2019-02-23 11:55:11,656 WARN  
> org.apache.kafka.clients.producer.internals.Sender- Got error 
> produce response with correlation id 55711 on topic-partition 
> tmp_sink_redis-17, retrying (8 attempts left). Error: NOT_LEADER_FOR_PARTITION
> 2019-02-23 11:55:11,658 WARN  
> org.apache.kafka.clients.producer.internals.Sender- Got error 
> produce response with correlation id 55712 on topic-partition 
> tmp_sink_redis-17, retrying (8 attempts left). Error: 
> NOT_LEADER_FOR_PARTITION)
>  
> The kafka cluster works properly, so I have no idea why this error will happen



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13235) Change the Netty default transport mode to auto

2019-07-11 Thread zhijiang (JIRA)
zhijiang created FLINK-13235:


 Summary: Change the Netty default transport mode to auto
 Key: FLINK-13235
 URL: https://issues.apache.org/jira/browse/FLINK-13235
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current default config for "taskmanager.net.transport" in 
NettyShuffleEnvironmentOptions is "NIO". In order to use the "EPOLL" mode, which has 
better performance and is recommended when available, we could change the 
default config to "AUTO". The "NIO" mode would then be used as a fallback.
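
For illustration, a minimal sketch of pinning the transport explicitly, using the 
config key named above (how "auto" is resolved is an assumption based on this 
description):

{code:java}
Configuration conf = new Configuration();
// "auto" would prefer epoll where available and fall back to nio otherwise
conf.setString("taskmanager.net.transport", "auto");
{code}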



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] godfreyhe commented on a change in pull request #9035: [FLINK-13168] [table] clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner

2019-07-11 Thread GitBox
godfreyhe commented on a change in pull request #9035: [FLINK-13168] [table] 
clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner
URL: https://github.com/apache/flink/pull/9035#discussion_r302818070
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
 ##
 @@ -31,6 +31,8 @@
 /**
  * Describes a relational operation that reads from a {@link DataStream}.
  *
+ * This is only used for testing.
 
 Review comment:
this `DataStreamQueryOperation` is only used in `TableTestBase` for adding a 
test DataStream with statistics, and it will not be used in user-facing 
interfaces such as `StreamTableEnvironmentImpl#fromDataStream`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #9035: [FLINK-13168] [table] clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner

2019-07-11 Thread GitBox
godfreyhe commented on a change in pull request #9035: [FLINK-13168] [table] 
clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner
URL: https://github.com/apache/flink/pull/9035#discussion_r302817944
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java
 ##
 @@ -30,17 +32,20 @@
  * A {@link TableSourceQueryOperation} with {@link FlinkStatistic} and 
qualifiedName.
  * TODO this class should be deleted after unique key in TableSchema is ready
  * and setting catalog statistic to TableSourceTable in DatabaseCalciteSchema 
is ready
+ *
+ * This is only used for testing.
 
 Review comment:
`RichTableSourceQueryOperation` is only used in `TableTestBase` for adding a 
test TableSource with statistics, and it will not be used in user-facing 
interfaces such as `TableEnvironmentImpl#fromTableSource`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #9035: [FLINK-13168] [table] clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner

2019-07-11 Thread GitBox
godfreyhe commented on a change in pull request #9035: [FLINK-13168] [table] 
clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner
URL: https://github.com/apache/flink/pull/9035#discussion_r302816538
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 ##
 @@ -810,13 +811,10 @@ case class JavaBatchTableTestUtil(test: TableTestBase) extends JavaTableTestUtil
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(isBatch: Boolean, schema: TableSchema)
+class TestTableSource(val isBounded: Boolean, schema: TableSchema)
 
 Review comment:
a bounded `TestTableSource` will also be used in streaming tests, so I think 
isBounded is more suitable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13153) SplitAggregateITCase.testMinMaxWithRetraction failed on Travis

2019-07-11 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883500#comment-16883500
 ] 

Jark Wu commented on FLINK-13153:
-

I will look into this soon.

> SplitAggregateITCase.testMinMaxWithRetraction failed on Travis
> --
>
> Key: FLINK-13153
> URL: https://issues.apache.org/jira/browse/FLINK-13153
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Jark Wu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> {{SplitAggregateITCase.testMinMaxWithRetraction}} failed on Travis with
> {code}
> Failures: 
> 10:50:43.355 [ERROR]   SplitAggregateITCase.testMinMaxWithRetraction:195 
> expected: but was: 6,2,2,1)>
> {code}
> https://api.travis-ci.org/v3/job/554991853/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (FLINK-13153) SplitAggregateITCase.testMinMaxWithRetraction failed on Travis

2019-07-11 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-13153:
---

Assignee: Jark Wu

> SplitAggregateITCase.testMinMaxWithRetraction failed on Travis
> --
>
> Key: FLINK-13153
> URL: https://issues.apache.org/jira/browse/FLINK-13153
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Jark Wu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> {{SplitAggregateITCase.testMinMaxWithRetraction}} failed on Travis with
> {code}
> Failures: 
> 10:50:43.355 [ERROR]   SplitAggregateITCase.testMinMaxWithRetraction:195 
> expected: but was: 6,2,2,1)>
> {code}
> https://api.travis-ci.org/v3/job/554991853/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot edited a comment on issue #9078: [FLINK-13186] Remove dispatcherRetrievalService and dispatcherLeaderRetriever from RestClusterClient

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9078: [FLINK-13186] Remove 
dispatcherRetrievalService and dispatcherLeaderRetriever from RestClusterClient
URL: https://github.com/apache/flink/pull/9078#issuecomment-510405833
 
 
   ## CI report:
   
   * b73e7985607cee8e9a8abc5b8031e4ee149db514 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118799177)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #8556: [FLINK-12171][Network] Do not limit 
the network buffer memory by heap size on the TM side
URL: https://github.com/apache/flink/pull/8556#issuecomment-510513877
 
 
   ## CI report:
   
   * 4e2a56f6a3a9e57b001ee013d31053222e05ab7a : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118876293)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8742: [FLINK-11879] Add validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput

2019-07-11 Thread GitBox
flinkbot commented on issue #8742: [FLINK-11879] Add validators for the uses of 
InputSelectable, BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8742#issuecomment-510731561
 
 
   ## CI report:
   
   * 3f0c15862fc70f35cd58883ca9635bde1a5fb7ee : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118876288)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9097: [flink-11529][docs-zh]Translate the "DataStream API Tutorial" page into Chinese

2019-07-11 Thread GitBox
flinkbot commented on issue #9097: [flink-11529][docs-zh]Translate the 
"DataStream API Tutorial" page into Chinese
URL: https://github.com/apache/flink/pull/9097#issuecomment-510731538
 
 
   ## CI report:
   
   * 9fb0b9288a0b45fe24445e6bd18b7e99c7bd88eb : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118876215)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on issue #8742: [FLINK-11879] Add validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput

2019-07-11 Thread GitBox
sunhaibotb commented on issue #8742: [FLINK-11879] Add validators for the uses 
of InputSelectable, BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8742#issuecomment-510731040
 
 
Thanks for reviewing @pnowojski @1u0.
The code has been updated. Because of a conflict with the latest master 
branch, I rebased on the latest master and force-pushed. You can just look 
at `StreamingJobGraphGenerator` and `StreamGraph`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11607) Translate the "DataStream API Tutorial" page into Chinese

2019-07-11 Thread LakeShen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883495#comment-16883495
 ] 

LakeShen commented on FLINK-11607:
--

Hi [~jark], I have translated this page. If you have free time, please review 
this pull request, thanks.
The URL is: [https://github.com/apache/flink/pull/9097]

> Translate the "DataStream API Tutorial" page into Chinese
> -
>
> Key: FLINK-11607
> URL: https://issues.apache.org/jira/browse/FLINK-11607
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: LakeShen
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html
> The markdown file is located in flink/docs/tutorials/datastream_api.zh.md
> The markdown file will be created once FLINK-11529 is merged.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot commented on issue #9097: [flink-11529][docs-zh]Translate the "DataStream API Tutorial" page into Chinese

2019-07-11 Thread GitBox
flinkbot commented on issue #9097: [flink-11529][docs-zh]Translate the 
"DataStream API Tutorial" page into Chinese
URL: https://github.com/apache/flink/pull/9097#issuecomment-510729249
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] LakeShen opened a new pull request #9097: [flink-11529][docs-zh]Translate the "DataStream API Tutorial" page into Chinese

2019-07-11 Thread GitBox
LakeShen opened a new pull request #9097: [flink-11529][docs-zh]Translate the 
"DataStream API Tutorial" page into Chinese
URL: https://github.com/apache/flink/pull/9097
 
 
   [flink-11529][docs-zh]Translate the "DataStream API Tutorial" page into 
Chinese
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13224) TupleTypeInfoBase equals function can't distinguish different struct type

2019-07-11 Thread forideal (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883492#comment-16883492
 ] 

forideal commented on FLINK-13224:
--

[~twalthr] Thank you for your reply!

Yes, this is a problem of the RowTypeInfo.

I use this type to describe a struct; the following code is my example:

 
{code:java}
Schema appSchema = new Schema()
    .field("timestamp", Types.SQL_TIMESTAMP)
    .field("user", Types.ROW_NAMED(
        new String[]{"user_id", "user_name"},
        Types.INT, Types.STRING
    ))
    .field("device", Types.ROW_NAMED(
        new String[]{"device_id", "device_type"},
        Types.INT, Types.STRING
    ));
{code}
If the user type equals the device type, my Flink SQL app will get incorrect type info.
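
For illustration, a minimal sketch (using the field names from the schema above and 
org.apache.flink.api.common.typeinfo.Types) of how the equality check ignores field 
names:

{code:java}
TypeInformation<?> userType = Types.ROW_NAMED(
    new String[]{"user_id", "user_name"}, Types.INT, Types.STRING);
TypeInformation<?> deviceType = Types.ROW_NAMED(
    new String[]{"device_id", "device_type"}, Types.INT, Types.STRING);

// Per this report, equals() only compares the field types, not the field
// names, so two structurally identical but differently named row types
// are considered equal:
System.out.println(userType.equals(deviceType)); // prints true
{code}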

 

> TupleTypeInfoBase equals function can't distinguish different struct type
> -
>
> Key: FLINK-13224
> URL: https://issues.apache.org/jira/browse/FLINK-13224
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.0
> Environment: flink 1.8.0
>Reporter: forideal
>Priority: Major
>
> Hi
> i have two struct type
> one is
> {code:java}
> // code placeholder
> Types.ROW_NAMED(
>          new String[]{"device"},
>          Types.PRIMITIVE_ARRAY(Types.BYTE)
> )
> {code}
> the other is
>  
> {code:java}
> // code placeholder
> Types.ROW_NAMED(
>          new String[]{"app"},
>          Types.PRIMITIVE_ARRAY(Types.BYTE)
> )
> {code}
> when i compare those two types ,the equals function returns true.
> there are some code in TupleTypeInfoBase         
> {code:java}
> // code placeholder
> return other.canEqual(this) &&
>            super.equals(other) &&
>Arrays.equals(types, other.types) &&
>totalFields == other.totalFields;
> {code}
> i think,The equals function should compare field names.
> eg:
> {code:java}
> // code placeholder
> if (totalFields == other.totalFields) {
>  String[] otherFieldNames = other.getFieldNames();
>  String[] fieldNames = this.getFieldNames();
>  for (int i = 0; i < totalFields; i++) {
>  if (!otherFieldNames[i].equals(fieldNames[i])) {
>  return false;
>  }
>  }
>  } else{
>return false;
>  }
> return other.canEqual(this) &&
>super.equals(other) &&
>Arrays.equals(types, other.types);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] flinkbot edited a comment on issue #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #8556: [FLINK-12171][Network] Do not limit 
the network buffer memory by heap size on the TM side
URL: https://github.com/apache/flink/pull/8556#issuecomment-510513877
 
 
   ## CI report:
   
   * 4e2a56f6a3a9e57b001ee013d31053222e05ab7a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118791465)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13150) defaultCatalogName and defaultDatabaseName in TableEnvImpl is not updated after they are updated in TableEnvironment

2019-07-11 Thread Xuefu Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883489#comment-16883489
 ] 

Xuefu Zhang commented on FLINK-13150:
-

Hi [~zjffdu], tableEnv.registerDataSet() always registers the table in 
the built-in, in-memory catalog. To register a catalog table, you need to use 
tableEnv.sqlUpdate(). Let me know if you have more questions. Thanks.
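
For illustration, a hedged sketch of the distinction; the catalog, database, and 
table names as well as the WITH options are illustrative placeholders:

{code:java}
// always goes into the built-in, in-memory catalog
tableEnv.registerDataSet("t1", dataSet);

// registering a table in a specific catalog goes through DDL instead
tableEnv.sqlUpdate(
    "CREATE TABLE myCatalog.myDb.t2 (id INT, name STRING) " +
    "WITH ('connector.type' = '...')");
{code}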

> defaultCatalogName and defaultDatabaseName in TableEnvImpl is not updated 
> after they are updated in TableEnvironment
> 
>
> Key: FLINK-13150
> URL: https://issues.apache.org/jira/browse/FLINK-13150
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Jeff Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> defaultCatalogName and defaultDatabaseName in TableEnvImpl are initialized 
> when it is created and never changed even when they are updated in 
> TableEnvironment.
> The will cause issues that we may register table to the wrong catalog after 
> we changed the defaultCatalogName and defaultDatabaseName 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] KurtYoung commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302812604
 
 

 ##
 File path: flink-table/flink-table-planner-blink/pom.xml
 ##
 @@ -324,6 +336,7 @@ under the License.

commons-codec:commons-codec
 

+   
org.apache.flink.sql.parser:*
 
 Review comment:
   This is not a flink-table-runtime-blink dependency


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #9039: [FLINK-13170][table-planner] Planner should get table factory from ca…

2019-07-11 Thread GitBox
lirui-apache commented on issue #9039: [FLINK-13170][table-planner] Planner 
should get table factory from ca…
URL: https://github.com/apache/flink/pull/9039#issuecomment-510725336
 
 
   Thanks @danny0405 , I've updated accordingly.
   @godfreyhe @bowenli86 please take another look, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9096: [hotfix][table-planner-blink] Also set batch properties in BatchExecutor

2019-07-11 Thread GitBox
flinkbot commented on issue #9096: [hotfix][table-planner-blink] Also set batch 
properties in BatchExecutor
URL: https://github.com/apache/flink/pull/9096#issuecomment-510724774
 
 
   ## CI report:
   
   * d75ca929b19b054d56332b13374bb3570ccdd179 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118873953)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13132) Allow ClusterEntrypoints use user main method to generate job graph

2019-07-11 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883481#comment-16883481
 ] 

Guowei Ma commented on FLINK-13132:
---

[~suez1224] and [~ZhenqiuHuang], thanks for your detailed explanations. Your 
scenario makes sense to me.
1. Apart from the jars, how do you deal with the dependencies of the user's jobs 
when you move from one cluster to another? For example, the checkpoint data of 
the jobs and the sources and sinks of the user's jobs. Will your deploy service 
also move those dependencies for the users?

2. In the HA scenario, we could use the ability of the SubmittedGraphStore to 
resolve the non-deterministic problem. Before generating a new JobGraph, the 
Dispatcher could first check the "SubmittedGraphStore" and then decide whether 
to generate the JobGraph or not. I think [~till.rohrmann] could give more 
suggestions.
Thanks.
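
For illustration, a hedged sketch of the check described in point 2. The wiring and 
variable names are illustrative; only the store and retriever calls follow the 
SubmittedJobGraphStore and JobGraphRetriever interfaces:

{code:java}
// assuming recoverJobGraph() returns null for a job that was never stored,
// the JobGraph is only regenerated from the user's main method when the
// store has no record of it, avoiding a second, non-deterministic graph
SubmittedJobGraph existing = submittedJobGraphStore.recoverJobGraph(jobId);
JobGraph jobGraph = existing != null
    ? existing.getJobGraph()
    : jobGraphRetriever.retrieveJobGraph(configuration);
{code}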

 

> Allow ClusterEntrypoints use user main method to generate job graph
> ---
>
> Key: FLINK-13132
> URL: https://issues.apache.org/jira/browse/FLINK-13132
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0, 1.8.1
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>
> We are building a service that can transparently deploy a job to different 
> cluster management systems, such as Yarn and another internal system. It is 
> very cost to download the jar and generate JobGraph in the client side. Thus, 
> I want to propose an improvement to make Yarn Entrypoints can be configurable 
> to use either FileJobGraphRetriever or ClassPathJobGraphRetriever. It is 
> actually a long asking TODO in AbstractionYarnClusterDescriptor in line 834.
> https://github.com/apache/flink/blob/21468e0050dc5f97de5cfe39885e0d3fd648e399/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L834



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] danny0405 commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
danny0405 commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302810245
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 ##
 @@ -457,6 +458,23 @@ abstract class TableEnvImpl(
 registerCatalogTableInternal(operation.getTablePath,
   operation.getCatalogTable,
   operation.isIgnoreIfExists)
+  case dropTable: SqlDropTable =>
+val name = dropTable.fullTableName()
+val isIfExists = dropTable.getIfExists
+val paths = catalogManager.getFullTablePath(name.toList)
+val catalog = getCatalog(paths(0))
+if (!catalog.isPresent) {
+  if (!isIfExists) {
+throw new TableException(s"Catalog ${paths(0)} does not exist.")
+  }
+} else {
+  try
+catalog.get().dropTable(new ObjectPath(paths(1), paths(2)), isIfExists)
+  catch {
+case e: TableNotExistException =>
+  throw new TableException(e.getMessage)
+  }
+}
   case _ =>
 throw new TableException(
   "Unsupported SQL query! sqlUpdate() only accepts SQL statements of 
type INSERT.")
 
 Review comment:
   Updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
danny0405 commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302810065
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
 ##
 @@ -112,24 +114,26 @@ abstract class PlannerBase(
 val planner = getFlinkPlanner
 // parse the sql query
 val parsed = planner.parse(stmt)
-if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
-  // validate the sql query
-  val validated = planner.validate(parsed)
-  // transform to a relational tree
-  val relational = planner.rel(validated)
-  List(new PlannerQueryOperation(relational.project()))
-} else if (null != parsed && parsed.isInstanceOf[SqlInsert]) {
-  val insert = parsed.asInstanceOf[SqlInsert]
-  // validate the SQL query
-  val query = insert.getSource
-  val validatedQuery = planner.validate(query)
-  val sinkOperation = new CatalogSinkModifyOperation(
-insert.getTargetTable.asInstanceOf[SqlIdentifier].names,
-new PlannerQueryOperation(planner.rel(validatedQuery).rel)
-  )
-  List(sinkOperation)
-} else {
-  throw new TableException(s"Unsupported query: $stmt")
+parsed match {
+  case insert: SqlInsert =>
+// get name of sink table
+val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+
+List(new CatalogSinkModifyOperation(targetTablePath,
+  SqlToOperationConverter.convert(planner, insert.getSource).asInstanceOf[PlannerQueryOperation])
+  .asInstanceOf[Operation]).asJava
 
 Review comment:
Yes, because we already have `JavaConversions._`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
godfreyhe commented on a change in pull request #9080: [FLINK-13115] 
[table-planner-blink] Introduce planner rule to support partition pruning for 
PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#discussion_r30281
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, PartitionPruner, RexNodeExtractor}
+import org.apache.flink.table.sources.PartitionableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that tries to push partitions evaluated by filter condition into a
+  * [[PartitionableTableSource]].
+  */
+class PushPartitionIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[Filter],
+operand(classOf[LogicalTableScan], none)),
+  "PushPartitionIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val filter: Filter = call.rel(0)
+if (filter.getCondition == null) {
+  return false
+}
+
+val scan: LogicalTableScan = call.rel(1)
+scan.getTable.unwrap(classOf[TableSourceTable[_]]) match {
+  case table: TableSourceTable[_] =>
+table.tableSource match {
+  case p: PartitionableTableSource => p.getPartitionFieldNames.nonEmpty
+  case _ => false
+}
+  case _ => false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val filter: Filter = call.rel(0)
+val scan: LogicalTableScan = call.rel(1)
+val table: FlinkRelOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+pushPartitionIntoScan(call, filter, scan, table)
+  }
+
+  private def pushPartitionIntoScan(
+  call: RelOptRuleCall,
+  filter: Filter,
+  scan: LogicalTableScan,
+  relOptTable: FlinkRelOptTable): Unit = {
+
+val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
+val tableSource = tableSourceTable.tableSource.asInstanceOf[PartitionableTableSource]
+val partitionFieldNames = tableSource.getPartitionFieldNames.toList.toArray
+val inputFieldType = filter.getInput.getRowType
+
+val relBuilder = call.builder()
+val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan)
+val (partitionPredicate, nonPartitionPredicate) =
+  RexNodeExtractor.extractPartitionPredicates(
+filter.getCondition,
+maxCnfNodeCount,
+inputFieldType.getFieldNames.toList.toArray,
+relBuilder.getRexBuilder,
+partitionFieldNames
+  )
+
+if (partitionPredicate.isAlwaysTrue) {
+  // no partition predicates in filter
+  return
+}
+
+val finalPartitionPredicate = adjustPartitionPredicate(
+  inputFieldType.getFieldNames.toList.toArray,
+  partitionFieldNames,
+  partitionPredicate
+)
+val partitionFieldTypes = partitionFieldNames.map { name =>
+  val index = inputFieldType.getFieldNames.indexOf(name)
+  require(index >= 0, s"$name is not found in ${inputFieldType.getFieldNames.mkString(", ")}")
+  inputFieldType.getFieldList.get(index).getType
+}.map(FlinkTypeFactory.toLogicalType)
+
+val allPartitions = tableSource.getPartitions
+val remainingPartitions = PartitionPruner.prunePartitions(
+  call.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig,
+  partitionFieldNames,
+  partitionFieldTypes,
+  allPartitions,
+  finalPartitionPredicate
+)
+
+val newTableSource = 

[jira] [Comment Edited] (FLINK-11792) Make KafkaConsumer more resilient to Kafka Broker Failures

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883478#comment-16883478
 ] 

Jiangjie Qin edited comment on FLINK-11792 at 7/12/19 2:38 AM:
---

[~knaufk] Thanks for the explanation. There might be some misunderstanding on 
this.

The protocol_partitioning parts are mostly for Kafka client developers. For 
Kafka there are a few [3rd party client 
libraries|https://cwiki.apache.org/confluence/display/KAFKA/Clients] out 
there. These libraries talk to Kafka brokers according to the RPC 
protocols defined by Kafka, so they need to know how to handle things like 
leader migration. In our case, we are using the official Kafka Java clients 
({{KafkaConsumer}} and {{KafkaProducer}}), so everything described in the 
protocol guide has already been handled by the Kafka Java clients.

 
{quote}`KafkaConsumer:assign`, which we use, also states that " As such, there 
will be no rebalance operation triggered when group membership or cluster and 
topic metadata change."
{quote}
 

The "topic metadata change" and _rebalance_ here actually mean something 
orthogonal to leader migration. In Kafka the consumers may belong to a consumer 
group, and the consumers in the same consumer group may be consuming from some 
topics together in a coordinated way. The most important part of such 
coordination is deciding which partitions each consumer in the same 
consumer group should consume from, a.k.a. _partition assignment_. The partition 
assignment may change when the membership of the consumer group changes (e.g. a 
new consumer joins, or an existing consumer dies) or when the "topic metadata 
changes" (e.g. increasing the number of partitions of a topic, creating a new 
topic that matches a pattern). In those cases, the partition assignment should 
be changed accordingly. Such a partition assignment change is called a 
_rebalance_.

Leader migrations won't trigger any partition reassignment. They only mean that 
the consumer which is assigned the partition needs to fetch data from another 
location; the partition itself is still assigned to that consumer. 
KafkaConsumer internally handles such a leader migration by refreshing metadata 
and resending the FetchRequest to the new leader. This process is transparent 
to the users.
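
For illustration, a minimal sketch of the two subscription modes being discussed, 
using the official Kafka Java client (topic and partition are illustrative):

{code:java}
// subscribe() joins a consumer group; the partition assignment may change
// through rebalances when membership or topic metadata changes
consumer.subscribe(Collections.singletonList("my-topic"));

// assign() pins the partitions explicitly, so no rebalance is ever triggered;
// leader migrations are still handled transparently inside the client
consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
{code}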

 


was (Author: becket_qin):
[~knaufk] Thanks for the explanation. There might be some misunderstanding on 
this.

The protocol_partitioning parts are mostly for Kafka client developers. For 
Kafka there are a few [3rd party client 
libraries|https://cwiki.apache.org/confluence/display/KAFKA/Clients] out 
there. These libraries talk to Kafka brokers according to the RPC 
protocols defined by Kafka, so they need to know how to handle things like 
leader migration. In our case, we are using the official Kafka Java clients 
({{KafkaConsumer}} and {{KafkaProducer}}), so everything described in the 
protocol guide has already been handled by the Kafka Java clients.

 

{quote}

`KafkaConsumer:assign`, which we use, also states that " As such, there will be 
no rebalance operation triggered when group membership or cluster and topic 
metadata change."

{quote}

 

The "topic metadata change" and _rebalance_ here actually mean something 
orthogonal. In Kafka the consumers may belong to a consumer group, and the 
consumers in the same consumer group may be consuming from some topics together 
in a coordinated way. The most important part of such coordination is deciding 
which partitions each consumer in the same consumer group should consume 
from, a.k.a. _partition assignment_. The partition assignment may change when 
the membership of the consumer group changes (e.g. a new consumer joins, or an 
existing consumer dies) or when the "topic metadata changes" (e.g. increasing the 
number of partitions of a topic, creating a new topic that matches a pattern). 
In those cases, the partition assignment should be changed accordingly. Such a 
partition assignment change is called a _rebalance_.

Leader migrations won't trigger any partition reassignment. They only mean that 
the consumer which is assigned the partition needs to fetch data from another 
location; the partition itself is still assigned to that consumer. 
KafkaConsumer internally handles such a leader migration by refreshing metadata 
and resending the FetchRequest to the new leader. This process is transparent 
to the users.

 

> Make KafkaConsumer more resilient to Kafka Broker Failures 
> ---
>
> Key: FLINK-11792
> URL: https://issues.apache.org/jira/browse/FLINK-11792
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Konstantin Knauf
>Priority: Major
>
> When consuming from a topic with replication 

[GitHub] [flink] danny0405 commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
danny0405 commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302809445
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
 ##
 @@ -112,24 +114,26 @@ abstract class PlannerBase(
 val planner = getFlinkPlanner
 // parse the sql query
 val parsed = planner.parse(stmt)
-if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
-  // validate the sql query
-  val validated = planner.validate(parsed)
-  // transform to a relational tree
-  val relational = planner.rel(validated)
-  List(new PlannerQueryOperation(relational.project()))
-} else if (null != parsed && parsed.isInstanceOf[SqlInsert]) {
-  val insert = parsed.asInstanceOf[SqlInsert]
-  // validate the SQL query
-  val query = insert.getSource
-  val validatedQuery = planner.validate(query)
-  val sinkOperation = new CatalogSinkModifyOperation(
-insert.getTargetTable.asInstanceOf[SqlIdentifier].names,
-new PlannerQueryOperation(planner.rel(validatedQuery).rel)
-  )
-  List(sinkOperation)
-} else {
-  throw new TableException(s"Unsupported query: $stmt")
+parsed match {
+  case insert: SqlInsert =>
+// get name of sink table
+val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+
+List(new CatalogSinkModifyOperation(targetTablePath,
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
danny0405 commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302809209
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
 ##
 @@ -440,6 +440,103 @@ class CatalogTableITCase(isStreaming: Boolean) {
 execJob("testJob")
 assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
   }
+
 
 Review comment:
   Already added




[GitHub] [flink] danny0405 commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
danny0405 commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302808998
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a DROP TABLE statement.
+ */
+public class DropTableOperation implements DropOperation {
+   private final String[] tableName;
+   private final boolean ifExists;
+
+   public DropTableOperation(String[] tableName, boolean ifExists) {
+   this.tableName = tableName;
+   this.ifExists = ifExists;
+   }
+
+   public String[] getTableName() {
+   return this.tableName;
+   }
+
+   public boolean isIfExists() {
+   return this.ifExists;
+   }
+
+   @Override
+   public String asSummaryString() {
+   Map<String, Object> params = new LinkedHashMap<>();
+   params.put("IfExists", ifExists);
 
 Review comment:
   Added




[GitHub] [flink] flinkbot edited a comment on issue #9043: [FLINK-13165] Complete slot requests in request order

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9043: [FLINK-13165] Complete slot requests 
in request order
URL: https://github.com/apache/flink/pull/9043#issuecomment-510405816
 
 
   ## CI report:
   
   * a75fa20222c0e9d4bdfbb6eafdfd1831e0ff48e8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118791394)
   




[GitHub] [flink] flinkbot edited a comment on issue #9071: FLINK-13044 [BuildSystem / Shaded] Fix for wrong shading of AWS SDK in flink-s3-fs-hadoop

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9071: FLINK-13044 [BuildSystem / Shaded] 
Fix for wrong shading of AWS SDK in flink-s3-fs-hadoop
URL: https://github.com/apache/flink/pull/9071#issuecomment-510513895
 
 
   ## CI report:
   
   * 9b3db8abd8b43f98158021a430b00a20fae2b26d : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/118873266)
   




[GitHub] [flink] flinkbot commented on issue #9096: [hotfix][table-planner-blink] Also set batch properties in BatchExecutor

2019-07-11 Thread GitBox
flinkbot commented on issue #9096: [hotfix][table-planner-blink] Also set batch 
properties in BatchExecutor
URL: https://github.com/apache/flink/pull/9096#issuecomment-510722541
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] KurtYoung edited a comment on issue #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung edited a comment on issue #9093: [FLINK-13220][FLINK-13211] 
[flink-table] Add drop table support for flink planner and add DDL support for 
blink planner
URL: https://github.com/apache/flink/pull/9093#issuecomment-510722253
 
 
   The correct module names are "table-planner" and "table-planner-blink"




[GitHub] [flink] XuPingyong opened a new pull request #9096: [hotfix][table-planner-blink] Also set batch properties in BatchExecutor

2019-07-11 Thread GitBox
XuPingyong opened a new pull request #9096: [hotfix][table-planner-blink] Also 
set batch properties in BatchExecutor
URL: https://github.com/apache/flink/pull/9096
 
 
   
   
   ## What is the purpose of the change
   
   Also set batch properties in BatchExecutor
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: ( no)
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
   




[GitHub] [flink] KurtYoung commented on issue #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung commented on issue #9093: [FLINK-13220][FLINK-13211] [flink-table] 
Add drop table support for flink planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#issuecomment-510722253
 
 
   The correct module names are "table-planner" and "table-planner-blink"




[GitHub] [flink] KurtYoung commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302807515
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
 ##
 @@ -440,6 +440,103 @@ class CatalogTableITCase(isStreaming: Boolean) {
 execJob("testJob")
 assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
   }
+
 
 Review comment:
   add a test for dropping a non-existent table, but with tolerance (IF EXISTS).
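
   For illustration, a minimal sketch of such a test (a sketch only: tableEnv, 
TableException and org.junit.Assert.fail are assumed from the surrounding test 
class):

@Test
def testDropNonExistentTableWithIfExists(): Unit = {
  // With IF EXISTS, dropping a table that was never created must not fail.
  tableEnv.sqlUpdate("DROP TABLE IF EXISTS t_does_not_exist")

  // Without IF EXISTS, the same statement is expected to throw.
  try {
    tableEnv.sqlUpdate("DROP TABLE t_does_not_exist")
    fail("expected a TableException for a non-existent table")
  } catch {
    case _: TableException => // expected
  }
}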




[GitHub] [flink] KurtYoung commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302808299
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
 ##
 @@ -112,24 +114,26 @@ abstract class PlannerBase(
 val planner = getFlinkPlanner
 // parse the sql query
 val parsed = planner.parse(stmt)
-if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
-  // validate the sql query
-  val validated = planner.validate(parsed)
-  // transform to a relational tree
-  val relational = planner.rel(validated)
-  List(new PlannerQueryOperation(relational.project()))
-} else if (null != parsed && parsed.isInstanceOf[SqlInsert]) {
-  val insert = parsed.asInstanceOf[SqlInsert]
-  // validate the SQL query
-  val query = insert.getSource
-  val validatedQuery = planner.validate(query)
-  val sinkOperation = new CatalogSinkModifyOperation(
-insert.getTargetTable.asInstanceOf[SqlIdentifier].names,
-new PlannerQueryOperation(planner.rel(validatedQuery).rel)
-  )
-  List(sinkOperation)
-} else {
-  throw new TableException(s"Unsupported query: $stmt")
+parsed match {
+  case insert: SqlInsert =>
+// get name of sink table
+val targetTablePath = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+
+List(new CatalogSinkModifyOperation(targetTablePath,
+  SqlToOperationConverter.convert(planner,
+insert.getSource).asInstanceOf[PlannerQueryOperation])
+  .asInstanceOf[Operation]).asJava
 
 Review comment:
   asJava is unnecessary?
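
   As a side note, whether the conversion is needed depends on the declared 
return type of the enclosing method; a sketch (op is a placeholder, and the 
java.util.List return type is an assumption):

import java.util.Collections

import scala.collection.JavaConverters._

val op: Operation = ??? // placeholder for the operation built above

// If the enclosing method returns java.util.List[Operation], some conversion
// from a Scala List is required:
val viaAsJava: java.util.List[Operation] = List(op).asJava

// For a single element, the JDK wrapper is simpler and avoids the
// Scala-to-Java conversion entirely:
val viaSingleton: java.util.List[Operation] = Collections.singletonList(op)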




[GitHub] [flink] KurtYoung commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302806439
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a DROP TABLE statement.
+ */
+public class DropTableOperation implements DropOperation {
+   private final String[] tableName;
+   private final boolean ifExists;
+
+   public DropTableOperation(String[] tableName, boolean ifExists) {
+   this.tableName = tableName;
+   this.ifExists = ifExists;
+   }
+
+   public String[] getTableName() {
+   return this.tableName;
+   }
+
+   public boolean isIfExists() {
+   return this.ifExists;
+   }
+
+   @Override
+   public String asSummaryString() {
+   Map<String, Object> params = new LinkedHashMap<>();
+   params.put("IfExists", ifExists);
 
 Review comment:
   add tableName?
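
   For illustration, one way the summary could pick up the table name (a sketch 
only; the formatWithChildren usage mirrors other Operation implementations and 
may differ from the final code):

@Override
public String asSummaryString() {
    Map<String, Object> params = new LinkedHashMap<>();
    // Include the table path in the summary, as suggested above.
    params.put("tableName", String.join(".", tableName));
    params.put("IfExists", ifExists);
    return OperationUtils.formatWithChildren(
        "DROP TABLE", params, Collections.emptyList(), Operation::asSummaryString);
}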




[GitHub] [flink] KurtYoung commented on a change in pull request #9093: [FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink planner and add DDL support for blink planner

2019-07-11 Thread GitBox
KurtYoung commented on a change in pull request #9093: 
[FLINK-13220][FLINK-13211] [flink-table] Add drop table support for flink 
planner and add DDL support for blink planner
URL: https://github.com/apache/flink/pull/9093#discussion_r302807499
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 ##
 @@ -457,6 +458,23 @@ abstract class TableEnvImpl(
 registerCatalogTableInternal(operation.getTablePath,
   operation.getCatalogTable,
   operation.isIgnoreIfExists)
+  case dropTable: SqlDropTable =>
+val name = dropTable.fullTableName()
+val isIfExists = dropTable.getIfExists
+val paths = catalogManager.getFullTablePath(name.toList)
+val catalog = getCatalog(paths(0))
+if (!catalog.isPresent) {
+  if (!isIfExists) {
+throw new TableException(s"Catalog ${paths(0)} does not exist.")
+  }
+} else {
+  try
+catalog.get().dropTable(new ObjectPath(paths(1), paths(2)), 
isIfExists)
+  catch {
+case e: TableNotExistException =>
+  throw new TableException(e.getMessage)
+  }
+}
   case _ =>
 throw new TableException(
   "Unsupported SQL query! sqlUpdate() only accepts SQL statements of 
type INSERT.")
 
 Review comment:
   update the error message
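
   For instance (one possible wording, not the final text):

case _ =>
  throw new TableException(
    "Unsupported SQL query! sqlUpdate() only accepts SQL statements " +
      "of type INSERT and DROP TABLE.")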




[jira] [Comment Edited] (FLINK-11792) Make KafkaConsumer more resilient to Kafka Broker Failures

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883478#comment-16883478
 ] 

Jiangjie Qin edited comment on FLINK-11792 at 7/12/19 2:37 AM:
---

[~knaufk] Thanks for the explanation. There might be some misunderstanding on 
this.

The protocol_partitioning parts are mostly for Kafka client developers. For 
Kafka there are a few [3rd party client 
libraries|https://cwiki.apache.org/confluence/display/KAFKA/Clients] out 
there. These libraries talk to Kafka brokers according to the RPC 
protocols defined by Kafka, so they need to know how to handle things like 
leader migration. In our case, we are using the official Kafka Java clients 
({{KafkaConsumer}} and {{KafkaProducer}}), so everything described in the 
protocol guide is already handled by the Kafka Java clients.

 

{quote}

`KafkaConsumer:assign`, which we use, also states that "As such, there will be 
no rebalance operation triggered when group membership or cluster and topic 
metadata change."

{quote}

 

The "topic metadata change" and _rebalance_ here actually means something 
orthogonal. In Kafka the consumers may belong to a consumer group, and the 
consumers in the same consumer group may be consuming from some topics together 
in a coordinated way. The most important part of such coordination is to decide 
which partitions should each consumer in the same consumer group to consume 
from, a.k.a _partition assignment_. The partition assignment may change when 
the members in the consumer group changes (e.g. new consumer joins, existing 
consumer dies, etc) or when "topic metadata changes" (e.g. increasing the 
number of partitions of a topic, creating a new topic that matches a pattern). 
In those case, the partition assignment should be changed accordingly. Such 
partition assignment change is called a _rebalance_.

Leader migrations won't trigger any partition reassignment. It only means that 
the consumer who is assigned the partition needs to fetch data from another 
location. But the partition itself is still assigned to that consumer. 
KafkaConsumer internally handles such leader migration by refreshing metadata 
and resending the FetchRequest to the new leader. This process is transparent 
to the users.

 


was (Author: becket_qin):
[~knaufk] Thanks for the explanation. There might be some misunderstanding on 
this.

The protocol_partitioning parts are mostly for Kafka client developers. For 
Kafka there are a few [3rd party client 
libraries|https://cwiki.apache.org/confluence/display/KAFKA/Clients] out 
there. These libraries talk to Kafka brokers according to the RPC 
protocols defined by Kafka, so they need to know how to handle things like 
leader migration. In our case, we are using the official Kafka Java clients 
({{KafkaConsumer}} and {{KafkaProducer}}), so everything described in the 
protocol guide is already handled by the Kafka Java clients.

{quote}`KafkaConsumer:assign`, which we use, also states that "As such, there 
will be no rebalance operation triggered when group membership or cluster and 
topic metadata change."{quote}

The "topic metadata change" and _rebalance_ here actually means something 
orthogonal. In Kafka the consumers may belong to a consumer group, and the 
consumers in the same consumer group may be consuming from some topics together 
in a coordinated way. The most important part of such coordination is to decide 
which partitions should each consumer in the same consumer group to consume 
from, a.k.a _partition assignment_. The partition assignment may change when 
the members in the consumer group changes (e.g. new consumer joins, existing 
consumer dies, etc) or when "topic metadata changes" (e.g. increasing the 
number of partitions of a topic, creating a new topic that matches a pattern). 
In those case, the partition assignment should be changed accordingly. Such 
partition assignment change is called a _rebalance_.

Leader migrations won't trigger any partition reassignment. It only means that 
the consumer who is assigned the partition needs to fetch data from another 
location. But the partition itself is still assigned to that consumer. 
KafkaConsumer internally handles such leader migration by refreshing metadata 
and resending the FetchRequest to the new leader. This process is transparent 
to the users.

 

> Make KafkaConsumer more resilient to Kafka Broker Failures 
> ---
>
> Key: FLINK-11792
> URL: https://issues.apache.org/jira/browse/FLINK-11792
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Konstantin Knauf
>Priority: Major
>
> When consuming from a topic with replication factor > 1, the 
> 

[jira] [Commented] (FLINK-11792) Make KafkaConsumer more resilient to Kafka Broker Failures

2019-07-11 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883478#comment-16883478
 ] 

Jiangjie Qin commented on FLINK-11792:
--

[~knaufk] Thanks for the explanation. There might be some misunderstanding on 
this.

The protocol_partitioning parts are mostly for Kafka client developers. For 
Kafka there are a few [3rd party client 
libraries|https://cwiki.apache.org/confluence/display/KAFKA/Clients] out 
there. These libraries talk to Kafka brokers according to the RPC 
protocols defined by Kafka, so they need to know how to handle things like 
leader migration. In our case, we are using the official Kafka Java clients 
({{KafkaConsumer}} and {{KafkaProducer}}), so everything described in the 
protocol guide is already handled by the Kafka Java clients.

{quote}`KafkaConsumer:assign`, which we use, also states that "As such, there 
will be no rebalance operation triggered when group membership or cluster and 
topic metadata change."{quote}

The "topic metadata change" and _rebalance_ here actually means something 
orthogonal. In Kafka the consumers may belong to a consumer group, and the 
consumers in the same consumer group may be consuming from some topics together 
in a coordinated way. The most important part of such coordination is to decide 
which partitions should each consumer in the same consumer group to consume 
from, a.k.a _partition assignment_. The partition assignment may change when 
the members in the consumer group changes (e.g. new consumer joins, existing 
consumer dies, etc) or when "topic metadata changes" (e.g. increasing the 
number of partitions of a topic, creating a new topic that matches a pattern). 
In those case, the partition assignment should be changed accordingly. Such 
partition assignment change is called a _rebalance_.

Leader migrations won't trigger any partition reassignment. It only means that 
the consumer who is assigned the partition needs to fetch data from another 
location. But the partition itself is still assigned to that consumer. 
KafkaConsumer internally handles such leader migration by refreshing metadata 
and resending the FetchRequest to the new leader. This process is transparent 
to the users.

 

> Make KafkaConsumer more resilient to Kafka Broker Failures 
> ---
>
> Key: FLINK-11792
> URL: https://issues.apache.org/jira/browse/FLINK-11792
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Konstantin Knauf
>Priority: Major
>
> When consuming from a topic with replication factor > 1, the 
> FlinkKafkaConsumer could continue reading from this topic, when a single 
> broker fails, by "simply" switching to the new leader `s for all lost 
> partitions after Kafka failover. Currently, the KafkaConsumer will most 
> likely throw in exception as topic metadata is only periodically fetched from 
> the Kafka cluster.
>  
>  





[jira] [Assigned] (FLINK-13210) Hive connector test should dependent on blink planner instead of legacy planner

2019-07-11 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-13210:


Assignee: Rui Li  (was: Bowen Li)

> Hive connector test should dependent on blink planner instead of legacy 
> planner
> ---
>
> Key: FLINK-13210
> URL: https://issues.apache.org/jira/browse/FLINK-13210
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Reporter: Jingsong Lee
>Assignee: Rui Li
>Priority: Major
>
> Blink planner supports more features and functions, and some ITCases cannot 
> exercise them without relying on the Blink planner in tests.
> And now that the table env is unified, I think we can use the unified table 
> env for these IT cases.





[GitHub] [flink] godfreyhe commented on a change in pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
godfreyhe commented on a change in pull request #9080: [FLINK-13115] 
[table-planner-blink] Introduce planner rule to support partition pruning for 
PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#discussion_r302807643
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, PartitionPruner, 
RexNodeExtractor}
+import org.apache.flink.table.sources.PartitionableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that tries to push partitions evaluated by filter condition 
into a
+  * [[PartitionableTableSource]].
+  */
+class PushPartitionIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[Filter],
+operand(classOf[LogicalTableScan], none)),
+  "PushPartitionIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val filter: Filter = call.rel(0)
+if (filter.getCondition == null) {
+  return false
+}
+
+val scan: LogicalTableScan = call.rel(1)
+scan.getTable.unwrap(classOf[TableSourceTable[_]]) match {
+  case table: TableSourceTable[_] =>
+table.tableSource match {
+  case p: PartitionableTableSource => p.getPartitionFieldNames.nonEmpty
+  case _ => false
+}
+  case _ => false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val filter: Filter = call.rel(0)
+val scan: LogicalTableScan = call.rel(1)
+val table: FlinkRelOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+pushPartitionIntoScan(call, filter, scan, table)
+  }
+
+  private def pushPartitionIntoScan(
+  call: RelOptRuleCall,
+  filter: Filter,
+  scan: LogicalTableScan,
+  relOptTable: FlinkRelOptTable): Unit = {
+
+val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
+val tableSource = 
tableSourceTable.tableSource.asInstanceOf[PartitionableTableSource]
+val partitionFieldNames = tableSource.getPartitionFieldNames.toList.toArray
 
 Review comment:
   should convert to a `Scala` list first and then convert to a `Scala` array, 
because the `Java` list also has a `toArray` method (which returns 
`Array[AnyRef]`).
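
   A quick sketch of the difference (illustrative only):

import java.util.Arrays

import scala.collection.JavaConverters._

val javaList: java.util.List[String] = Arrays.asList("a", "b")

// Calling toArray directly resolves to java.util.List#toArray and yields an
// untyped Array[AnyRef]:
val untyped: Array[AnyRef] = javaList.toArray

// Converting to a Scala collection first preserves the element type:
val typed: Array[String] = javaList.asScala.toList.toArray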




[GitHub] [flink] godfreyhe commented on a change in pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
godfreyhe commented on a change in pull request #9080: [FLINK-13115] 
[table-planner-blink] Introduce planner rule to support partition pruning for 
PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#discussion_r302806940
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.calcite.{FlinkContext, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, PartitionPruner, 
RexNodeExtractor}
+import org.apache.flink.table.sources.PartitionableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that tries to push partitions evaluated by filter condition 
into a
+  * [[PartitionableTableSource]].
+  */
+class PushPartitionIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[Filter],
+operand(classOf[LogicalTableScan], none)),
+  "PushPartitionIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val filter: Filter = call.rel(0)
+if (filter.getCondition == null) {
+  return false
+}
+
+val scan: LogicalTableScan = call.rel(1)
+scan.getTable.unwrap(classOf[TableSourceTable[_]]) match {
+  case table: TableSourceTable[_] =>
+table.tableSource match {
+  case p: PartitionableTableSource => p.getPartitionFieldNames.nonEmpty
+  case _ => false
+}
+  case _ => false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val filter: Filter = call.rel(0)
+val scan: LogicalTableScan = call.rel(1)
+val table: FlinkRelOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+pushPartitionIntoScan(call, filter, scan, table)
+  }
+
+  private def pushPartitionIntoScan(
+  call: RelOptRuleCall,
+  filter: Filter,
+  scan: LogicalTableScan,
+  relOptTable: FlinkRelOptTable): Unit = {
+
+val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
+val tableSource = 
tableSourceTable.tableSource.asInstanceOf[PartitionableTableSource]
+val partitionFieldNames = tableSource.getPartitionFieldNames.toList.toArray
+val inputFieldType = filter.getInput.getRowType
+
+val relBuilder = call.builder()
+val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan)
+val (partitionPredicate, nonPartitionPredicate) =
+  RexNodeExtractor.extractPartitionPredicates(
+filter.getCondition,
+maxCnfNodeCount,
+inputFieldType.getFieldNames.toList.toArray,
+relBuilder.getRexBuilder,
+partitionFieldNames
+  )
+
+if (partitionPredicate.isAlwaysTrue) {
+  // no partition predicates in filter
+  return
+}
+
+val finalPartitionPredicate = adjustPartitionPredicate(
+  inputFieldType.getFieldNames.toList.toArray,
+  partitionFieldNames,
+  partitionPredicate
+)
+val partitionFieldTypes = partitionFieldNames.map { name =>
+  val index = inputFieldType.getFieldNames.indexOf(name)
+  require(index >= 0, s"$name is not found in 
${inputFieldType.getFieldNames.mkString(", ")}")
+  inputFieldType.getFieldList.get(index).getType
+}.map(FlinkTypeFactory.toLogicalType)
+
+val allPartitions = tableSource.getPartitions
+val remainingPartitions = PartitionPruner.prunePartitions(
+  call.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig,
+  partitionFieldNames,
+  partitionFieldTypes,
+  allPartitions,
+  finalPartitionPredicate
+)
+
+val newTableSource = 

[GitHub] [flink] godfreyhe commented on a change in pull request #9080: [FLINK-13115] [table-planner-blink] Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-11 Thread GitBox
godfreyhe commented on a change in pull request #9080: [FLINK-13115] 
[table-planner-blink] Introduce planner rule to support partition pruning for 
PartitionableTableSource
URL: https://github.com/apache/flink/pull/9080#discussion_r302806331
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
 ##
 @@ -243,6 +245,7 @@ object FlinkBatchRuleSets {
 // scan optimization
 PushProjectIntoTableSourceScanRule.INSTANCE,
 PushFilterIntoTableSourceScanRule.INSTANCE,
+PushPartitionIntoTableSourceScanRule.INSTANCE,
 
 Review comment:
   In a hep program we can make sure the partition predicate is pushed down 
completely, because this rule is also part of the `logical` volcano rules; that 
would require `PartitionableTableSource#getPartitions` to return the remaining 
partitions (on the first call, all partitions). To make things easier, I will 
remove `PushPartitionIntoTableSourceScanRule` from the volcano rule sets.
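
   A minimal sketch of the contract described above (a simplified stand-in, not 
Flink's actual interface):

import java.util.{List => JList, Map => JMap}

// Simplified stand-in for PartitionableTableSource: after pruning,
// getPartitions must reflect only the remaining partitions, so that
// re-applying the push-down rule finds nothing more to push.
trait PartitionedSource {
  def getPartitions: JList[JMap[String, String]]
  def applyPartitionPruning(remaining: JList[JMap[String, String]]): PartitionedSource
}

final case class ExampleSource(parts: JList[JMap[String, String]]) extends PartitionedSource {
  override def getPartitions: JList[JMap[String, String]] = parts
  override def applyPartitionPruning(remaining: JList[JMap[String, String]]): PartitionedSource =
    copy(parts = remaining)
}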




[GitHub] [flink] flinkbot edited a comment on issue #9071: FLINK-13044 [BuildSystem / Shaded] Fix for wrong shading of AWS SDK in flink-s3-fs-hadoop

2019-07-11 Thread GitBox
flinkbot edited a comment on issue #9071: FLINK-13044 [BuildSystem / Shaded] 
Fix for wrong shading of AWS SDK in flink-s3-fs-hadoop
URL: https://github.com/apache/flink/pull/9071#issuecomment-510513895
 
 
   ## CI report:
   
   * 9b3db8abd8b43f98158021a430b00a20fae2b26d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118791330)
   



