[GitHub] [flink] pnowojski merged pull request #8925: [FLINK-12852][network] Fix the deadlock occurred when requesting exclusive buffers

2019-07-10 Thread GitBox
pnowojski merged pull request #8925: [FLINK-12852][network] Fix the deadlock 
occurred when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9075:  [FLINK-10245][hbase] Add 
an upsert table sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#discussion_r302388287
 
 

 ##
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An upsert {@link UpsertStreamTableSink} for HBase.
+ */
+public class HBaseUpsertTableSink implements UpsertStreamTableSink {
+
+   private final HBaseTableSchema hbaseTableSchema;
+   private final TableSchema tableSchema;
+   private final HBaseOptions hbaseOptions;
+   private final HBaseWriteOptions writeOptions;
+
+   public HBaseUpsertTableSink(
+   HBaseTableSchema hbaseTableSchema,
+   HBaseOptions hbaseOptions,
+   HBaseWriteOptions writeOptions) {
+   checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), 
"HBaseUpsertTableSink requires rowkey is set.");
+   this.hbaseTableSchema = hbaseTableSchema;
+   this.tableSchema = hbaseTableSchema.convertsToTableSchema();
+   this.hbaseOptions = hbaseOptions;
+   this.writeOptions = writeOptions;
+   }
+
+   @Override
+   public void setKeyFields(String[] keys) {
+   // hbase always upsert on rowkey, ignore query keys.
 
 Review comment:
   I'm not sure, because the support for key derivation is not very good, 
especially field concatenation, which is used heavily in the HBase sink.
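
   To make the concern concrete, here is a purely hypothetical sketch (not code from this PR) of mapping several query-derived key fields onto HBase's single rowkey by concatenation:

```java
import org.apache.flink.types.Row;

// Hypothetical only: composing a single HBase rowkey out of several query-derived
// key fields by concatenation. The planner's key derivation does not do this for
// the HBase sink, which is why setKeyFields() simply ignores the derived keys.
public final class RowKeyConcat {

    public static String concatRowKey(Row row, int[] keyIndices, String delimiter) {
        StringBuilder rowKey = new StringBuilder();
        for (int i = 0; i < keyIndices.length; i++) {
            if (i > 0) {
                rowKey.append(delimiter);
            }
            rowKey.append(row.getField(keyIndices[i]));
        }
        return rowKey.toString();
    }
}
```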




[GitHub] [flink] pnowojski commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occurred when requesting exclusive buffers

2019-07-10 Thread GitBox
pnowojski commented on a change in pull request #8925: [FLINK-12852][network] 
Fix the deadlock occurred when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#discussion_r302387828
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##
 @@ -177,7 +193,8 @@ public void recycle(MemorySegment segment) {
 
final List segments = new 
ArrayList<>(numberOfSegmentsToRequest);
try {
-   while (segments.size() < numberOfSegmentsToRequest) {
+   final Deadline deadline = 
Deadline.now().plus(Duration.ofMillis(requestSegmentsTimeoutInMillis));
 
 Review comment:
   Let's stick with the `Deadline`, as your proposal already contains a small 
bug (for the subtraction-based timeout to work, you would have to subtract the 
wait only if `segment == null`; otherwise, even if you get the buffer after only 
1 millisecond of waiting, you still subtract 2 seconds of waiting time).
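
   To make the difference concrete, here is a rough sketch of the `Deadline`-based loop under discussion (not the PR's exact code; the queue type and exception message are assumptions):

```java
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.memory.MemorySegment;

final class DeadlineLoopSketch {

    // Request segments until enough are collected or the overall deadline expires.
    // Because the remaining time is derived from the Deadline on every iteration,
    // a poll that returns quickly does not eat into the timeout budget.
    static List<MemorySegment> requestWithDeadline(
            BlockingQueue<MemorySegment> availableMemorySegments,
            int numberOfSegmentsToRequest,
            long requestSegmentsTimeoutInMillis) throws IOException, InterruptedException {

        final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
        final Deadline deadline =
                Deadline.now().plus(Duration.ofMillis(requestSegmentsTimeoutInMillis));

        while (segments.size() < numberOfSegmentsToRequest) {
            MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
            if (segment != null) {
                segments.add(segment);
            } else if (!deadline.hasTimeLeft()) {
                // In a subtraction-based variant, the 2 s poll interval must only be
                // charged in this null branch; charging it unconditionally would count
                // a 1 ms successful poll as 2 s of waiting.
                throw new IOException("Timeout triggered when requesting exclusive buffers.");
            }
        }
        return segments;
    }
}
```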




[GitHub] [flink] flinkbot commented on issue #9073: [FLINK-13187] Introduce ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST

2019-07-10 Thread GitBox
flinkbot commented on issue #9073: [FLINK-13187] Introduce 
ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
URL: https://github.com/apache/flink/pull/9073#issuecomment-510354715
 
 
   CI report for commit f588483906ba4c4459bf18671f84924a3286: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118672483)




[GitHub] [flink] JingsongLi commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
JingsongLi commented on issue #9075:  [FLINK-10245][hbase] Add an upsert table 
sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#issuecomment-510354243
 
 
   LGTM +1, just left minor comments...




[GitHub] [flink] JingsongLi commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
JingsongLi commented on a change in pull request #9075:  [FLINK-10245][hbase] 
Add an upsert table sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#discussion_r302385261
 
 

 ##
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An upsert {@link UpsertStreamTableSink} for HBase.
+ */
+public class HBaseUpsertTableSink implements UpsertStreamTableSink {
+
+   private final HBaseTableSchema hbaseTableSchema;
+   private final TableSchema tableSchema;
+   private final HBaseOptions hbaseOptions;
+   private final HBaseWriteOptions writeOptions;
+
+   public HBaseUpsertTableSink(
+   HBaseTableSchema hbaseTableSchema,
+   HBaseOptions hbaseOptions,
+   HBaseWriteOptions writeOptions) {
+   checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), 
"HBaseUpsertTableSink requires rowkey is set.");
+   this.hbaseTableSchema = hbaseTableSchema;
+   this.tableSchema = hbaseTableSchema.convertsToTableSchema();
+   this.hbaseOptions = hbaseOptions;
+   this.writeOptions = writeOptions;
+   }
+
+   @Override
+   public void setKeyFields(String[] keys) {
+   // hbase always upsert on rowkey, ignore query keys.
 
 Review comment:
   Check the row keys?




[jira] [Commented] (FLINK-12749) Getting Started - Docker Playgrounds - Flink Cluster Playground

2019-07-10 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882682#comment-16882682
 ] 

Robert Metzger commented on FLINK-12749:


I'm happy to create an Apache repo for this, but we should probably first 
discuss this on the dev@ list.

> Getting Started - Docker Playgrounds - Flink Cluster Playground
> ---
>
> Key: FLINK-12749
> URL: https://issues.apache.org/jira/browse/FLINK-12749
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>
> The planned structure for the new Getting Started Guide is
> * Flink Overview (~ two pages)
> * Project Setup
> ** Java
> ** Scala
> ** Python
> * Quickstarts
> ** Example Walkthrough - Table API / SQL
> ** Example Walkthrough - DataStream API
> * Docker Playgrounds
> ** Flink Cluster Playground
> ** Flink Interactive SQL Playground
> In this ticket we add the Flink Cluster Playground, a docker-compose based 
> setup consisting of Apache Kafka and Apache Flink (Flink Session Cluster), 
> including a step-by-step guide for some common commands (job submission, 
> savepoints, etc).
> *Some Open Questions:*
> * Which Flink images to use? `library/flink` with dynamic properties would be 
> the most maintainable, I think. It would be preferable if we don't need to 
> host any custom images for this, but can rely on the existing plain Flink 
> images.
> * Which Flink jobs to use? An updated version of 
> {{org.apache.flink.streaming.examples.statemachine.StateMachineExample}} 
> might be a good option as it can run with or without Kafka and contains a data 
> generator writing to Kafka already (see the next questions).
> * How to get data into Kafka? Maybe just provide a small bash 
> script/one-liner to produce into a Kafka topic, or see the question above 
> (a minimal producer sketch follows after this list).
> * Which Kafka Images to use? https://hub.docker.com/r/wurstmeister/kafka/ 
> seems to be well-maintained and is openly available.
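
On the "how to get data into Kafka" question, a plain kafka-clients producer can serve as the data generator; the sketch below is only illustrative, and the broker address, topic name, and payload format are placeholders rather than part of the planned playground.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical data generator: writes a handful of string events into a Kafka topic.
// Broker address, topic name, and payload format are made up for illustration.
public class PlaygroundDataGenerator {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("input", "event-" + i));
            }
        }
    }
}
```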



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] JingsongLi commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
JingsongLi commented on a change in pull request #9075:  [FLINK-10245][hbase] 
Add an upsert table sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#discussion_r302370020
 
 

 ##
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Common Options for HBase.
+ */
+public class HBaseOptions {
+
+   private final String tableName;
+   private final String zkQuorum;
+   @Nullable private final String zkNodeParent;
+
+   private HBaseOptions(String tableName, String zkQuorum, String 
zkNodeParent) {
+   this.tableName = tableName;
+   this.zkQuorum = zkQuorum;
+   this.zkNodeParent = zkNodeParent;
+   }
+
+   String getTableName() {
 
 Review comment:
   Make these methods public?




[jira] [Created] (FLINK-13202) Unstable StandaloneResourceManagerTest

2019-07-10 Thread Kurt Young (JIRA)
Kurt Young created FLINK-13202:
--

 Summary: Unstable StandaloneResourceManagerTest
 Key: FLINK-13202
 URL: https://issues.apache.org/jira/browse/FLINK-13202
 Project: Flink
  Issue Type: Test
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: Kurt Young


[https://api.travis-ci.org/v3/job/557150195/log.txt]

 

06:37:02.888 [ERROR] Failures:

06:37:02.889 [ERROR] 
StandaloneResourceManagerTest.testStartupPeriod:60->assertHappensUntil:114 
condition was not fulfilled before the deadline
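
For context, `assertHappensUntil` is a poll-until-deadline style assertion; the generic sketch below illustrates that pattern, but the helper's actual name, signature, and internals in the test are not taken from the source.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Sketch of a poll-until-deadline assertion: keep checking the condition until it
// holds or the deadline passes; the "condition was not fulfilled before the
// deadline" message corresponds to the timeout branch.
final class PollingAssert {

    static void assertHappensUntil(BooleanSupplier condition, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new TimeoutException("condition was not fulfilled before the deadline");
            }
            Thread.sleep(10);
        }
    }
}
```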



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[GitHub] [flink] bowenli86 edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-10 Thread GitBox
bowenli86 edited a comment on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-510350722
 
 
   Hi @JingsongLi , thanks for the explanation!
   
   I think solution II works better for now compared to solution I. Solution I 
is too limited and may not satisfy our users' needs.
   
   I've updated this PR according to your feedback, please take a look. Thanks




[GitHub] [flink] bowenli86 commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-10 Thread GitBox
bowenli86 commented on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-510350722
 
 
   Hi @JingsongLi , thanks for the explanation!
   
   I think solution II works better for now compared to solution I. I've 
updated this PR according to your feedback, please take a look. Thanks




[GitHub] [flink] chancelq commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
chancelq commented on issue #9075:  [FLINK-10245][hbase] Add an upsert table 
sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#issuecomment-510350794
 
 
   LGTM




[GitHub] [flink] flinkbot commented on issue #9065: [FLINK-13102][travis] Optimize some travis stages by skipping mvn verify if possible

2019-07-10 Thread GitBox
flinkbot commented on issue #9065: [FLINK-13102][travis] Optimize some travis 
stages by skipping mvn verify if possible
URL: https://github.com/apache/flink/pull/9065#issuecomment-510347308
 
 
   CI report for commit 2e0e53b8f336838bdcca6236913790c144251941: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118642182)




[GitHub] [flink] flinkbot commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-07-10 Thread GitBox
flinkbot commented on issue #8294: [FLINK-12348][table-planner-blink]Use 
TableConfig in api module to replace TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#issuecomment-510345595
 
 
   CI report for commit c51af3f0c5298e6a769c9ba84dfac18d2c2074aa: SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634908)




[GitHub] [flink] XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor

2019-07-10 Thread GitBox
XuPingyong commented on a change in pull request #9057: [FLINK-13121] 
[table-planner-blink] Set batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#discussion_r302377110
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
 ##
 @@ -42,47 +42,105 @@
 @Internal
 public class BatchExecutor extends ExecutorBase {
 
+   private boolean enableObjectReuse;
+   private long latencyTrackingInterval;
+   private long bufferTimeout;
+   private TimeCharacteristic timeCharacteristic;
+   private InputDependencyConstraint inputDependencyConstraint;
+
@VisibleForTesting
public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
super(executionEnvironment);
}
 
@Override
public JobExecutionResult execute(String jobName) throws Exception {
-   if (transformations.isEmpty()) {
-   throw new TableException("No table sinks have been 
created yet. " +
-   "A program needs at least one sink that 
consumes data. ");
-   }
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-   StreamGraph streamGraph = generateStreamGraph(execEnv, 
transformations, getNonEmptyJobName(jobName));
-
-   // TODO supports streamEnv.execute(streamGraph)
-   try {
-   return execEnv.execute(getNonEmptyJobName(jobName));
-   } finally {
-   transformations.clear();
-   }
+   StreamGraph streamGraph = generateStreamGraph(transformations, 
jobName);
+   return execEnv.execute(streamGraph);
}
 
-   public static StreamGraph generateStreamGraph(
-   StreamExecutionEnvironment execEnv,
-   List> transformations,
-   String jobName) throws Exception {
-   // TODO avoid cloning ExecutionConfig
-   ExecutionConfig executionConfig = 
InstantiationUtil.clone(execEnv.getConfig());
+   /**
+* Backup previous streamEnv config and set batch configs.
+*/
+   private void backupAndUpdateStreamEnv(StreamExecutionEnvironment 
execEnv) {
 
 Review comment:
   Because batch job execution can change the properties of the streamEnv, which 
would affect its reuse. Added a UT.
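
   A rough sketch of the backup-and-restore idea (field set trimmed and names assumed; not the PR's exact code): the caller's stream settings are remembered, batch-friendly values are applied for the run, and the originals are restored afterwards so the shared environment can be reused.

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative helper: back up the stream environment's settings, switch to
// batch-friendly values, and restore the originals after execution.
final class BatchEnvSettingsGuard {

    private final boolean objectReuseEnabled;
    private final long bufferTimeout;
    private final TimeCharacteristic timeCharacteristic;

    BatchEnvSettingsGuard(StreamExecutionEnvironment execEnv) {
        // back up the caller's settings before switching to batch-friendly values
        this.objectReuseEnabled = execEnv.getConfig().isObjectReuseEnabled();
        this.bufferTimeout = execEnv.getBufferTimeout();
        this.timeCharacteristic = execEnv.getStreamTimeCharacteristic();

        execEnv.getConfig().enableObjectReuse();   // batch operators benefit from object reuse
        execEnv.setBufferTimeout(-1);              // flush only when buffers are full
        execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    }

    void restore(StreamExecutionEnvironment execEnv) {
        // restore the original settings so the shared environment can be reused
        if (!objectReuseEnabled) {
            execEnv.getConfig().disableObjectReuse();
        }
        execEnv.setBufferTimeout(bufferTimeout);
        execEnv.setStreamTimeCharacteristic(timeCharacteristic);
    }
}
```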




[GitHub] [flink] XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor

2019-07-10 Thread GitBox
XuPingyong commented on a change in pull request #9057: [FLINK-13121] 
[table-planner-blink] Set batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#discussion_r302376863
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
 ##
 @@ -116,25 +75,40 @@ class BatchExecExchange(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
 super.explainTerms(pw)
-  .itemIf("exchange_mode", requiredExchangeMode.orNull,
-requiredExchangeMode.contains(DataExchangeMode.BATCH))
+  .itemIf("shuffle_mode", requiredShuffleMode.orNull,
+requiredShuffleMode.contains(ShuffleMode.BATCH))
+.itemIf("full_dam_behavior", getDamBehavior,
 
 Review comment:
   I have removed it.




[GitHub] [flink] XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor

2019-07-10 Thread GitBox
XuPingyong commented on a change in pull request #9057: [FLINK-13121] 
[table-planner-blink] Set batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#discussion_r302376778
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java
 ##
 @@ -195,4 +195,11 @@
"means a kind of 
disabled operator. Its default value is empty that means no operators are 
disabled. " +
"If the configure's 
value is \"NestedLoopJoin, ShuffleHashJoin\", NestedLoopJoin and 
ShuffleHashJoin " +
"are disabled. If 
configure's value is \"HashJoin\", ShuffleHashJoin and BroadcastHashJoin are 
disabled.");
+
+   public static final ConfigOption 
SQL_EXEC_SHUFFLE_MODE_ALL_BATCH =
+   key("sql.exec.shuffle-mode.all-batch")
 
 Review comment:
   ok, changed
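
   For context, an option of this shape is normally declared through `ConfigOptions`; in the sketch below the key is the one shown in the diff, while the default value and description are assumptions for illustration only.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch only: key taken from the diff above; default value and description are
// assumptions, not the PR's final wording.
public class ShuffleModeOptionSketch {

    public static final ConfigOption<Boolean> SQL_EXEC_SHUFFLE_MODE_ALL_BATCH =
            ConfigOptions.key("sql.exec.shuffle-mode.all-batch")
                    .defaultValue(false)
                    .withDescription(
                            "Whether all data exchanges of the batch job use blocking (BATCH) shuffles.");
}
```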




[GitHub] [flink] KurtYoung commented on a change in pull request #9036: [FLINK-13112][table-planner-blink] Support LocalZonedTimestampType in blink

2019-07-10 Thread GitBox
KurtYoung commented on a change in pull request #9036: 
[FLINK-13112][table-planner-blink] Support LocalZonedTimestampType in blink
URL: https://github.com/apache/flink/pull/9036#discussion_r302376027
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
 ##
 @@ -33,9 +33,9 @@
 public class TableConfig {
 
/**
-* Defines the timezone for date/time/timestamp conversions.
+* Defines the zone id for timestamp with local time zone.
 */
-   private TimeZone timeZone = TimeZone.getTimeZone("UTC");
+   private ZoneId localZoneId = ZoneId.systemDefault();
 
 Review comment:
   cc @twalthr, we changed `TimeZone` to `LocalZoneId` to better fit with 
`TIMESTAMP WITH LOCAL TIME ZONE`
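
   For illustration only (the values are made up): a `TIMESTAMP WITH LOCAL TIME ZONE` value is stored as an instant, and the session `ZoneId` only determines how that instant is rendered as local wall-clock time.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Illustrative only: the stored value is an instant; the configured zone id
// changes how it is shown to the user, not what is stored.
public class LocalZoneIdExample {

    public static void main(String[] args) {
        Instant stored = Instant.parse("2019-07-10T12:00:00Z");

        LocalDateTime utc = LocalDateTime.ofInstant(stored, ZoneId.of("UTC"));                // 2019-07-10T12:00
        LocalDateTime shanghai = LocalDateTime.ofInstant(stored, ZoneId.of("Asia/Shanghai")); // 2019-07-10T20:00

        System.out.println(utc + " / " + shanghai);
    }
}
```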




[GitHub] [flink] gaoyunhaii commented on issue #8925: [FLINK-12852][network] Fix the deadlock occurred when requesting exclusive buffers

2019-07-10 Thread GitBox
gaoyunhaii commented on issue #8925: [FLINK-12852][network] Fix the deadlock 
occurred when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#issuecomment-510341121
 
 
   Many thanks for the review @pnowojski @StephanEwen! I have updated the PR.




[GitHub] [flink] flinkbot commented on issue #8877: [FLINK-12984][metrics] only call Histogram#getStatistics() once where possible

2019-07-10 Thread GitBox
flinkbot commented on issue #8877: [FLINK-12984][metrics] only call 
Histogram#getStatistics() once where possible
URL: https://github.com/apache/flink/pull/8877#issuecomment-510337436
 
 
   CI report for commit f183d69a6ba8fbd3b9bb3c5474b392950811dfa8: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634881)




[GitHub] [flink] KurtYoung closed pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-07-10 Thread GitBox
KurtYoung closed pull request #8294: [FLINK-12348][table-planner-blink]Use 
TableConfig in api module to replace TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294
 
 
   




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302370153
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR_TYPE, "jdbc"); // jdbc
+   context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards 
compatibility
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // common options
+   properties.add(CONNECTOR_DRIVER);
+   properties.add(CONNECTOR_URL);
+   properties.add(CONNECTOR_DBTABLE);
+   properties.add(CONNECTOR_USER);
+   properties.add(CONNECTOR_PASSWORD);
+
+   // scan partition options
+   properties.

[GitHub] [flink] KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-07-10 Thread GitBox
KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use 
TableConfig in api module to replace TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#issuecomment-510336618
 
 
   travis passed here: https://travis-ci.org/beyond1920/flink/builds/557128403




[GitHub] [flink] KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-07-10 Thread GitBox
KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use 
TableConfig in api module to replace TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#issuecomment-510336673
 
 
   merging this...




[GitHub] [flink] flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock occurred when requesting exclusive buffers

2019-07-10 Thread GitBox
flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock 
occurred when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#issuecomment-510335932
 
 
   CI report for commit 31a51cbe260a78381dc44973e6724c20532b5deb: SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634862)




[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302369427
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR_TYPE, "jdbc"); // jdbc
+   context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards 
compatibility
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // common options
+   properties.add(CONNECTOR_DRIVER);
+   properties.add(CONNECTOR_URL);
+   properties.add(CONNECTOR_DBTABLE);
+   properties.add(CONNECTOR_USER);
+   properties.add(CONNECTOR_PASSWORD);
+
+   // scan partition options
+   properti

[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302368883
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR_TYPE, "jdbc"); // jdbc
+   context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards 
compatibility
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // common options
+   properties.add(CONNECTOR_DRIVER);
+   properties.add(CONNECTOR_URL);
+   properties.add(CONNECTOR_DBTABLE);
+   properties.add(CONNECTOR_USER);
+   properties.add(CONNECTOR_PASSWORD);
+
+   // scan partition options
+   properti

[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302367751
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
 ##
 @@ -74,12 +118,56 @@ public static Builder builder() {
return new Builder();
}
 
+   private JDBCInputFormat getInputFormat() {
+   JDBCInputFormat.JDBCInputFormatBuilder builder = 
JDBCInputFormat.buildJDBCInputFormat()
+   .setDrivername(options.getDriverName())
+   .setDBUrl(options.getDbURL())
+   .setUsername(options.getUsername())
+   .setPassword(options.getPassword())
+   .setRowTypeInfo(new 
RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));
+
+   String query = options.getDialect().getSelectFromStatement(
+   options.getTableName(), returnType.getFieldNames(), new 
String[0]);
+   if (scanOptions != null) {
+   long lowerBound = scanOptions.getPartitionLowerBound();
+   long upperBound = scanOptions.getPartitionUpperBound();
+   long numPartitions = scanOptions.getNumPartitions();
+   // partitionSize = Math.ceil(upperBound - lowerBound + 
1) / numPartitions;
+   // the following is equivalent
+   long partitionSize = (upperBound - lowerBound + 
numPartitions) / numPartitions;
+   builder = builder.setParametersProvider(new 
NumericBetweenParametersProvider(
 
 Review comment:
   Maybe you can use a new `ParameterValuesProvider` to deal with 
`numPartitions`.
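
   The inline comment quoted above relies on the identity ceil(n / k) == (n + k - 1) / k for positive integers, with n = upperBound - lowerBound + 1; a tiny check with made-up values:

```java
// Made-up values: lowerBound = 0, upperBound = 10, numPartitions = 4.
public class PartitionSizeCheck {

    public static void main(String[] args) {
        long lowerBound = 0, upperBound = 10, numPartitions = 4;

        long viaCeil = (long) Math.ceil((double) (upperBound - lowerBound + 1) / numPartitions); // ceil(11 / 4) = 3
        long viaIntegerMath = (upperBound - lowerBound + numPartitions) / numPartitions;          // (10 + 4) / 4 = 3

        System.out.println(viaCeil + " == " + viaIntegerMath); // 3 == 3
    }
}
```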




[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302364645
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
 ##
 @@ -55,6 +89,16 @@ public JDBCTableSource(
.build();
 
 Review comment:
   @TsReaper Can you add some tests to test `ProjectableTableSource`?




[GitHub] [flink] JingsongLi commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-10 Thread GitBox
JingsongLi commented on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-510329508
 
 
   Hi bowen:
   > `TableFunctionDefinition` and `AggregateFunctionDefinition` need 
resultType and accumulateType
   
   First, I think the design of `FunctionDefinition` should be optimized: this 
result type should be an implicit result type instead of the real result type, 
because we are not able to infer the type here, so we can't get the real result 
type.
   
   In our internal blink we pass `GenericType` to the 
`TableFunctionDefinition` resultType, pass `Generic` to the aggregate 
resultType, and pass `GenericType` to the aggregate 
accumulateType (see `HiveGenericUDTF` and `HiveGenericUDAF`). I think it's okay 
for it to be an implicit result type.




[GitHub] [flink] flinkbot commented on issue #8990: [FLINK-13104][metrics] Updated request callback to log warning on failure

2019-07-10 Thread GitBox
flinkbot commented on issue #8990: [FLINK-13104][metrics] Updated request 
callback to log warning on failure
URL: https://github.com/apache/flink/pull/8990#issuecomment-510328396
 
 
   CI report for commit f3b49c2ad8fd13e880ad5908b69c33bb841d52a5: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118701357)




[GitHub] [flink] flinkbot commented on issue #8962: [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner

2019-07-10 Thread GitBox
flinkbot commented on issue #8962: [FLINK-13076] [table-planner-blink] Bump 
Calcite dependency to 1.20.0 in blink planner
URL: https://github.com/apache/flink/pull/8962#issuecomment-510327044
 
 
   CI report for commit d1f5135c005d7a116c819543feb5ce07c42d8cfc: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634816)




[GitHub] [flink] flinkbot commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-10 Thread GitBox
flinkbot commented on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-510327018
 
 
   CI report for commit 4afedee15460ac0f1f2945ca657581c538ddfc06: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118682784)




[GitHub] [flink] flinkbot commented on issue #9077: [FLINK-13196][table] Fix Ambiguous column name exception bug for Table API

2019-07-10 Thread GitBox
flinkbot commented on issue #9077: [FLINK-13196][table] Fix Ambiguous column 
name exception bug for Table API
URL: https://github.com/apache/flink/pull/9077#issuecomment-510326720
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-13196) Fix Ambiguous column name exception bug for Table API

2019-07-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13196:
---
Labels: pull-request-available  (was: )

> Fix Ambiguous column name exception bug for Table API
> -
>
> Key: FLINK-13196
> URL: https://issues.apache.org/jira/browse/FLINK-13196
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> The following query should be valid, however, ambiguous column name exception 
> is thrown.
> {code:java}
> val util = streamTestUtil()
> val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
> val resultTable = table
>   .groupBy('b)
>   .select('b, 'a.sum, 'a.sum, 'a.sum)
> {code}
> {code:java}
> org.apache.flink.table.api.ValidationException: Ambiguous column name: EXPR$0
>   at 
> org.apache.flink.table.operations.utils.factories.ProjectionOperationFactory.lambda$validateAndGetUniqueNames$4(ProjectionOperationFactory.java:103)
> {code}
> We should add some alias logic in {{AggregationAndPropertiesReplacer}} if the 
> name has ever been used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] hequn8128 opened a new pull request #9077: [FLINK-13196][table] Fix Ambiguous column name exception bug for Table API

2019-07-10 Thread GitBox
hequn8128 opened a new pull request #9077: [FLINK-13196][table] Fix Ambiguous 
column name exception bug for Table API
URL: https://github.com/apache/flink/pull/9077
 
 
   
   
   ## What is the purpose of the change
   
   This pull request fixes the ambiguous column name exception bug for the Table API.
   
   The following query should be valid; however, an ambiguous column name 
exception is thrown.
   ```
   val util = streamTestUtil()
   val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
   
   val resultTable = table
 .groupBy('b)
 .select('b, 'a.sum, 'a.sum, 'a.sum)
   ```
   This pull request adds some alias logic in 
`AggregationAndPropertiesReplacer` if the name has ever been used.
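
   As a rough sketch of that idea (not the actual `AggregationAndPropertiesReplacer` code), de-duplication by appending a counter to an already-used name could look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the de-duplication idea only: when a generated name such as EXPR$0 was
// already used, append a counter so every projected column ends up with a unique name.
public class UniqueNameSketch {

    public static String uniqueName(String name, Map<String, Integer> usedNames) {
        Integer timesSeen = usedNames.get(name);
        if (timesSeen == null) {
            usedNames.put(name, 0);
            return name;                      // first use: keep the name as-is
        }
        usedNames.put(name, timesSeen + 1);
        return name + "_" + (timesSeen + 1);  // e.g. EXPR$0, EXPR$0_1, EXPR$0_2
    }

    public static void main(String[] args) {
        Map<String, Integer> used = new HashMap<>();
        System.out.println(uniqueName("EXPR$0", used)); // EXPR$0
        System.out.println(uniqueName("EXPR$0", used)); // EXPR$0_1
        System.out.println(uniqueName("EXPR$0", used)); // EXPR$0_2
    }
}
```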
   
   ## Brief change log
   
 - Add alias logic in `AggregationAndPropertiesReplacer` if the name has 
ever been used. Note that we don't need to add the alias logic for aggregates and 
properties in a UDF, since there is no ambiguous column problem in that case.
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as the plan tests.
   
   Also, this change added tests and can be verified as follows:
 - Added `testAggregateReuse()` in `AggregateTest` to test the alias logic.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302354841
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
 ##
 @@ -74,12 +118,56 @@ public static Builder builder() {
return new Builder();
}
 
+   private JDBCInputFormat getInputFormat() {
+   JDBCInputFormat.JDBCInputFormatBuilder builder = 
JDBCInputFormat.buildJDBCInputFormat()
+   .setDrivername(options.getDriverName())
+   .setDBUrl(options.getDbURL())
+   .setUsername(options.getUsername())
+   .setPassword(options.getPassword())
+   .setRowTypeInfo(new 
RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));
 
 Review comment:
   The field types and field names should be projected.
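
   For example, a rough sketch under the assumption that the projected field
   indices are available as `selectedFields` (hypothetical helper, not this PR's
   code):

   ```java
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.java.typeutils.RowTypeInfo;
   import org.apache.flink.table.api.TableSchema;

   // Hypothetical sketch: build a RowTypeInfo restricted to the projected fields.
   public class ProjectionSketch {
       static RowTypeInfo projectedRowType(TableSchema schema, int[] selectedFields) {
           TypeInformation<?>[] types = new TypeInformation<?>[selectedFields.length];
           String[] names = new String[selectedFields.length];
           for (int i = 0; i < selectedFields.length; i++) {
               types[i] = schema.getFieldTypes()[selectedFields[i]];
               names[i] = schema.getFieldNames()[selectedFields[i]];
           }
           return new RowTypeInfo(types, names);
       }
   }
   ```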
   
   




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302354736
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
 ##
 @@ -55,6 +89,16 @@ public JDBCTableSource(
.build();
 
 Review comment:
   The field types and field names passed into `JDBCLookupFunction` should be 
projected. 




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302359849
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 ##
 @@ -95,7 +99,7 @@ public static String getCreateQuery(String tableName) {
sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+   sqlQueryBuilder.append("price DOUBLE DEFAULT NULL,");
 
 Review comment:
   Why does this need to be changed to DOUBLE?




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302355917
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
 ##
 @@ -18,30 +18,64 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * {@link TableSource} for JDBC.
- * Now only support {@link LookupableTableSource}.
  */
-public class JDBCTableSource implements LookupableTableSource {
+public class JDBCTableSource implements
+   StreamTableSource,
+   ProjectableTableSource,
+   LookupableTableSource {
 
private final JDBCOptions options;
+   private final JDBCScanOptions scanOptions;
 
 Review comment:
   Add the `@Nullable` annotation to the members which may be null.
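
   A minimal sketch of the suggestion (hypothetical class; assumes the JSR-305
   `javax.annotation.Nullable` annotation used elsewhere in Flink):

   ```java
   import javax.annotation.Nullable;

   // Hypothetical stand-in for the table source: scan options are optional,
   // so the field is annotated and callers must check for null.
   public class NullableFieldSketch {
       static class ScanOptions {}  // stand-in for the PR's JDBCScanOptions

       @Nullable
       private final ScanOptions scanOptions;

       NullableFieldSketch(@Nullable ScanOptions scanOptions) {
           this.scanOptions = scanOptions;
       }

       boolean hasScanOptions() {
           return scanOptions != null;
       }
   }
   ```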




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302358796
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR_TYPE, "jdbc"); // jdbc
+   context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards 
compatibility
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // common options
+   properties.add(CONNECTOR_DRIVER);
+   properties.add(CONNECTOR_URL);
+   properties.add(CONNECTOR_DBTABLE);
+   properties.add(CONNECTOR_USER);
+   properties.add(CONNECTOR_PASSWORD);
+
+   // scan partition options
+   properties.

[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302358581
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
 
 Review comment:
   `connector.lookup.cache.max-rows`? "rows" may be easier to understand than
"entries".




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302358649
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
 
 Review comment:
   `connector.write.flush.max-rows`? This would align with `cache.max-rows`.




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302355599
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
 ##
 @@ -74,12 +118,56 @@ public static Builder builder() {
return new Builder();
}
 
+   private JDBCInputFormat getInputFormat() {
+   JDBCInputFormat.JDBCInputFormatBuilder builder = 
JDBCInputFormat.buildJDBCInputFormat()
+   .setDrivername(options.getDriverName())
+   .setDBUrl(options.getDbURL())
+   .setUsername(options.getUsername())
+   .setPassword(options.getPassword())
+   .setRowTypeInfo(new 
RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));
+
+   String query = options.getDialect().getSelectFromStatement(
+   options.getTableName(), returnType.getFieldNames(), new 
String[0]);
+   if (scanOptions != null) {
+   long lowerBound = scanOptions.getPartitionLowerBound();
+   long upperBound = scanOptions.getPartitionUpperBound();
+   long numPartitions = scanOptions.getNumPartitions();
+   // partitionSize = Math.ceil(upperBound - lowerBound + 
1) / numPartitions;
+   // the following is equivalent
 
 Review comment:
   If they are equivalent, why not use `Math.ceil`? And it seems that they are
not equivalent.
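
   For reference, a small self-contained check with hypothetical bounds (not
   part of the PR): for a non-negative range and a positive `numPartitions`, the
   integer expression `(upperBound - lowerBound + numPartitions) / numPartitions`
   does match `ceil((upperBound - lowerBound + 1) / numPartitions)`:

   ```java
   public class PartitionSizeSketch {
       public static void main(String[] args) {
           // Hypothetical bounds: ids 1..10 split into 3 partitions.
           long lowerBound = 1, upperBound = 10, numPartitions = 3;

           long viaIntegerTrick = (upperBound - lowerBound + numPartitions) / numPartitions;
           // Note: the double-based form can lose precision for very large bounds.
           long viaMathCeil = (long) Math.ceil((upperBound - lowerBound + 1) / (double) numPartitions);

           // Both print 4, i.e. ceil(10 / 3).
           System.out.println(viaIntegerTrick + " " + viaMathCeil);
       }
   }
   ```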




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302357869
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
 
 Review comment:
   I prefer to use `connector.username`, which is clearer. I don't think we need
to align with the parameter name in the JDBC URL.
[Hive](https://cwiki.apache.org/confluence/display/Hive/JdbcStorageHandler)
also uses `username`.




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302356136
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
 
 Review comment:
   Move all of them to `JDBCValidator` 
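
   A rough sketch of what that could look like (hypothetical, not this PR's
   code), keeping the property keys next to the validation logic as the other
   connector validators do:

   ```java
   // Hypothetical sketch: constants collected in a JDBCValidator class.
   public class JDBCValidator {
       public static final String CONNECTOR_TYPE_VALUE_JDBC = "jdbc";
       public static final String CONNECTOR_URL = "connector.url";
       public static final String CONNECTOR_TABLE = "connector.table";
       public static final String CONNECTOR_DRIVER = "connector.driver";
       public static final String CONNECTOR_USERNAME = "connector.username";
       public static final String CONNECTOR_PASSWORD = "connector.password";
       // ... the scan/lookup/write keys would move here as well ...
   }
   ```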




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302357572
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
 
 Review comment:
   I prefer to use `connector.table-name` or `connector.table`, because `dbtable`
suggests that a database name is also required.




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302359068
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR_TYPE, "jdbc"); // jdbc
+   context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards 
compatibility
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // common options
+   properties.add(CONNECTOR_DRIVER);
+   properties.add(CONNECTOR_URL);
+   properties.add(CONNECTOR_DBTABLE);
+   properties.add(CONNECTOR_USER);
+   properties.add(CONNECTOR_PASSWORD);
+
+   // scan partition options
+   properties.

[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302359017
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR_TYPE, "jdbc"); // jdbc
+   context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards 
compatibility
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // common options
+   properties.add(CONNECTOR_DRIVER);
+   properties.add(CONNECTOR_URL);
+   properties.add(CONNECTOR_DBTABLE);
+   properties.add(CONNECTOR_USER);
+   properties.add(CONNECTOR_PASSWORD);
+
+   // scan partition options
+   properties.

[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302355652
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
 ##
 @@ -74,12 +118,56 @@ public static Builder builder() {
return new Builder();
}
 
+   private JDBCInputFormat getInputFormat() {
+   JDBCInputFormat.JDBCInputFormatBuilder builder = 
JDBCInputFormat.buildJDBCInputFormat()
+   .setDrivername(options.getDriverName())
+   .setDBUrl(options.getDbURL())
+   .setUsername(options.getUsername())
+   .setPassword(options.getPassword())
+   .setRowTypeInfo(new 
RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));
+
+   String query = options.getDialect().getSelectFromStatement(
+   options.getTableName(), returnType.getFieldNames(), new 
String[0]);
+   if (scanOptions != null) {
+   long lowerBound = scanOptions.getPartitionLowerBound();
+   long upperBound = scanOptions.getPartitionUpperBound();
+   long numPartitions = scanOptions.getNumPartitions();
+   // partitionSize = Math.ceil(upperBound - lowerBound + 
1) / numPartitions;
+   // the following is equivalent
+   long partitionSize = (upperBound - lowerBound + 
numPartitions) / numPartitions;
+   builder = builder.setParametersProvider(new 
NumericBetweenParametersProvider(
+   partitionSize, lowerBound, upperBound));
+   query += " WHERE " + 
scanOptions.getPartitionColumnName() + " >= ? " + " AND " +
 
 Review comment:
   Quotes are missing around the column names.
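
   A minimal sketch of the intended result, with a hypothetical helper standing
   in for the dialect's identifier quoting (not this PR's code):

   ```java
   public class QuotedWhereClauseSketch {
       // Stand-in for a dialect-specific quoteIdentifier; real dialects may quote differently.
       static String quoteIdentifier(String identifier) {
           return "\"" + identifier + "\"";
       }

       public static void main(String[] args) {
           String partitionColumn = "id";  // hypothetical partition column
           String where = " WHERE " + quoteIdentifier(partitionColumn) + " >= ? AND "
               + quoteIdentifier(partitionColumn) + " <= ?";
           // Prints:  WHERE "id" >= ? AND "id" <= ?
           System.out.println(where);
       }
   }
   ```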




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302358314
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory,
+   StreamTableSinkFactory> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
 
 Review comment:
   I would like to rename "scan" to "read" to align with "write". How about
using:
   
   ```
   connector.read.partition.column
   connector.read.partition.lower-bound
   connector.read.partition.upper-bound
   connector.read.partition.num
   ```




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302355984
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
 
 Review comment:
   Unused import




[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302360021
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryITCase.java
 ##
 @@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link JDBCTableSource} and {@link JDBCUpsertTableSink} created
+ * by {@link JDBCTableSourceSinkFactory}.
+ */
+public class JDBCTableSourceSinkFactoryITCase extends JDBCTestBase {
+
+   @Test
+   public void testJDBCScanSource() throws Exception {
+   testJDBCScanSourceImpl(false);
+   }
+
+   @Test
+   public void testJDBCScanSourceWithParallelism() throws Exception {
+   testJDBCScanSourceImpl(true);
+   }
+
+   private void testJDBCScanSourceImpl(boolean useParallelism) throws 
Exception {
+   Map properties = 
getBasicProperties(INPUT_TABLE);
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env);
+
+   if (useParallelism) {
+   
properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_PARTITION_COLUMN, 
"id");
+   
properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_LOWER_BOUND, 
TEST_DATA[0].id.toString());
+   
properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_UPPER_BOUND, 
TEST_DATA[TEST_DATA.length - 1].id.toString());
+   
properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_NUM_PARTITIONS, "5");
+
+   env.setParallelism(TEST_DATA.length / 2 - 1);
+   }
+
+   final StreamTableSource source = 
TableFactoryService.find(StreamTableSourceFactory.class, properties)
+   .createStreamTableSource(properties);
+
+   DataStream resultSet = 
tEnv.toAppendStream(tEnv.fromTableSource(source), Row.class);
+   resultSet.addSink(new StreamITCase.StringSink<>());
+   env.execute();
+
+   List expected = new ArrayList<>();
+   for (TestEntry entry : TEST_DATA) {
+   expected.add(entry.id + "," + entry.title + "," + 
entry.author + "," + entry.price + "," + entry.qty);
+   }
+   StreamITCase.compareWithList(expected);
+   StreamITCase.clear();
+   }
+
+   @Test
+   public void testJDBCScanSourceWithProjection() throws Exception {
+   Map properties = 
getBasicProperties(INPUT_TABLE);
+
+   final StreamTableSource source = 
TableFactoryService.find(StreamTableSourceFactory.class, properties)
+   .createStreamTableSource(properties);
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnviron

[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302360647
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryITCase.java
 ##
 @@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link JDBCTableSource} and {@link JDBCUpsertTableSink} created
+ * by {@link JDBCTableSourceSinkFactory}.
+ */
+public class JDBCTableSourceSinkFactoryITCase extends JDBCTestBase {
 
 Review comment:
   TableFactories are verified using unit tests (UT). We want to avoid introducing IT cases as much as possible. You can refer to `KafkaTableSourceSinkFactoryTest` as an example.
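
   For reference, a minimal sketch of what such a factory unit test could look like; it exercises the factory through `TableFactoryService` without running a job. The connector property keys follow the quoted diff, the schema keys and the test name `testJdbcSourceFactoryDiscovery` are illustrative, and it assumes the imports already quoted above plus `static org.junit.Assert.assertNotNull`.

   // Hypothetical sketch of a factory unit test (no job execution); property keys follow the quoted diff.
   @Test
   public void testJdbcSourceFactoryDiscovery() {
       Map<String, String> properties = new HashMap<>();
       properties.put("connector.type", "jdbc");
       properties.put("connector.url", "jdbc:derby:memory:factory-test");
       properties.put("connector.dbtable", "books");
       properties.put("schema.0.name", "id");
       properties.put("schema.0.type", "INT");

       // Resolve the factory through the service loader and build the source directly.
       StreamTableSource<Row> source = TableFactoryService
           .find(StreamTableSourceFactory.class, properties)
           .createStreamTableSource(properties);

       assertNotNull(source);
   }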


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] 
Introduce JDBC table factory and bridge JDBC table source with streaming table 
source
URL: https://github.com/apache/flink/pull/9029#discussion_r302356473
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.TimeLength;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and 
{@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+   StreamTableSourceFactory<Row>,
+   StreamTableSinkFactory<Tuple2<Boolean, Row>> {
+
+   public static final String CONNECTOR_URL = "connector.url";
+   public static final String CONNECTOR_DBTABLE = "connector.dbtable";
+   public static final String CONNECTOR_DRIVER = "connector.driver";
+   public static final String CONNECTOR_USER = "connector.user";
+   public static final String CONNECTOR_PASSWORD = "connector.password";
+
+   public static final String CONNECTOR_SCAN_PARTITION_COLUMN = 
"connector.scan.partition-column";
+   public static final String CONNECTOR_SCAN_LOWER_BOUND = 
"connector.scan.lower-bound";
+   public static final String CONNECTOR_SCAN_UPPER_BOUND = 
"connector.scan.upper-bound";
+   public static final String CONNECTOR_SCAN_NUM_PARTITIONS = 
"connector.scan.num-partitions";
+
+   public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = 
"connector.lookup.cache.max-entries";
+   public static final String CONNECTOR_LOOKUP_CACHE_TTL = 
"connector.lookup.cache.ttl";
+   public static final String CONNECTOR_LOOKUP_MAX_RETRIES = 
"connector.lookup.max-retries";
+
+   public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = 
"connector.write.flush.max-size";
+   public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = 
"connector.write.flush.interval";
+   public static final String CONNECTOR_WRITE_MAX_RETRIES = 
"connector.write.max-retries";
+
+   @Override
+   public Map requiredContext() {
+   Map context = new HashMap<>();
+   context.put(CONNECTOR_TYPE, "jdbc"); // jdbc
+   context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards 
compatibility
+   return context;
+   }
+
+   @Override
+   public List supportedProperties() {
+   List properties = new ArrayList<>();
+
+   // common options
+   properties.add(CONNECTOR_DRIVER);
+   properties.add(CONNECTOR_URL);
+   properties.add(CONNECTOR_DBTABLE);
+   properties.add(CONNECTOR_USER);
+   properties.add(CONNECTOR_PASSWORD);
+
+   // scan partition options
+   properties.

[GitHub] [flink] flinkbot commented on issue #9021: [hostfix][runtime] Make checkpoints injection ordered with stop-with-savepoint

2019-07-10 Thread GitBox
flinkbot commented on issue #9021: [hostfix][runtime] Make checkpoints 
injection ordered with stop-with-savepoint
URL: https://github.com/apache/flink/pull/9021#issuecomment-510323564
 
 
   CI report for commit abed4b5678a2f09b3bb729bd62b5264e56b55b9f: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634788)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-10 Thread GitBox
xuefuz commented on a change in pull request #8920: [FLINK-13024][table] 
integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r302357374
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A factory to create {@link FunctionDefinition} based on string-based 
properties.
+ * See also {@link TableFactory} for more information.
+ */
+public interface FunctionDefinitionFactory extends TableFactory {
+
+   /**
+* Creates a {@link FunctionDefinition} from given {@link 
CatalogFunction}.
+*
+* @param name name of the {@link CatalogFunction}
+* @param catalogFunction the catalog function
+* @param constantArguments arguments of a function call (only literal 
arguments
+*  are passed, nulls for non-literal ones)
+* @param argTypes types of arguments
+* @return a {@link FunctionDefinition}
+*/
+   FunctionDefinition createFunctionDefinition(
+   String name, CatalogFunction catalogFunction, Object[] 
constantArguments, DataType[] argTypes);
 
 Review comment:
   Currently HiveSource/Sink needs the database name and table name to access the Hive metastore directly and fetch additional information required to read/write a Hive table, which is why HiveTableFactory provides an ObjectPath. Note that CatalogTable itself doesn't carry this info.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9047: [FLINK-13109][docs-zh]Translate "Restart Strategies" page into Chinese

2019-07-10 Thread GitBox
flinkbot commented on issue #9047: [FLINK-13109][docs-zh]Translate "Restart 
Strategies" page into Chinese
URL: https://github.com/apache/flink/pull/9047#issuecomment-510321288
 
 
   CI report for commit 4eeb4fcf1480bd43e12be51879bc5b4ce901b5e9: SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634731)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers

2019-07-10 Thread GitBox
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] 
Fix the deadlock occured when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#discussion_r302356552
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##
 @@ -186,6 +203,22 @@ public void recycle(MemorySegment segment) {
if (segment != null) {
segments.add(segment);
}
+
+   if (segments.size() >= 
numberOfSegmentsToRequest) {
+   break;
+   }
+
+   if (!deadline.hasTimeLeft()) {
+   throw new 
IOException(String.format("Insufficient number of network buffers: " +
 
 Review comment:
   Many thanks for the comments. I preferred the second method, since a timeout needs a different exception message. I have extracted the construction and changed the exception message for the request timeout.
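
   For readers following along, a rough sketch of the pattern under discussion: a deadline-bounded request loop that fails with a timeout-specific message. The method, queue and parameter names, the polling interval and the exception text are placeholders, not the actual `NetworkBufferPool` internals; it assumes imports for `Deadline`, `MemorySegment`, `Duration` and the usual `java.util`/`java.util.concurrent` types.

   // Illustrative only: bound the overall wait with a Deadline and fail with a
   // timeout-specific message instead of the generic "insufficient buffers" one.
   static List<MemorySegment> requestWithDeadline(
           BlockingQueue<MemorySegment> availableSegments,
           int numberOfSegmentsToRequest,
           Duration requestTimeout) throws IOException, InterruptedException {

       final Deadline deadline = Deadline.fromNow(requestTimeout);
       final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);

       while (segments.size() < numberOfSegmentsToRequest) {
           MemorySegment segment = availableSegments.poll(100, TimeUnit.MILLISECONDS);
           if (segment != null) {
               segments.add(segment);
           }
           if (!deadline.hasTimeLeft()) {
               throw new IOException("Timeout triggered when requesting exclusive buffers.");
           }
       }
       return segments;
   }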


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers

2019-07-10 Thread GitBox
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] 
Fix the deadlock occured when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#discussion_r302354310
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##
 @@ -186,6 +203,22 @@ public void recycle(MemorySegment segment) {
if (segment != null) {
segments.add(segment);
}
+
 
 Review comment:
   Agreed that the method seems too long; I have moved this part into a separate `tryRedistributeBuffers` method.
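
   A rough, self-contained sketch of the extraction (class, field and method names are placeholders; the real `NetworkBufferPool` holds more state):

   // Sketch of the extraction only; the real NetworkBufferPool holds more state.
   class BufferPoolSketch {
       private final Object factoryLock = new Object();
       private boolean isDestroyed;

       void requestMemorySegments() {
           // ... deadline-bounded request loop ...
           tryRedistributeBuffers();
       }

       private void tryRedistributeBuffers() {
           synchronized (factoryLock) {
               if (isDestroyed) {
                   throw new IllegalStateException("Network buffer pool has already been destroyed.");
               }
               redistributeBuffers();
           }
       }

       private void redistributeBuffers() {
           // rebalance the remaining buffers across the registered local buffer pools (omitted)
       }
   }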


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers

2019-07-10 Thread GitBox
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] 
Fix the deadlock occured when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#discussion_r302353092
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##
 @@ -177,7 +193,8 @@ public void recycle(MemorySegment segment) {
 
final List segments = new 
ArrayList<>(numberOfSegmentsToRequest);
try {
-   while (segments.size() < numberOfSegmentsToRequest) {
+   final Deadline deadline = 
Deadline.now().plus(Duration.ofMillis(requestSegmentsTimeoutInMillis));
 
 Review comment:
   Many thanks for the tips; changed to `fromNow`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers

2019-07-10 Thread GitBox
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] 
Fix the deadlock occured when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#discussion_r302353455
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##
 @@ -73,17 +75,31 @@
 
private final int numberOfSegmentsToRequest;
 
+   private final long requestSegmentsTimeoutInMillis;
+
+   public NetworkBufferPool(int numberOfSegmentsToAllocate, int 
segmentSize, int numberOfSegmentsToRequest) {
+   this(numberOfSegmentsToAllocate, segmentSize, 
numberOfSegmentsToRequest, Integer.MAX_VALUE);
+   }
+
/**
 * Allocates all {@link MemorySegment} instances managed by this pool.
 */
-   public NetworkBufferPool(int numberOfSegmentsToAllocate, int 
segmentSize, int numberOfSegmentsToRequest) {
+   public NetworkBufferPool(
+   int numberOfSegmentsToAllocate,
+   int segmentSize,
+   int numberOfSegmentsToRequest,
+   long requestSegmentsTimeoutInMillis) {
 
 Review comment:
   Agreed that an encapsulated object is clearer; I have changed the related variables to Duration in `NettyShuffleEnvironmentConfiguration`, `NetworkBufferPool` and `NettyShuffleEnvironmentBuilder`.
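
   A tiny illustration of that direction (class and method names are made up, not the final signatures): the timeout travels as a `Duration`, and the conversion from a millisecond config value happens once at the boundary.

   import java.time.Duration;
   import java.util.Objects;

   // Illustrative: carry the timeout as a Duration end to end.
   public final class BufferRequestOptions {
       private final Duration requestSegmentsTimeout;

       public BufferRequestOptions(Duration requestSegmentsTimeout) {
           this.requestSegmentsTimeout = Objects.requireNonNull(requestSegmentsTimeout);
       }

       public Duration getRequestSegmentsTimeout() {
           return requestSegmentsTimeout;
       }

       // Conversion from a millisecond config value happens once, at the edge.
       public static BufferRequestOptions fromMillis(long timeoutMillis) {
           return new BufferRequestOptions(Duration.ofMillis(timeoutMillis));
       }
   }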


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner

2019-07-10 Thread GitBox
flinkbot commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] 
Bump Calcite dependency to 1.20.0 in sql parser & flink planner
URL: https://github.com/apache/flink/pull/9056#issuecomment-510317452
 
 
   CI report for commit e73503e4d0c3a07cc440bff8b0d62eefcb4834ec: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634699)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers

2019-07-10 Thread GitBox
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] 
Fix the deadlock occured when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#discussion_r302353092
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##
 @@ -177,7 +193,8 @@ public void recycle(MemorySegment segment) {
 
final List segments = new 
ArrayList<>(numberOfSegmentsToRequest);
try {
-   while (segments.size() < numberOfSegmentsToRequest) {
+   final Deadline deadline = 
Deadline.now().plus(Duration.ofMillis(requestSegmentsTimeoutInMillis));
 
 Review comment:
   Changed to `fromNow`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13201) Unstable sql time udf test

2019-07-10 Thread Kurt Young (JIRA)
Kurt Young created FLINK-13201:
--

 Summary: Unstable sql time udf test
 Key: FLINK-13201
 URL: https://issues.apache.org/jira/browse/FLINK-13201
 Project: Flink
  Issue Type: Test
  Components: Table SQL / Planner
Affects Versions: 1.9.0
Reporter: Kurt Young
Assignee: Jingsong Lee


org.apache.flink.table.runtime.batch.sql.CalcITCase:testTimeUDF will occasionally fail when running all Scala tests through the IDE.

The output is:

{code:java}
java.lang.AssertionError: 
Results do not match for query:
  SELECT dateFunc(a), localDateFunc(a), dateFunc(b), localDateFunc(b), 
timeFunc(c), localTimeFunc(c), timeFunc(d), localTimeFunc(d), timestampFunc(e), 
datetimeFunc(e), timestampFunc(f), datetimeFunc(f) FROM MyTable

Results
 == Correct Result - 1 ==   

   == Actual Result - 1 ==
!1984-07-12,1984-07-12,1984-07-12,1984-07-12,08:03:09,08:03:09,08:03:09,08:03:09,2019-09-19
 08:03:09.0,2019-09-19T08:03:09,2019-09-19 08:03:09.0,2019-09-19T08:03:09   
1984-07-11,1984-07-12,1984-07-11,1984-07-12,00:03:09,08:03:09,08:03:09,16:03:09,2019-09-19
 00:03:09.0,2019-09-19T08:03:09,2019-09-19 08:03:09.0,2019-09-19T16:03:09

Plan:
  == Abstract Syntax Tree ==
LogicalProject(EXPR$0=[dateFunc($0)], EXPR$1=[localDateFunc($0)], 
EXPR$2=[dateFunc($1)], EXPR$3=[localDateFunc($1)], EXPR$4=[timeFunc($2)], 
EXPR$5=[localTimeFunc($2)], EXPR$6=[timeFunc($3)], EXPR$7=[localTimeFunc($3)], 
EXPR$8=[timestampFunc($4)], EXPR$9=[datetimeFunc($4)], 
EXPR$10=[timestampFunc($5)], EXPR$11=[datetimeFunc($5)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

== Optimized Logical Plan ==
Calc(select=[dateFunc(a) AS EXPR$0, localDateFunc(a) AS EXPR$1, dateFunc(b) AS 
EXPR$2, localDateFunc(b) AS EXPR$3, timeFunc(c) AS EXPR$4, localTimeFunc(c) AS 
EXPR$5, timeFunc(d) AS EXPR$6, localTimeFunc(d) AS EXPR$7, timestampFunc(e) AS 
EXPR$8, datetimeFunc(e) AS EXPR$9, timestampFunc(f) AS EXPR$10, datetimeFunc(f) 
AS EXPR$11])
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e, f])

{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #9068: [FLINK-13195] Add create table support for SqlClient

2019-07-10 Thread GitBox
flinkbot commented on issue #9068: [FLINK-13195] Add create table support for 
SqlClient
URL: https://github.com/apache/flink/pull/9068#issuecomment-510314838
 
 
   CI report for commit 829e8aef5c2ef6c2263998f00cf73448ff4a518c: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634619)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink

2019-07-10 Thread GitBox
lirui-apache commented on issue #9067: [FLINK-13069][hive] HiveTableSink should 
implement OverwritableTableSink
URL: https://github.com/apache/flink/pull/9067#issuecomment-510313753
 
 
   Updated to address comments.
   @xuefuz @bowenli86 please take a look, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor

2019-07-10 Thread GitBox
flinkbot commented on issue #9057: [FLINK-13121] [table-planner-blink] Set 
batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#issuecomment-510313483
 
 
   CI report for commit b616282cb875778a7a5af22a2783eaaf48104908: FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634660)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink

2019-07-10 Thread GitBox
bowenli86 commented on a change in pull request #9067: [FLINK-13069][hive] 
HiveTableSink should implement OverwritableTableSink
URL: https://github.com/apache/flink/pull/9067#discussion_r302349765
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
 ##
 @@ -235,10 +220,44 @@ public void testInsertIntoStaticPartition() throws 
Exception {
 
// make sure new partition is created
assertEquals(toWrite.size(), 
hiveCatalog.listPartitions(tablePath).size());
-   CatalogPartition catalogPartition = 
hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec));
 
-   String partitionLocation = 
catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
-   verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
+   verifyWrittenData(toWrite, hiveShell.executeQuery("select * 
from " + tblName));
+
+   hiveCatalog.dropTable(tablePath, false);
+   }
+
+   @Test
+   public void testInsertOverwrite() throws Exception {
+   String dbName = "default";
+   String tblName = "dest";
+   RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
+   ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+   ExecutionEnvironment execEnv = 
ExecutionEnvironment.createLocalEnvironment(1);
+   BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(execEnv);
+
+   // write some data and verify
+   List toWrite = generateRecords(5);
+   tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, 
rowTypeInfo));
+
+   CatalogTable table = (CatalogTable) 
hiveCatalog.getTable(tablePath);
+   tableEnv.registerTableSink("destSink", new HiveTableSink(new 
JobConf(hiveConf), tablePath, table));
+   tableEnv.sql("select * from src").insertInto("destSink");
+   execEnv.execute();
+
+   verifyWrittenData(toWrite, hiveShell.executeQuery("select * 
from " + tblName));
+
+   // write some data to overwrite existing data and verify
+   toWrite = generateRecords(3);
 
 Review comment:
   yeah, right, my bad...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on issue #9039: [FLINK-13170][table-planner] Planner should get table factory from ca…

2019-07-10 Thread GitBox
godfreyhe commented on issue #9039: [FLINK-13170][table-planner] Planner should 
get table factory from ca…
URL: https://github.com/apache/flink/pull/9039#issuecomment-510311997
 
 
   Thanks for this PR @lirui-apache. `StreamPlanner` in the flink-table-planner module should also be updated, just as `PlannerBase` in the flink-table-planner-blink module. Could you also add some tests for the blink planner?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink

2019-07-10 Thread GitBox
lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] 
HiveTableSink should implement OverwritableTableSink
URL: https://github.com/apache/flink/pull/9067#discussion_r302348621
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -198,6 +199,7 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
tablePath = (ObjectPath) in.readObject();
partitionToWriter = new HashMap<>();
tableProperties = (Properties) in.readObject();
+   hiveVersion = 
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
HiveShimLoader.getHiveVersion());
 
 Review comment:
   `open` is called on the TM but not on the JM, where `finalizeGlobal` is performed. That's why I initialize hiveVersion in `readObject` and in the constructor (to be safe).
   I'll serialize this field instead of getting it from the jobConf.
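
   A generic sketch of that change (not the actual `HiveTableOutputFormat` methods): the version string is written and read explicitly during Java serialization, so it is available wherever the format is deserialized, including on the JM for `finalizeGlobal`.

   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;

   // Illustrative only; field and class names are placeholders.
   class VersionedOutputFormat implements Serializable {
       private static final long serialVersionUID = 1L;

       private transient String hiveVersion = "2.3.4"; // example value

       private void writeObject(ObjectOutputStream out) throws IOException {
           out.defaultWriteObject();
           out.writeObject(hiveVersion); // ship the version with the serialized format
       }

       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
           in.defaultReadObject();
           hiveVersion = (String) in.readObject(); // restore it on the receiving side
       }
   }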


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lincoln-lil commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
lincoln-lil commented on a change in pull request #9075:  [FLINK-10245][hbase] 
Add an upsert table sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#discussion_r302347191
 
 

 ##
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java
 ##
 @@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The upsert sink for HBase.
+ *
+ * This class leverage {@link BufferedMutator} to buffer multiple
+ * {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending 
the requests to cluster.
+ * The buffering strategy can be configured by {@code 
bufferFlushMaxSizeInBytes},
+ * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.
+ */
+public class HBaseUpsertSinkFunction
+   extends RichSinkFunction<Tuple2<Boolean, Row>>
+   implements CheckpointedFunction, 
BufferedMutator.ExceptionListener {
+
+   private static final long serialVersionUID = 1L;
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseUpsertSinkFunction.class);
+
+   private final String hTableName;
+   private final HBaseTableSchema schema;
+   private final byte[] serializedConfig;
+
+   private final long bufferFlushMaxSizeInBytes;
+   private final long bufferFlushMaxMutations;
+   private final long bufferFlushIntervalMillis;
+
+   private transient HBaseReadWriteHelper helper;
+
+   private transient Connection connection;
+   private transient BufferedMutator mutator;
+
+   private transient ScheduledExecutorService executor;
+   private transient ScheduledFuture scheduledFuture;
+   private transient AtomicLong numPendingRequests;
+
+   private transient volatile boolean closed = false;
+
+   /**
+* This is set from inside the {@link 
BufferedMutator.ExceptionListener} if a {@link Throwable}
+* was thrown.
+*
+* Errors will be checked and rethrown before processing each input 
element, and when the sink is closed.
+*/
+   private final AtomicReference failureThrowable = new 
AtomicReference<>();
+
+   public HBaseUpsertSinkFunction(
+   String hTableName,
+   HBaseTableSchema schema,
+   org.apache.hadoop.conf.Configuration conf,
+   long bufferFlushMaxSizeInBytes,
+   long bufferFlushMaxMutations,
+ 

[GitHub] [flink] lincoln-lil commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
lincoln-lil commented on a change in pull request #9075:  [FLINK-10245][hbase] 
Add an upsert table sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#discussion_r302347476
 
 

 ##
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java
 ##
 @@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The upsert sink for HBase.
+ *
+ * This class leverage {@link BufferedMutator} to buffer multiple
+ * {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending 
the requests to cluster.
+ * The buffering strategy can be configured by {@code 
bufferFlushMaxSizeInBytes},
+ * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.
+ */
+public class HBaseUpsertSinkFunction
+   extends RichSinkFunction<Tuple2<Boolean, Row>>
+   implements CheckpointedFunction, 
BufferedMutator.ExceptionListener {
+
+   private static final long serialVersionUID = 1L;
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseUpsertSinkFunction.class);
+
+   private final String hTableName;
+   private final HBaseTableSchema schema;
+   private final byte[] serializedConfig;
+
+   private final long bufferFlushMaxSizeInBytes;
+   private final long bufferFlushMaxMutations;
+   private final long bufferFlushIntervalMillis;
+
+   private transient HBaseReadWriteHelper helper;
+
+   private transient Connection connection;
+   private transient BufferedMutator mutator;
+
+   private transient ScheduledExecutorService executor;
+   private transient ScheduledFuture scheduledFuture;
+   private transient AtomicLong numPendingRequests;
+
+   private transient volatile boolean closed = false;
+
+   /**
+* This is set from inside the {@link 
BufferedMutator.ExceptionListener} if a {@link Throwable}
+* was thrown.
+*
+* Errors will be checked and rethrown before processing each input 
element, and when the sink is closed.
+*/
+   private final AtomicReference failureThrowable = new 
AtomicReference<>();
+
+   public HBaseUpsertSinkFunction(
+   String hTableName,
+   HBaseTableSchema schema,
+   org.apache.hadoop.conf.Configuration conf,
+   long bufferFlushMaxSizeInBytes,
+   long bufferFlushMaxMutations,
+ 

[GitHub] [flink] zjuwangg commented on issue #9037: [FLINK-13157]reeanble unit test read complext type of HiveInputFormatTest

2019-07-10 Thread GitBox
zjuwangg commented on issue #9037: [FLINK-13157]reeanble unit test read 
complext type of HiveInputFormatTest
URL: https://github.com/apache/flink/pull/9037#issuecomment-510309575
 
 
   > > The latest changes look good to me. It would be good if we can revert unnecessary changes to the indentation. (This might be taken care of at commit time.)
   > 
   > Yes, as @xuefuz mentioned, please revert any unnecessary changes in the future. I reverted them for you this time.
   > 
   > Ran tests for Hive 2.3.4 and 1.2.1 locally. Merging
   
   Thanks, I will take care of it in the future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-07-10 Thread GitBox
wuchong commented on issue #8294: [FLINK-12348][table-planner-blink]Use 
TableConfig in api module to replace TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#issuecomment-510307375
 
 
   How about changing `addParameters` to `addConfigs`? Maybe `config` is more appropriate than `parameter` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on issue #9068: [FLINK-13195] Add create table support for SqlClient

2019-07-10 Thread GitBox
danny0405 commented on issue #9068: [FLINK-13195] Add create table support for 
SqlClient
URL: https://github.com/apache/flink/pull/9068#issuecomment-510307115
 
 
   @twalthr How about we cache a `catalogName -> DDLs` mapping in the `SessionContext`, just like we cache the `ViewEntry`, and then recover the tables from these DDLs every time we switch to a new session?
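
   A hypothetical sketch of that idea; the class, field and method names below are made up and not part of the SQL Client API:

   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Consumer;

   // Keep the DDL statements per catalog and replay them when a session is (re)built.
   class DdlCache {
       private final Map<String, List<String>> ddlsByCatalog = new HashMap<>();

       void remember(String catalogName, String createTableDdl) {
           ddlsByCatalog.computeIfAbsent(catalogName, k -> new ArrayList<>()).add(createTableDdl);
       }

       void replay(String catalogName, Consumer<String> ddlExecutor) {
           // e.g. ddlExecutor could delegate to tableEnv.sqlUpdate(ddl) when the session is recreated
           for (String ddl : ddlsByCatalog.getOrDefault(catalogName, Collections.emptyList())) {
               ddlExecutor.accept(ddl);
           }
       }
   }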


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-13181) Add a constructor function to CsvTableSink

2019-07-10 Thread hehuiyuan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882608#comment-16882608
 ] 

hehuiyuan edited comment on FLINK-13181 at 7/11/19 2:46 AM:


Good idea. Using `Builder`  is more convenient.


was (Author: hehuiyuan):
Using `Builder`  is more convenient.

> Add a constructor function to CsvTableSink
> --
>
> Key: FLINK-13181
> URL: https://issues.apache.org/jira/browse/FLINK-13181
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: hehuiyuan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add a constructor function for parameters :
> @param path   The output path to write the Table to.
> @param fieldDelim The field delimiter
> @param writeMode  The write mode to specify whether existing files 
> are overwritten or not.
>
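
For illustration, a possible builder on top of the existing `CsvTableSink` constructor (hypothetical, not part of the Flink API):

{code:java}
// Hypothetical builder wrapping the existing CsvTableSink constructor; not part of the Flink API.
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.sinks.CsvTableSink;

class CsvTableSinkBuilder {
    private String path;
    private String fieldDelim = ",";
    private int numFiles = 1;
    private FileSystem.WriteMode writeMode = FileSystem.WriteMode.NO_OVERWRITE;

    CsvTableSinkBuilder path(String path) { this.path = path; return this; }
    CsvTableSinkBuilder fieldDelim(String fieldDelim) { this.fieldDelim = fieldDelim; return this; }
    CsvTableSinkBuilder numFiles(int numFiles) { this.numFiles = numFiles; return this; }
    CsvTableSinkBuilder writeMode(FileSystem.WriteMode writeMode) { this.writeMode = writeMode; return this; }

    CsvTableSink build() {
        return new CsvTableSink(path, fieldDelim, numFiles, writeMode);
    }
}

// Usage:
// CsvTableSink sink = new CsvTableSinkBuilder()
//     .path("/tmp/out.csv")
//     .writeMode(FileSystem.WriteMode.OVERWRITE)
//     .build();
{code}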



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #9072: [FLINK-11630] Wait for the termination of all running Tasks when shutting down TaskExecutor

2019-07-10 Thread GitBox
flinkbot commented on issue #9072: [FLINK-11630] Wait for the termination of 
all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/9072#issuecomment-510306664
 
 
   CI report for commit cd5ad8d23046c1025f7f9865e60fc3d048fd1f85: SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118634580)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13181) Add a constructor function to CsvTableSink

2019-07-10 Thread hehuiyuan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882608#comment-16882608
 ] 

hehuiyuan commented on FLINK-13181:
---

Using `Builder`  is more convenient.

> Add a constructor function to CsvTableSink
> --
>
> Key: FLINK-13181
> URL: https://issues.apache.org/jira/browse/FLINK-13181
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: hehuiyuan
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add a constructor function for parameters :
> @param path   The output path to write the Table to.
> @param fieldDelim The field delimiter
> @param writeMode  The write mode to specify whether existing files 
> are overwritten or not.
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #9076: [FLINK-13200][Table SQL / Planner] Improve the generated code for if statements

2019-07-10 Thread GitBox
flinkbot commented on issue #9076: [FLINK-13200][Table SQL / Planner] Improve 
the generated code for if statements
URL: https://github.com/apache/flink/pull/9076#issuecomment-510306052
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13200) Improve the generated code for if statements

2019-07-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13200:
---
Labels: pull-request-available  (was: )

> Improve the generated code for if statements
> 
>
> Key: FLINK-13200
> URL: https://issues.apache.org/jira/browse/FLINK-13200
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>
> In the generated code, we often see code snippets like this:
>  
> if (true) {
>   acc$6.setNullAt(1);
> } else {
>   acc$6.setField(1, ((int) -1));;
> }
> Such code hurts readability and increases the code size, making it more costly to compile and to transfer over the network.
>  
> In this issue, we remove such useless if conditions. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] liyafan82 opened a new pull request #9076: [FLINK-13200][Table SQL / Planner] Improve the generated code for if statements

2019-07-10 Thread GitBox
liyafan82 opened a new pull request #9076: [FLINK-13200][Table SQL / Planner] 
Improve the generated code for if statements
URL: https://github.com/apache/flink/pull/9076
 
 
   
   
   ## What is the purpose of the change
   
   In the generated code, we often see code snippets like this:
   

   
   if (true){
  acc$6.setNullAt(1); 
   } else {
  acc$6.setField(1, ((int) -1));; 
   }
   
   Such code hurts readability and increases the code size, making it more costly to compile and to transfer over the network.

   
   In this issue, we remove such useless if conditions. 
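
   To sketch the idea (an illustrative Java helper, not the actual planner code): when the condition is a literal `true` or `false`, only the branch that will actually run is emitted.

   // Illustrative helper: skip the if/else entirely for constant conditions.
   static String generateIf(String condition, String thenCode, String elseCode) {
       if ("true".equals(condition)) {
           return thenCode;
       } else if ("false".equals(condition)) {
           return elseCode;
       } else {
           return "if (" + condition + ") {\n" + thenCode + "\n} else {\n" + elseCode + "\n}";
       }
   }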
   
   
   ## Brief change log
   
 - Add method CodeGenUtils#getSimplifiedIfConditionCode for processing the 
if statements
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #9074: [FLINK-13198][core] Introduce TimeLength in configuration package

2019-07-10 Thread GitBox
wuchong commented on issue #9074: [FLINK-13198][core] Introduce TimeLength in 
configuration package
URL: https://github.com/apache/flink/pull/9074#issuecomment-510304773
 
 
   cc @zentol, I think you may also want to have a look at this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9074: [FLINK-13198][core] 
Introduce TimeLength in configuration package
URL: https://github.com/apache/flink/pull/9074#discussion_r302340591
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/configuration/TimeLengthTest.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.configuration.TimeLength.TimeUnit.MINUTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link MemorySize} class.
+ */
+public class TimeLengthTest {
+
+   @Test
+   public void testUnitConversion() {
+   final TimeLength zero = new TimeLength(0);
+   assertEquals(0, zero.getMilliseconds());
+   assertEquals(0, zero.getSeconds());
+   assertEquals(0, zero.getMinutes());
+   assertEquals(0, zero.getHours());
+
+   final TimeLength ms = new TimeLength(955);
+   assertEquals(955, ms.getMilliseconds());
+   assertEquals(0, ms.getSeconds());
+   assertEquals(0, ms.getMinutes());
+   assertEquals(0, ms.getHours());
+
+   final TimeLength secs = new TimeLength(18500);
+   assertEquals(18500, secs.getMilliseconds());
+   assertEquals(18, secs.getSeconds());
+   assertEquals(0, secs.getMinutes());
+   assertEquals(0, secs.getHours());
+
+   final TimeLength mins = new TimeLength(6 + 18500);
+   assertEquals(6 + 18500, mins.getMilliseconds());
+   assertEquals(60 + 18, mins.getSeconds());
+   assertEquals(1, mins.getMinutes());
+   assertEquals(0, mins.getHours());
+
+   final TimeLength hrs = new TimeLength(3600 * 1000 + 999);
+   assertEquals(3600 * 1000 + 999, hrs.getMilliseconds());
+   assertEquals(3600, hrs.getSeconds());
+   assertEquals(60, hrs.getMinutes());
+   assertEquals(1, hrs.getHours());
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void testInvalid() {
+   new TimeLength(-1);
+   }
+
+   @Test
+   public void testStandardUtils() throws IOException {
+   final TimeLength ms = new TimeLength(1234567890L);
+   final TimeLength cloned = 
CommonTestUtils.createCopySerializable(ms);
+
+   assertEquals(ms, cloned);
+   assertEquals(ms.hashCode(), cloned.hashCode());
+   assertEquals(ms.toString(), cloned.toString());
+   }
+
+   @Test
+   public void testParseMilliseconds() {
+   assertEquals(1234, TimeLength.parse("1234").getMilliseconds());
+   assertEquals(1234, 
TimeLength.parse("1234ms").getMilliseconds());
+   assertEquals(1234, TimeLength.parse("1234 
ms").getMilliseconds());
+   }
+
+   @Test
+   public void testParseSeconds() {
+   assertEquals(667766, TimeLength.parse("667766s").getSeconds());
+   assertEquals(667766, TimeLength.parse("667766 s").getSeconds());
+   assertEquals(667766, TimeLength.parse("667766s").getSeconds());
 
 Review comment:
   The same with L94?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9074: [FLINK-13198][core] 
Introduce TimeLength in configuration package
URL: https://github.com/apache/flink/pull/9074#discussion_r302341272
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/configuration/TimeLengthTest.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.configuration.TimeLength.TimeUnit.MINUTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link TimeLength} class.
+ */
+public class TimeLengthTest {
+
+   @Test
+   public void testUnitConversion() {
+   final TimeLength zero = new TimeLength(0);
+   assertEquals(0, zero.getMilliseconds());
+   assertEquals(0, zero.getSeconds());
+   assertEquals(0, zero.getMinutes());
+   assertEquals(0, zero.getHours());
+
+   final TimeLength ms = new TimeLength(955);
+   assertEquals(955, ms.getMilliseconds());
+   assertEquals(0, ms.getSeconds());
+   assertEquals(0, ms.getMinutes());
+   assertEquals(0, ms.getHours());
+
+   final TimeLength secs = new TimeLength(18500);
+   assertEquals(18500, secs.getMilliseconds());
+   assertEquals(18, secs.getSeconds());
+   assertEquals(0, secs.getMinutes());
+   assertEquals(0, secs.getHours());
+
+   final TimeLength mins = new TimeLength(6 + 18500);
+   assertEquals(6 + 18500, mins.getMilliseconds());
+   assertEquals(60 + 18, mins.getSeconds());
+   assertEquals(1, mins.getMinutes());
+   assertEquals(0, mins.getHours());
+
+   final TimeLength hrs = new TimeLength(3600 * 1000 + 999);
+   assertEquals(3600 * 1000 + 999, hrs.getMilliseconds());
+   assertEquals(3600, hrs.getSeconds());
+   assertEquals(60, hrs.getMinutes());
+   assertEquals(1, hrs.getHours());
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void testInvalid() {
+   new TimeLength(-1);
+   }
+
+   @Test
+   public void testStandardUtils() throws IOException {
+   final TimeLength ms = new TimeLength(1234567890L);
+   final TimeLength cloned = CommonTestUtils.createCopySerializable(ms);
+
+   assertEquals(ms, cloned);
+   assertEquals(ms.hashCode(), cloned.hashCode());
+   assertEquals(ms.toString(), cloned.toString());
+   }
+
+   @Test
+   public void testParseMilliseconds() {
+   assertEquals(1234, TimeLength.parse("1234").getMilliseconds());
+   assertEquals(1234, TimeLength.parse("1234ms").getMilliseconds());
+   assertEquals(1234, TimeLength.parse("1234 ms").getMilliseconds());
+   }
+
+   @Test
+   public void testParseSeconds() {
+   assertEquals(667766, TimeLength.parse("667766s").getSeconds());
+   assertEquals(667766, TimeLength.parse("667766 s").getSeconds());
+   assertEquals(667766, TimeLength.parse("667766s").getSeconds());
+   }
+
+   @Test
+   public void testParseMinutes() {
+   assertEquals(7657623, TimeLength.parse("7657623min").getMinutes());
+   assertEquals(7657623, TimeLength.parse("7657623 min").getMinutes());
+   assertEquals(7657623, TimeLength.parse("7657623min").getMinutes());
+   }
+
+   @Test
+   public void testParseHours() {
+   assertEquals(987654, TimeLength.parse("987654h").getHours());
+   assertEquals(987654, TimeLength.parse("987654 h").getHours());
+   assertEquals(987654, TimeLength.parse("987654h").getHours());
 
 Review comment:
   The same with L108?
   
   

--

[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9074: [FLINK-13198][core] 
Introduce TimeLength in configuration package
URL: https://github.com/apache/flink/pull/9074#discussion_r302341200
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/configuration/TimeLengthTest.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.configuration.TimeLength.TimeUnit.MINUTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link TimeLength} class.
+ */
+public class TimeLengthTest {
+
+   @Test
+   public void testUnitConversion() {
+   final TimeLength zero = new TimeLength(0);
+   assertEquals(0, zero.getMilliseconds());
+   assertEquals(0, zero.getSeconds());
+   assertEquals(0, zero.getMinutes());
+   assertEquals(0, zero.getHours());
+
+   final TimeLength ms = new TimeLength(955);
+   assertEquals(955, ms.getMilliseconds());
+   assertEquals(0, ms.getSeconds());
+   assertEquals(0, ms.getMinutes());
+   assertEquals(0, ms.getHours());
+
+   final TimeLength secs = new TimeLength(18500);
+   assertEquals(18500, secs.getMilliseconds());
+   assertEquals(18, secs.getSeconds());
+   assertEquals(0, secs.getMinutes());
+   assertEquals(0, secs.getHours());
+
+   final TimeLength mins = new TimeLength(6 + 18500);
+   assertEquals(6 + 18500, mins.getMilliseconds());
+   assertEquals(60 + 18, mins.getSeconds());
+   assertEquals(1, mins.getMinutes());
+   assertEquals(0, mins.getHours());
+
+   final TimeLength hrs = new TimeLength(3600 * 1000 + 999);
+   assertEquals(3600 * 1000 + 999, hrs.getMilliseconds());
+   assertEquals(3600, hrs.getSeconds());
+   assertEquals(60, hrs.getMinutes());
+   assertEquals(1, hrs.getHours());
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void testInvalid() {
+   new TimeLength(-1);
+   }
+
+   @Test
+   public void testStandardUtils() throws IOException {
+   final TimeLength ms = new TimeLength(1234567890L);
+   final TimeLength cloned = CommonTestUtils.createCopySerializable(ms);
+
+   assertEquals(ms, cloned);
+   assertEquals(ms.hashCode(), cloned.hashCode());
+   assertEquals(ms.toString(), cloned.toString());
+   }
+
+   @Test
+   public void testParseMilliseconds() {
+   assertEquals(1234, TimeLength.parse("1234").getMilliseconds());
+   assertEquals(1234, TimeLength.parse("1234ms").getMilliseconds());
+   assertEquals(1234, TimeLength.parse("1234 ms").getMilliseconds());
+   }
+
+   @Test
+   public void testParseSeconds() {
+   assertEquals(667766, TimeLength.parse("667766s").getSeconds());
+   assertEquals(667766, TimeLength.parse("667766 s").getSeconds());
+   assertEquals(667766, TimeLength.parse("667766s").getSeconds());
+   }
+
+   @Test
+   public void testParseMinutes() {
+   assertEquals(7657623, TimeLength.parse("7657623min").getMinutes());
+   assertEquals(7657623, TimeLength.parse("7657623 min").getMinutes());
+   assertEquals(7657623, TimeLength.parse("7657623min").getMinutes());
 
 Review comment:
   The same with L101?
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package

2019-07-10 Thread GitBox
wuchong commented on a change in pull request #9074: [FLINK-13198][core] 
Introduce TimeLength in configuration package
URL: https://github.com/apache/flink/pull/9074#discussion_r302341162
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 ##
 @@ -540,13 +541,33 @@ public TableSchema getTableSchema(String key) {
});
}
 
+   /**
+* Returns a Flink {@link TimeLength} under the given key if it exists.
+*/
+   public Optional<TimeLength> getOptionalTimeLength(String key) {
+   return optionalGet(key).map((value) -> {
+   try {
+   return TimeLength.parse(value, TimeLength.TimeUnit.MILLISECONDS);
+   } catch (Exception e) {
+   throw new ValidationException("Invalid time length value for key '" + key + "'.", e);
+   }
+   });
+   }
+
/**
 * Returns a Flink {@link MemorySize} under the given existing key.
 */
public MemorySize getMemorySize(String key) {
return getOptionalMemorySize(key).orElseThrow(exceptionSupplier(key));
}
 
+   /**
+* Returns a Flink {@link TimeLength} under the given existing key.
+*/
+   public TimeLength getTimeLength(String key) {
+   return getOptionalTimeLength(key).orElseThrow(exceptionSupplier(key));
+   }
+
 
 Review comment:
   Please also add `validateTimeLength` similar to `validateMemorySize`.
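   
   For illustration, a rough sketch of what such a validator could look like; the name follows this suggestion, but the exact signature and the `properties` map it reads from are assumptions about the surrounding DescriptorProperties class rather than its actual API:
   
       /**
        * Hypothetical sketch: validates that the value under the given key parses as a TimeLength.
        */
       public void validateTimeLength(String key, boolean isOptional) {
           if (!properties.containsKey(key)) {
               if (!isOptional) {
                   throw new ValidationException("Could not find required property '" + key + "'.");
               }
               return;
           }
           try {
               TimeLength.parse(properties.get(key), TimeLength.TimeUnit.MILLISECONDS);
           } catch (Exception e) {
               throw new ValidationException("Invalid time length value for key '" + key + "'.", e);
           }
       }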


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner

2019-07-10 Thread GitBox
godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] 
Bump Calcite dependency to 1.20.0 in sql parser & flink planner
URL: https://github.com/apache/flink/pull/9056#issuecomment-510304178
 
 
   > > > Oops.. I only had validated the dependency in flink-table module. I 
will re-validate it in whole project
   > > 
   > > 
   > > Maybe also find a way to verify python table api with blink planner.
   > 
   > ok, i will fix it
   
   @dianfu told me that flink-python does not support blink planner now


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9030: [FLINK-13123] Align Stop/Cancel Commands in CLI and REST Interface and Improve Documentation

2019-07-10 Thread GitBox
flinkbot commented on issue #9030: [FLINK-13123] Align Stop/Cancel Commands in 
CLI and REST Interface and Improve Documentation
URL: https://github.com/apache/flink/pull/9030#issuecomment-510303704
 
 
   CI report for commit 9a4bb16fa7220a166c29701e65c90f0b11b6dfc4: SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118626872)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink

2019-07-10 Thread GitBox
lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] 
HiveTableSink should implement OverwritableTableSink
URL: https://github.com/apache/flink/pull/9067#discussion_r302342236
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
 ##
 @@ -235,10 +220,44 @@ public void testInsertIntoStaticPartition() throws 
Exception {
 
// make sure new partition is created
assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
-   CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec));

-   String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
-   verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
+   verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
+
+   hiveCatalog.dropTable(tablePath, false);
+   }
+
+   @Test
+   public void testInsertOverwrite() throws Exception {
+   String dbName = "default";
+   String tblName = "dest";
+   RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
+   ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+   ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+   BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+
+   // write some data and verify
+   List toWrite = generateRecords(5);
+   tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+   CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
+   tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
+   tableEnv.sql("select * from src").insertInto("destSink");
+   execEnv.execute();
+
+   verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
+
+   // write some data to overwrite existing data and verify
+   toWrite = generateRecords(3);
 
 Review comment:
   No. If we don't set overwrite, we'll get 8 rows instead of 3.
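   
   For context, a minimal sketch of how the second write enables overwrite through the OverwritableTableSink contract this PR adds (variable names are illustrative and not the exact test code):
   
       // illustrative only: build a second sink with overwrite enabled, so the 5
       // previously written rows are replaced by the 3 new ones instead of appended to
       HiveTableSink overwriteSink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
       overwriteSink.setOverwrite(true);
       tableEnv.registerTableSink("overwriteSink", overwriteSink);
       tableEnv.sql("select * from src").insertInto("overwriteSink");
       execEnv.execute();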


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13200) Improve the generated code for if statements

2019-07-10 Thread Liya Fan (JIRA)
Liya Fan created FLINK-13200:


 Summary: Improve the generated code for if statements
 Key: FLINK-13200
 URL: https://issues.apache.org/jira/browse/FLINK-13200
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Liya Fan
Assignee: Liya Fan


In the generated code, we often see code snippets like this:

 

if (true) {
  acc$6.setNullAt(1);
} else {
  acc$6.setField(1, ((int) -1));;
}

Such code hurts readability and increases the code size, making it more costly to 
compile and to transfer over the network.

 

In this issue, we remove such useless if conditions. 
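
For example, the intended effect on the snippet above (a sketch of the expected output, not the exact generated code): when the condition is a compile-time constant, only the reachable branch should be emitted.

// before: constant condition, dead branch still generated
if (true) {
  acc$6.setNullAt(1);
} else {
  acc$6.setField(1, ((int) -1));
}

// after: the constant is folded away and only the live branch remains
acc$6.setNullAt(1);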



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13163) Support execution of batch jobs with fewer slots than requested

2019-07-10 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882597#comment-16882597
 ] 

Zhu Zhu commented on FLINK-13163:
-

Hi [~kkrugler], the input splits problem discussed above may result in more 
recomputation during a single task failover, but it should not increase the chance 
that a task failover happens.

To identify why increasing the source parallelism can lead to more failures, I 
think we need to check the failure cause exception to see why it is happening. 
We can do that in another mail thread, since it may not be related to this JIRA.

> Support execution of batch jobs with fewer slots than requested
> ---
>
> Key: FLINK-13163
> URL: https://issues.apache.org/jira/browse/FLINK-13163
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Jeff Zhang
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.9.0
>
>
> Flink should be able to execute batch jobs with fewer slots than requested in 
> a sequential manner.
> At the moment, however, we register for every slot request a timeout which 
> fires after {{slot.request.timeout}} to fail the slot request. Moreover, we 
> fail the slot request if the {{ResourceManager}} fails to allocate new 
> resources or if the slot request times out on the {{ResourceManager}}. This 
> kind of behavior makes sense if we know that we need all requested slots so 
> that we fail early if it cannot be fulfilled.
> However, for batch jobs it is not strictly required that all slot requests 
> get fulfilled. It is enough to have at least one slot for every requested 
> {{ResourceProfile}} (the set of slots (available + allocated) must contain a 
> slot which can fulfill a slot request). If this is the case, then we should 
> not fail the slot request but instead wait until the slot gets assigned to 
> the request. If there is no such slot, then we should still time out the 
> request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xintongsong commented on issue #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots.

2019-07-10 Thread GitBox
xintongsong commented on issue #8846: [FLINK-12766][runtime] Dynamically 
allocate TaskExecutor's managed memory to slots.
URL: https://github.com/apache/flink/pull/8846#issuecomment-510301993
 
 
   Thank you for the comments, @StephanEwen, @KurtYoung.
   
   This truly is an imperfect temporary solution. If you both think we should not 
have it for 1.9, I respect your opinions and will not insist on this one.
   
   I think we still need to work on #8841 to make sure that slot sharing works 
properly with fine-grained resource profiles, in case we decide that batch jobs need 
slot sharing to reduce memory waste.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hehuiyuan commented on a change in pull request #9054: [FLINK-13183][Table]Add a PrintTableSink

2019-07-10 Thread GitBox
hehuiyuan commented on a change in pull request #9054: [FLINK-13183][Table]Add 
a PrintTableSink
URL: https://github.com/apache/flink/pull/9054#discussion_r302340288
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/PrintTableSink.java
 ##
 @@ -0,0 +1,63 @@
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.table.utils.TableConnectorUtils;
+
+/**
+ * A simple {@link TableSink} to emit data to the standard output stream.
+ */
+public class PrintTableSink implements BatchTableSink , AppendStreamTableSink {
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   @Override
+   public void emitDataSet(DataSet dataSet) {
+   try {
+   dataSet.print();
+   } catch (Exception e) {
+   e.printStackTrace();
 
 Review comment:
   ok
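   
   A minimal sketch of the change being agreed to here, i.e. propagating the failure instead of swallowing it; wrapping in a RuntimeException is just one reasonable choice, and the Row element type is assumed from the surrounding sink:
   
       @Override
       public void emitDataSet(DataSet<Row> dataSet) {
           try {
               dataSet.print();
           } catch (Exception e) {
               // fail the job rather than only printing the stack trace
               throw new RuntimeException("Failed to print the DataSet to stdout.", e);
           }
       }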


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12858) Potentially not properly working Flink job in case of stop-with-savepoint failure

2019-07-10 Thread yelun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yelun reassigned FLINK-12858:
-

Assignee: yelun

> Potentially not properly working Flink job in case of stop-with-savepoint 
> failure
> -
>
> Key: FLINK-12858
> URL: https://issues.apache.org/jira/browse/FLINK-12858
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Alex
>Assignee: yelun
>Priority: Minor
>
> Current implementation of stop-with-savepoint (FLINK-11458) would lock the 
> thread (on {{syncSavepointLatch}}) that carries 
> {{StreamTask.performCheckpoint()}}. For non-source tasks, this thread is 
> implied to be the task's main thread (stop-with-savepoint deliberately stops 
> any activity in the task's main thread).
> Unlocking happens either when the task is cancelled or when the corresponding 
> checkpoint is acknowledged.
> It's possible, that other downstream tasks of the same Flink job "soft" fail 
> the checkpoint/savepoint due to various reasons (for example, due to max 
> buffered bytes {{BarrierBuffer.checkSizeLimit()}}. In such case, the 
> checkpoint abortion would be notified to JM . But it looks like, the 
> checkpoint coordinator would handle such abortion as usual and assume that 
> the Flink job continues running.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13150) defaultCatalogName and defaultDatabaseName in TableEnvImpl is not updated after they are updated in TableEnvironment

2019-07-10 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882587#comment-16882587
 ] 

Jeff Zhang commented on FLINK-13150:


[~xuefuz] It happens when I register a table. Here's the sample code to reproduce 
this issue. The root cause seems to be that when I call tEnv.useCatalog, the 
defaultCatalogName in TableEnvImpl is not changed, so when I then register a table 
it still goes to GenericInMemoryCatalog instead of HiveCatalog.
{code:java}
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

val hiveCatalog = new HiveCatalog("hive", "default", "/Users/jzhang/Java/lib/apache-hive-2.3.4-bin/conf", "2.3.4");
tEnv.registerCatalog("hive", hiveCatalog)
tEnv.useCatalog("hive")


val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))

// register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, 'word, 'frequency)

// run a SQL query on the Table and retrieve the result as a new Table
val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")

table.toDataSet[WC].print(){code}

> defaultCatalogName and defaultDatabaseName in TableEnvImpl is not updated 
> after they are updated in TableEnvironment
> 
>
> Key: FLINK-13150
> URL: https://issues.apache.org/jira/browse/FLINK-13150
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Jeff Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> defaultCatalogName and defaultDatabaseName in TableEnvImpl are initialized 
> when it is created and never changed even when they are updated in 
> TableEnvironment.
> This will cause issues where we may register a table to the wrong catalog after 
> we change the defaultCatalogName and defaultDatabaseName.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner

2019-07-10 Thread GitBox
godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] 
Bump Calcite dependency to 1.20.0 in sql parser & flink planner
URL: https://github.com/apache/flink/pull/9056#issuecomment-510297306
 
 
   > > Oops.. I only had validated the dependency in flink-table module. I will 
re-validate it in whole project
   > 
   > Maybe also find a way to verify python table api with blink planner.
   
   ok, i will fix it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8418: [FLINK-12491][docs][configuration] Fix incorrect javadoc for path sep…

2019-07-10 Thread GitBox
flinkbot commented on issue #8418: [FLINK-12491][docs][configuration] Fix 
incorrect javadoc for path sep…
URL: https://github.com/apache/flink/pull/8418#issuecomment-510296832
 
 
   CI report for commit df6d3f5acd084931c16fe8564cfd8532d3889111: SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/118620366)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner

2019-07-10 Thread GitBox
KurtYoung commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] 
Bump Calcite dependency to 1.20.0 in sql parser & flink planner
URL: https://github.com/apache/flink/pull/9056#issuecomment-510295851
 
 
   > Oops.. I only had validated the dependency in flink-table module. I will 
re-validate it in whole project
   
   Maybe also find a way to verify python table api with blink planner. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase

2019-07-10 Thread GitBox
flinkbot commented on issue #9075:  [FLINK-10245][hbase] Add an upsert table 
sink factory for HBase
URL: https://github.com/apache/flink/pull/9075#issuecomment-510295340
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

