[GitHub] [flink] pnowojski merged pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
pnowojski merged pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
wuchong commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#discussion_r302388287 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sinks.UpsertStreamTableSink; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; + +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An upsert {@link UpsertStreamTableSink} for HBase. + */ +public class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> { + + private final HBaseTableSchema hbaseTableSchema; + private final TableSchema tableSchema; + private final HBaseOptions hbaseOptions; + private final HBaseWriteOptions writeOptions; + + public HBaseUpsertTableSink( + HBaseTableSchema hbaseTableSchema, + HBaseOptions hbaseOptions, + HBaseWriteOptions writeOptions) { + checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), "HBaseUpsertTableSink requires rowkey is set."); + this.hbaseTableSchema = hbaseTableSchema; + this.tableSchema = hbaseTableSchema.convertsToTableSchema(); + this.hbaseOptions = hbaseOptions; + this.writeOptions = writeOptions; + } + + @Override + public void setKeyFields(String[] keys) { + // hbase always upsert on rowkey, ignore query keys. Review comment: I'm not sure. Because the support for key derivation is not very well, esp. fields concating which is used heavily in hbase sink.
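A minimal sketch of the upsert-on-rowkey behaviour the `setKeyFields` comment describes, assuming the sink receives a stream of (flag, row) pairs where `true` means upsert and `false` means delete, which is the contract of Flink's `UpsertStreamTableSink` (`Tuple2<Boolean, Row>`). The class `RowkeyUpsertSketch` and its `Change` type are hypothetical and use a plain `HashMap` in place of an HBase table, so only the rowkey decides which row is written and the query's key fields never come into play.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of an upsert sink writing to a table keyed only by rowkey. */
class RowkeyUpsertSketch {

    /** One change message: upsert == true means insert/update, upsert == false means delete. */
    static final class Change {
        final boolean upsert;
        final String rowkey;
        final String value;

        Change(boolean upsert, String rowkey, String value) {
            this.upsert = upsert;
            this.rowkey = rowkey;
            this.value = value;
        }
    }

    /**
     * Applies changes in arrival order. The rowkey alone decides which row is touched,
     * so any key fields derived from the query are irrelevant here.
     */
    static Map<String, String> apply(List<Change> changes) {
        // HashMap stands in for the HBase table (rowkey -> latest value)
        Map<String, String> table = new HashMap<>();
        for (Change c : changes) {
            if (c.upsert) {
                table.put(c.rowkey, c.value); // a newer write for the same rowkey replaces the old value
            } else {
                table.remove(c.rowkey);       // a delete message removes the row
            }
        }
        return table;
    }
}
```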
[GitHub] [flink] pnowojski commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
pnowojski commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#discussion_r302387828 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ## @@ -177,7 +193,8 @@ public void recycle(MemorySegment segment) { final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest); try { - while (segments.size() < numberOfSegmentsToRequest) { + final Deadline deadline = Deadline.now().plus(Duration.ofMillis(requestSegmentsTimeoutInMillis)); Review comment: Let's stick with the `Deadline` as in your proposal there is already a small bug (for the timeout based on subtraction to work, you would have to subtract it only if `segment == null`, otherwise even if you get the buffer after only a 1 millisecond of waiting, you subtract 2 seconds of waiting time).
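A rough sketch of the deadline-based loop the review argues for, written against a plain `BlockingQueue` instead of `NetworkBufferPool`. The remaining wait is recomputed from a fixed deadline on every iteration, so a segment that arrives after 1 ms only costs 1 ms of the budget, whereas the subtraction approach needs the extra `segment == null` check to behave the same way. `DeadlineRequestSketch`, `requestSegments` and the `pollInterval` parameter are hypothetical; the diff itself uses Flink's `Deadline` utility rather than raw `System.nanoTime()` arithmetic.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadlineRequestSketch {

    /**
     * Takes {@code count} segments from {@code pool}, waiting at most {@code timeout} in total.
     * Time is measured against a fixed deadline, so only the time actually spent waiting counts.
     */
    static <T> List<T> requestSegments(BlockingQueue<T> pool, int count, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        final List<T> segments = new ArrayList<>(count);
        while (segments.size() < count) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                // give back what we already took so other requesters are not starved
                // (assumes the queue has room for them, e.g. an unbounded queue)
                pool.addAll(segments);
                throw new TimeoutException("Timed out requesting " + count + " segments");
            }
            // never wait longer than the remaining budget
            T segment = pool.poll(Math.min(remainingNanos, pollInterval.toNanos()), TimeUnit.NANOSECONDS);
            if (segment != null) {
                segments.add(segment);
            }
        }
        return segments;
    }
}
```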
[GitHub] [flink] flinkbot commented on issue #9073: [FLINK-13187] Introduce ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
flinkbot commented on issue #9073: [FLINK-13187] Introduce ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST URL: https://github.com/apache/flink/pull/9073#issuecomment-510354715 CI report for commit f588483906ba4c4459bf18671f84924a3286: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118672483)
[GitHub] [flink] JingsongLi commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
JingsongLi commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#issuecomment-510354243 LGTM +1, just left minor comments...
[GitHub] [flink] JingsongLi commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
JingsongLi commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#discussion_r302385261 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sinks.UpsertStreamTableSink; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; + +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An upsert {@link UpsertStreamTableSink} for HBase. + */ +public class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> { + + private final HBaseTableSchema hbaseTableSchema; + private final TableSchema tableSchema; + private final HBaseOptions hbaseOptions; + private final HBaseWriteOptions writeOptions; + + public HBaseUpsertTableSink( + HBaseTableSchema hbaseTableSchema, + HBaseOptions hbaseOptions, + HBaseWriteOptions writeOptions) { + checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), "HBaseUpsertTableSink requires rowkey is set."); + this.hbaseTableSchema = hbaseTableSchema; + this.tableSchema = hbaseTableSchema.convertsToTableSchema(); + this.hbaseOptions = hbaseOptions; + this.writeOptions = writeOptions; + } + + @Override + public void setKeyFields(String[] keys) { + // hbase always upsert on rowkey, ignore query keys. Review comment: Check to row keys?
[jira] [Commented] (FLINK-12749) Getting Started - Docker Playgrounds - Flink Cluster Playground
[ https://issues.apache.org/jira/browse/FLINK-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882682#comment-16882682 ] Robert Metzger commented on FLINK-12749: I'm happy to create an Apache repo for this, but we should probably first discuss this on the dev@ list. > Getting Started - Docker Playgrounds - Flink Cluster Playground > --- > > Key: FLINK-12749 > URL: https://issues.apache.org/jira/browse/FLINK-12749 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Major > > The planned structure for the new Getting Started Guide is > * Flink Overview (~ two pages) > * Project Setup > ** Java > ** Scala > ** Python > * Quickstarts > ** Example Walkthrough - Table API / SQL > ** Example Walkthrough - DataStream API > * Docker Playgrounds > ** Flink Cluster Playground > ** Flink Interactive SQL Playground > In this ticket we add the Flink Cluster Playground, a docker-compose based > setup consisting of Apache Kafka and Apache Flink (Flink Session Cluster), > including a step-by-step guide for some common commands (job submission, > savepoints, etc). > *Some Open Questions:* > * Which Flink images to use? `library/flink` with dynamic properties would be > the most maintainable, I think. It would be preferable if we don't need to > host any custom images for this, but can rely on the existing plain Flink > images. > * Which Flink jobs to use? An updated version of > {{org.apache.flink.streaming.examples.statemachine.StateMachineExample}} > might be a good option as it can run with or without Kafka and contains a data > generator writing to Kafka already (see next questions). > * How to get data into Kafka? Maybe just provide a small bash > script/one-liner to produce into a Kafka topic or see question above. > * Which Kafka images to use? https://hub.docker.com/r/wurstmeister/kafka/ > seems to be well-maintained and is openly available.
-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[GitHub] [flink] JingsongLi commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
JingsongLi commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#discussion_r302370020 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Common Options for HBase. + */ +public class HBaseOptions { + + private final String tableName; + private final String zkQuorum; + @Nullable private final String zkNodeParent; + + private HBaseOptions(String tableName, String zkQuorum, String zkNodeParent) { + this.tableName = tableName; + this.zkQuorum = zkQuorum; + this.zkNodeParent = zkNodeParent; + } + + String getTableName() { Review comment: Make these methods public?
[jira] [Created] (FLINK-13202) Unstable StandaloneResourceManagerTest
Kurt Young created FLINK-13202: -- Summary: Unstable StandaloneResourceManagerTest Key: FLINK-13202 URL: https://issues.apache.org/jira/browse/FLINK-13202 Project: Flink Issue Type: Test Components: Runtime / Coordination Affects Versions: 1.9.0 Reporter: Kurt Young [https://api.travis-ci.org/v3/job/557150195/log.txt] 06:37:02.888 [ERROR] Failures: 06:37:02.889 [ERROR] StandaloneResourceManagerTest.testStartupPeriod:60->assertHappensUntil:114 condition was not fulfilled before the deadline
[GitHub] [flink] bowenli86 edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
bowenli86 edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager URL: https://github.com/apache/flink/pull/8920#issuecomment-510350722 Hi @JingsongLi , thanks for the explanation! I think solution II works better for now compared to solution I. Solution I is too limited and may not satisfy our users' need. I've updated this PR according to your feedback, please take a look. Thanks
[GitHub] [flink] bowenli86 commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
bowenli86 commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager URL: https://github.com/apache/flink/pull/8920#issuecomment-510350722 Hi @JingsongLi , thanks for the explanation! I think solution II works better for now compared to solution I. I've updated this PR according to your feedback, please take a look. Thanks
[GitHub] [flink] chancelq commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
chancelq commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#issuecomment-510350794 LGTM
[GitHub] [flink] flinkbot commented on issue #9065: [FLINK-13102][travis] Optimize some travis stages by skipping mvn verify if possible
flinkbot commented on issue #9065: [FLINK-13102][travis] Optimize some travis stages by skipping mvn verify if possible URL: https://github.com/apache/flink/pull/9065#issuecomment-510347308 CI report for commit 2e0e53b8f336838bdcca6236913790c144251941: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118642182)
[GitHub] [flink] flinkbot commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.
flinkbot commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module. URL: https://github.com/apache/flink/pull/8294#issuecomment-510345595 CI report for commit c51af3f0c5298e6a769c9ba84dfac18d2c2074aa: SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/118634908)
[GitHub] [flink] XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor
XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor URL: https://github.com/apache/flink/pull/9057#discussion_r302377110 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java ## @@ -42,47 +42,105 @@ @Internal public class BatchExecutor extends ExecutorBase { + private boolean enableObjectReuse; + private long latencyTrackingInterval; + private long bufferTimeout; + private TimeCharacteristic timeCharacteristic; + private InputDependencyConstraint inputDependencyConstraint; + @VisibleForTesting public BatchExecutor(StreamExecutionEnvironment executionEnvironment) { super(executionEnvironment); } @Override public JobExecutionResult execute(String jobName) throws Exception { - if (transformations.isEmpty()) { - throw new TableException("No table sinks have been created yet. " + - "A program needs at least one sink that consumes data. "); - } StreamExecutionEnvironment execEnv = getExecutionEnvironment(); - StreamGraph streamGraph = generateStreamGraph(execEnv, transformations, getNonEmptyJobName(jobName)); - - // TODO supports streamEnv.execute(streamGraph) - try { - return execEnv.execute(getNonEmptyJobName(jobName)); - } finally { - transformations.clear(); - } + StreamGraph streamGraph = generateStreamGraph(transformations, jobName); + return execEnv.execute(streamGraph); } - public static StreamGraph generateStreamGraph( - StreamExecutionEnvironment execEnv, - List> transformations, - String jobName) throws Exception { - // TODO avoid cloning ExecutionConfig - ExecutionConfig executionConfig = InstantiationUtil.clone(execEnv.getConfig()); + /** +* Backup previous streamEnv config and set batch configs. +*/ + private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) { Review comment: As batch job execution can change the properties of streamEnv to affect its reuse. Added UT.
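The backup-and-restore idea behind `backupAndUpdateStreamEnv` can be sketched with a `try`/`finally`, so that a reused environment gets its previous settings back even when the batch job fails. `EnvSettings` is a hypothetical stand-in for a few of the fields the diff adds (object reuse, latency tracking interval, buffer timeout), and the batch values applied here are illustrative, not the ones the PR sets.

```java
import java.util.concurrent.Callable;

class BackupRestoreSketch {

    /** Hypothetical stand-in for some mutable settings of a reusable execution environment. */
    static final class EnvSettings {
        boolean objectReuse;
        long latencyTrackingInterval;
        long bufferTimeout;

        EnvSettings(boolean objectReuse, long latencyTrackingInterval, long bufferTimeout) {
            this.objectReuse = objectReuse;
            this.latencyTrackingInterval = latencyTrackingInterval;
            this.bufferTimeout = bufferTimeout;
        }

        EnvSettings copy() {
            return new EnvSettings(objectReuse, latencyTrackingInterval, bufferTimeout);
        }

        void restoreFrom(EnvSettings backup) {
            this.objectReuse = backup.objectReuse;
            this.latencyTrackingInterval = backup.latencyTrackingInterval;
            this.bufferTimeout = backup.bufferTimeout;
        }
    }

    /** Applies batch settings, runs the job, and always restores the previous settings. */
    static <R> R runWithBatchSettings(EnvSettings env, Callable<R> job) throws Exception {
        EnvSettings backup = env.copy();      // back up what the caller configured
        env.objectReuse = true;               // illustrative batch values, not taken from the PR
        env.latencyTrackingInterval = -1L;
        env.bufferTimeout = -1L;
        try {
            return job.call();                // run the job with the batch settings
        } finally {
            env.restoreFrom(backup);          // restore, even if the job threw
        }
    }
}
```

Restoring in `finally` rather than after `job.call()` is what keeps a failed batch run from leaking its settings into the next job that reuses the environment.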
[GitHub] [flink] XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor
XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor URL: https://github.com/apache/flink/pull/9057#discussion_r302376863 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala ## @@ -116,25 +75,40 @@ class BatchExecExchange( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - .itemIf("exchange_mode", requiredExchangeMode.orNull, -requiredExchangeMode.contains(DataExchangeMode.BATCH)) + .itemIf("shuffle_mode", requiredShuffleMode.orNull, +requiredShuffleMode.contains(ShuffleMode.BATCH)) +.itemIf("full_dam_behavior", getDamBehavior, Review comment: had removed it...
[GitHub] [flink] XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor
XuPingyong commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor URL: https://github.com/apache/flink/pull/9057#discussion_r302376778 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/ExecutionConfigOptions.java ## @@ -195,4 +195,11 @@ "means a kind of disabled operator. Its default value is empty that means no operators are disabled. " + "If the configure's value is \"NestedLoopJoin, ShuffleHashJoin\", NestedLoopJoin and ShuffleHashJoin " + "are disabled. If configure's value is \"HashJoin\", ShuffleHashJoin and BroadcastHashJoin are disabled."); + + public static final ConfigOption SQL_EXEC_SHUFFLE_MODE_ALL_BATCH = + key("sql.exec.shuffle-mode.all-batch") Review comment: ok, changed
[GitHub] [flink] KurtYoung commented on a change in pull request #9036: [FLINK-13112][table-planner-blink] Support LocalZonedTimestampType in blink
KurtYoung commented on a change in pull request #9036: [FLINK-13112][table-planner-blink] Support LocalZonedTimestampType in blink URL: https://github.com/apache/flink/pull/9036#discussion_r302376027 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java ## @@ -33,9 +33,9 @@ public class TableConfig { /** -* Defines the timezone for date/time/timestamp conversions. +* Defines the zone id for timestamp with local time zone. */ - private TimeZone timeZone = TimeZone.getTimeZone("UTC"); + private ZoneId localZoneId = ZoneId.systemDefault(); Review comment: cc @twalthr, we changed `TimeZone` to `LocalZoneId` to better fit with `TIMESTAMP WITH LOCAL TIME ZONE`
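With `TIMESTAMP WITH LOCAL TIME ZONE`, a value is treated as a point in time that is only rendered or parsed in the session zone, which is why a `ZoneId` is a natural fit for the config. A minimal `java.time` sketch, using the hypothetical helper class `LocalZoneSketch`, shows the same instant appearing as different wall-clock times depending on the zone. Note that the diff also changes the default from UTC to `ZoneId.systemDefault()`, so rendered values can differ between machines unless the zone is set explicitly.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

class LocalZoneSketch {

    /** Renders a stored instant as wall-clock time in the session's local zone. */
    static LocalDateTime toLocal(Instant instant, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(instant, sessionZone);
    }

    /** Interprets a wall-clock literal in the session's local zone to get the stored instant. */
    static Instant fromLocal(LocalDateTime local, ZoneId sessionZone) {
        return local.atZone(sessionZone).toInstant();
    }
}
```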
[GitHub] [flink] gaoyunhaii commented on issue #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
gaoyunhaii commented on issue #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#issuecomment-510341121 Many thanks for the review @pnowojski @StephanEwen! I have updated the PR.
[GitHub] [flink] flinkbot commented on issue #8877: [FLINK-12984][metrics] only call Histogram#getStatistics() once where possible
flinkbot commented on issue #8877: [FLINK-12984][metrics] only call Histogram#getStatistics() once where possible URL: https://github.com/apache/flink/pull/8877#issuecomment-510337436 CI report for commit f183d69a6ba8fbd3b9bb3c5474b392950811dfa8: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118634881)
[GitHub] [flink] KurtYoung closed pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.
KurtYoung closed pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module. URL: https://github.com/apache/flink/pull/8294
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302370153 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE, "jdbc"); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_DBTABLE); + properties.add(CONNECTOR_USER); + properties.add(CONNECTOR_PASSWORD); + + // scan partition options + properties.
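Roughly, a factory like this is selected when every `requiredContext()` entry appears with the same value in the table properties, and the remaining keys are then checked against `supportedProperties()`. The sketch below models only those two checks with plain maps. `FactoryMatchSketch` is hypothetical, the literal keys `connector.type` and `connector.property-version` are assumed to be what `CONNECTOR_TYPE` and `CONNECTOR_PROPERTY_VERSION` resolve to, and Flink's real factory lookup handles more cases (such as indexed schema keys) than this does.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FactoryMatchSketch {

    /** True if every required-context entry is present in the properties with the same value. */
    static boolean matchesContext(Map<String, String> requiredContext, Map<String, String> properties) {
        return requiredContext.entrySet().stream()
                .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
    }

    /** Keys that are neither part of the required context nor declared as supported, sorted. */
    static List<String> unsupportedKeys(
            Map<String, String> requiredContext, List<String> supported, Map<String, String> properties) {
        return properties.keySet().stream()
                .filter(k -> !requiredContext.containsKey(k) && !supported.contains(k))
                .sorted()
                .collect(Collectors.toList());
    }
}
```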
[GitHub] [flink] KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.
KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module. URL: https://github.com/apache/flink/pull/8294#issuecomment-510336618 travis passed here: https://travis-ci.org/beyond1920/flink/builds/557128403
[GitHub] [flink] KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.
KurtYoung commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module. URL: https://github.com/apache/flink/pull/8294#issuecomment-510336673 merging this...
[GitHub] [flink] flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#issuecomment-510335932 CI report for commit 31a51cbe260a78381dc44973e6724c20532b5deb: SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/118634862)
[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302369427 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE, "jdbc"); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_DBTABLE); + properties.add(CONNECTOR_USER); + properties.add(CONNECTOR_PASSWORD); + + // scan partition options + properti
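To illustrate how a `requiredContext()` / `supportedProperties()` pair like the one quoted above can be used to decide whether a factory accepts a set of properties, here is a minimal, self-contained sketch. It is a deliberate simplification and not Flink's actual `TableFactoryService` matching logic: the class `FactoryMatchSketch` and its `matches` rule are hypothetical, only the "common options" keys are copied from the quoted factory, and `connector.type` / `connector.property-version` are assumed to be the values of `CONNECTOR_TYPE` and `CONNECTOR_PROPERTY_VERSION`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a simplified "does this factory accept these properties?" check.
public class FactoryMatchSketch {

    // Same shape as requiredContext() in the quoted factory; the keys are assumed
    // values of CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION.
    static Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "jdbc");
        context.put("connector.property-version", "1");
        return context;
    }

    // The "common options" subset of supportedProperties() in the quoted factory.
    static List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.driver");
        properties.add("connector.url");
        properties.add("connector.dbtable");
        properties.add("connector.user");
        properties.add("connector.password");
        return properties;
    }

    // Accept only if every required key/value pair matches exactly and every
    // remaining key is a supported property.
    static boolean matches(Map<String, String> properties) {
        Map<String, String> required = requiredContext();
        for (Map.Entry<String, String> entry : required.entrySet()) {
            if (!entry.getValue().equals(properties.get(entry.getKey()))) {
                return false;
            }
        }
        List<String> supported = supportedProperties();
        for (String key : properties.keySet()) {
            if (!required.containsKey(key) && !supported.contains(key)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.type", "jdbc");
        props.put("connector.property-version", "1");
        props.put("connector.url", "jdbc:derby:memory:example");
        System.out.println(matches(props)); // true
        props.put("connector.unknown-key", "x");
        System.out.println(matches(props)); // false
    }
}
```

In this sketch, exact matching on the context combined with a whitelist for everything else means a misspelled key causes a rejection rather than being silently ignored.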
[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302368883 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE, "jdbc"); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_DBTABLE); + properties.add(CONNECTOR_USER); + properties.add(CONNECTOR_PASSWORD); + + // scan partition options + properti
[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302367751 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java ## @@ -74,12 +118,56 @@ public static Builder builder() { return new Builder(); } + private JDBCInputFormat getInputFormat() { + JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(options.getDriverName()) + .setDBUrl(options.getDbURL()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setRowTypeInfo(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames())); + + String query = options.getDialect().getSelectFromStatement( + options.getTableName(), returnType.getFieldNames(), new String[0]); + if (scanOptions != null) { + long lowerBound = scanOptions.getPartitionLowerBound(); + long upperBound = scanOptions.getPartitionUpperBound(); + long numPartitions = scanOptions.getNumPartitions(); + // partitionSize = Math.ceil(upperBound - lowerBound + 1) / numPartitions; + // the following is equivalent + long partitionSize = (upperBound - lowerBound + numPartitions) / numPartitions; + builder = builder.setParametersProvider(new NumericBetweenParametersProvider( Review comment: Maybe you can use a new `ParameterValuesProvider` to deal with `numPartitions`.
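The integer expression in the quoted diff is meant to compute ceil((upperBound - lowerBound + 1) / numPartitions) without floating point, using the identity ceil(n / k) = (n + k - 1) / k for positive n and k (the diff comment places the closing parenthesis of `Math.ceil` too early; the intent is the ceiling of the whole quotient). The sketch below checks that identity and splits an inclusive range into sub-ranges, each of which could back one `BETWEEN ? AND ?` query. `PartitionSketch`, `partitionSize` and `ranges` are hypothetical names, and the bounds are assumed to satisfy lowerBound <= upperBound and to stay well away from `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {

    // ceil((upperBound - lowerBound + 1) / numPartitions) with integer arithmetic only.
    // Assumes upperBound >= lowerBound and numPartitions >= 1.
    static long partitionSize(long lowerBound, long upperBound, long numPartitions) {
        return (upperBound - lowerBound + numPartitions) / numPartitions;
    }

    // Hypothetical helper: split [lowerBound, upperBound] into consecutive inclusive
    // ranges of partitionSize values each (the last one may be shorter).
    static List<long[]> ranges(long lowerBound, long upperBound, long numPartitions) {
        long size = partitionSize(lowerBound, upperBound, numPartitions);
        List<long[]> result = new ArrayList<>();
        for (long start = lowerBound; start <= upperBound; start += size) {
            long end = Math.min(start + size - 1, upperBound);
            result.add(new long[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [1, 4], [5, 8] and [9, 10] on separate lines.
        for (long[] range : ranges(1, 10, 3)) {
            System.out.println(Arrays.toString(range));
        }
    }
}
```

Because the size is rounded up, the number of ranges can be smaller than `numPartitions`: with lowerBound = 1, upperBound = 10 and numPartitions = 6, the size is (9 + 6) / 6 = 2, which yields only five ranges. That gap may be part of why the reviewer suggests a dedicated `ParameterValuesProvider` that works from `numPartitions` directly.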
[GitHub] [flink] JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
JingsongLi commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302364645 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java ## @@ -55,6 +89,16 @@ public JDBCTableSource( .build(); Review comment: @TsReaper Can you add some tests to test `ProjectableTableSource`?
[GitHub] [flink] JingsongLi commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
JingsongLi commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager URL: https://github.com/apache/flink/pull/8920#issuecomment-510329508 Hi bowen:

> `TableFunctionDefinition` and `AggregateFunctionDefinition` need resultType and accumulateType

First, I think the design of `FunctionDefinition` should be improved: this result type should be an implicit result type rather than the real result type, because we cannot infer the type at this point and therefore cannot get the real result type. In our internal Blink, we pass `GenericType` to the `TableFunctionDefinition` resultType, pass `Generic` to the aggregate resultType, and pass `GenericType` to the aggregate accumulateType (see `HiveGenericUDTF` and `HiveGenericUDAF`). I think it's okay for it to be an implicit result type.
[GitHub] [flink] flinkbot commented on issue #8990: [FLINK-13104][metrics] Updated request callback to log warning on failure
flinkbot commented on issue #8990: [FLINK-13104][metrics] Updated request callback to log warning on failure URL: https://github.com/apache/flink/pull/8990#issuecomment-510328396 CI report for commit f3b49c2ad8fd13e880ad5908b69c33bb841d52a5: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118701357)
[GitHub] [flink] flinkbot commented on issue #8962: [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner
flinkbot commented on issue #8962: [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner URL: https://github.com/apache/flink/pull/8962#issuecomment-510327044 CI report for commit d1f5135c005d7a116c819543feb5ce07c42d8cfc: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118634816)
[GitHub] [flink] flinkbot commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
flinkbot commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager URL: https://github.com/apache/flink/pull/8920#issuecomment-510327018 CI report for commit 4afedee15460ac0f1f2945ca657581c538ddfc06: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118682784)
[GitHub] [flink] flinkbot commented on issue #9077: [FLINK-13196][table] Fix Ambiguous column name exception bug for Table API
flinkbot commented on issue #9077: [FLINK-13196][table] Fix Ambiguous column name exception bug for Table API URL: https://github.com/apache/flink/pull/9077#issuecomment-510326720 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-13196) Fix Ambiguous column name exception bug for Table API
[ https://issues.apache.org/jira/browse/FLINK-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-13196:
-----------------------------------
    Labels: pull-request-available  (was: )

> Fix Ambiguous column name exception bug for Table API
> -----------------------------------------------------
>
>                 Key: FLINK-13196
>                 URL: https://issues.apache.org/jira/browse/FLINK-13196
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.9.0
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>              Labels: pull-request-available
>
> The following query should be valid, however, ambiguous column name exception
> is thrown.
> {code:java}
> val util = streamTestUtil()
> val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
> val resultTable = table
>   .groupBy('b)
>   .select('b, 'a.sum, 'a.sum, 'a.sum)
> {code}
> {code:java}
> org.apache.flink.table.api.ValidationException: Ambiguous column name: EXPR$0
>     at org.apache.flink.table.operations.utils.factories.ProjectionOperationFactory.lambda$validateAndGetUniqueNames$4(ProjectionOperationFactory.java:103)
> {code}
> We should add some alias logic in {{AggregationAndPropertiesReplacer}} if the
> name has ever been used.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] [flink] hequn8128 opened a new pull request #9077: [FLINK-13196][table] Fix Ambiguous column name exception bug for Table API
hequn8128 opened a new pull request #9077: [FLINK-13196][table] Fix Ambiguous column name exception bug for Table API URL: https://github.com/apache/flink/pull/9077

## What is the purpose of the change

This pull request fixes an ambiguous column name exception bug in the Table API. The following query should be valid; however, an ambiguous column name exception is thrown.

```
val util = streamTestUtil()
val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
val resultTable = table
  .groupBy('b)
  .select('b, 'a.sum, 'a.sum, 'a.sum)
```

This pull request adds alias logic to `AggregationAndPropertiesReplacer` for names that have already been used.

## Brief change log

- Add alias logic to `AggregationAndPropertiesReplacer` for names that have already been used. Note that aggregates and properties inside a UDF do not need the alias logic, because the ambiguous column problem does not occur in that case.

## Verifying this change

This change is already covered by existing tests, such as the plan tests.

This change also added tests and can be verified as follows:

- Added `testAggregateReuse()` in `AggregateTest` to test the alias logic.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
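The fix described in this pull request gives a repeated output name an alias instead of letting it clash. As an illustration of that idea only (not the actual `AggregationAndPropertiesReplacer` change, whose naming scheme is not shown here), the hypothetical helper below keeps the first occurrence of each name and suffixes later repeats until the candidate is unused. The example input assumes that each `'a.sum` receives the same default name, as the `EXPR$0` in the exception suggests.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UniqueAliasSketch {

    // Hypothetical: keep first occurrences; rename repeats to name_0, name_1, ...
    // skipping any candidate that is already taken.
    static List<String> withUniqueAliases(List<String> names) {
        Set<String> used = new HashSet<>();
        List<String> result = new ArrayList<>();
        for (String name : names) {
            String alias = name;
            int suffix = 0;
            while (used.contains(alias)) {
                alias = name + "_" + suffix++;
            }
            used.add(alias);
            result.add(alias);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed default names for select('b, 'a.sum, 'a.sum, 'a.sum).
        List<String> names = List.of("b", "EXPR$0", "EXPR$0", "EXPR$0");
        System.out.println(withUniqueAliases(names)); // [b, EXPR$0, EXPR$0_0, EXPR$0_1]
    }
}
```

Checking each candidate against the set of names already assigned, rather than just counting repeats, keeps the result unique even when an input already contains a name such as `x_0`.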
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302354841 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java ## @@ -74,12 +118,56 @@ public static Builder builder() { return new Builder(); } + private JDBCInputFormat getInputFormat() { + JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(options.getDriverName()) + .setDBUrl(options.getDbURL()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setRowTypeInfo(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames())); Review comment: The field types and field names should be projected.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302354736 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java ## @@ -55,6 +89,16 @@ public JDBCTableSource( .build(); Review comment: The field types and field names passed into `JDBCLookupFunction` should be projected.
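The review comments on `getInputFormat()` and `JDBCLookupFunction` both ask that field names and field types be projected together. When a `ProjectableTableSource` receives the selected field indexes through `projectFields(int[] fields)`, both arrays need to be reordered with the same indexes. The sketch below shows that step with a hypothetical generic helper; plain `Class<?>` objects stand in for Flink's `TypeInformation`, an assumption made only to keep the example self-contained.

```java
import java.util.Arrays;

public class ProjectionSketch {

    // Hypothetical helper: pick the elements at the given indexes, in index order.
    // Arrays.copyOf is used only to obtain an array of the right runtime type;
    // every slot is then overwritten from the original array.
    static <T> T[] project(T[] values, int[] fields) {
        T[] projected = Arrays.copyOf(values, fields.length);
        for (int i = 0; i < fields.length; i++) {
            projected[i] = values[fields[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        // Column names from the JDBCTestBase table quoted in this thread.
        String[] names = {"id", "title", "author", "price"};
        Class<?>[] types = {Integer.class, String.class, String.class, Double.class};
        int[] fields = {3, 0}; // e.g. SELECT price, id

        System.out.println(Arrays.toString(project(names, fields))); // [price, id]
        System.out.println(Arrays.toString(project(types, fields)));
    }
}
```

In the quoted `getInputFormat()`, the query is built from `returnType.getFieldNames()` while the `RowTypeInfo` is built from the full `schema`. Deriving both names and types from one index array avoids that kind of mismatch.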
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302359849 ## File path: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ## @@ -95,7 +99,7 @@ public static String getCreateQuery(String tableName) { sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("price DOUBLE DEFAULT NULL,"); Review comment: Why does this need to change to DOUBLE?
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302355917 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java ## @@ -18,30 +18,64 @@ package org.apache.flink.api.java.io.jdbc; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.sources.LookupableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.sources.TableSource; import org.apache.flink.types.Row; import static org.apache.flink.util.Preconditions.checkNotNull; /** * {@link TableSource} for JDBC. - * Now only support {@link LookupableTableSource}. */ -public class JDBCTableSource implements LookupableTableSource<Row> { +public class JDBCTableSource implements + StreamTableSource<Row>, + ProjectableTableSource<Row>, + LookupableTableSource<Row> { private final JDBCOptions options; + private final JDBCScanOptions scanOptions; Review comment: Mark the members that may be null with the `@Nullable` annotation.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302358796 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE, "jdbc"); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_DBTABLE); + properties.add(CONNECTOR_USER); + properties.add(CONNECTOR_PASSWORD); + + // scan partition options + properties.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302358581 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; Review comment: `connector.lookup.cache.max-rows`? "Rows" may be easier to understand than "entries".
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302358649 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; Review comment: `connector.write.flush.max-rows`? That would align with `cache.max-rows`.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302355599 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java ## @@ -74,12 +118,56 @@ public static Builder builder() { return new Builder(); } + private JDBCInputFormat getInputFormat() { + JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(options.getDriverName()) + .setDBUrl(options.getDbURL()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setRowTypeInfo(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames())); + + String query = options.getDialect().getSelectFromStatement( + options.getTableName(), returnType.getFieldNames(), new String[0]); + if (scanOptions != null) { + long lowerBound = scanOptions.getPartitionLowerBound(); + long upperBound = scanOptions.getPartitionUpperBound(); + long numPartitions = scanOptions.getNumPartitions(); + // partitionSize = Math.ceil(upperBound - lowerBound + 1) / numPartitions; + // the following is equivalent Review comment: If they are equivalent, why not use `Math.ceil`? Also, they don't seem to be equivalent.
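The question above turns on whether `(upperBound - lowerBound + numPartitions) / numPartitions` matches the commented `Math.ceil` formula. Here is a minimal standalone check. It assumes a non-empty inclusive range (`upperBound >= lowerBound`), `numPartitions > 0`, and no `long` overflow. The integer expression equals the ceiling of the *quotient* `(upperBound - lowerBound + 1) / numPartitions`. The comment as literally written applies `Math.ceil` to the integer `upperBound - lowerBound + 1`, which has no effect, and only then divides. That appears to be the mismatch the reviewer noticed. The class and method names are illustrative, not from the PR.

```java
final class PartitionSizeCheck {

    // Integer form used in the patch: ceil(range / n) without floating point,
    // where range = upperBound - lowerBound + 1 (inclusive bounds).
    static long partitionSizeInteger(long lowerBound, long upperBound, long numPartitions) {
        return (upperBound - lowerBound + numPartitions) / numPartitions;
    }

    // Ceiling applied to the quotient, i.e. the presumably intended formula.
    static long partitionSizeCeil(long lowerBound, long upperBound, long numPartitions) {
        long range = upperBound - lowerBound + 1;
        return (long) Math.ceil((double) range / numPartitions);
    }

    // The formula exactly as written in the code comment: ceil of an integer
    // is a no-op, and the division then happens in double arithmetic.
    static double partitionSizeAsCommented(long lowerBound, long upperBound, long numPartitions) {
        return Math.ceil(upperBound - lowerBound + 1) / numPartitions;
    }

    public static void main(String[] args) {
        // 10 values (1..10) split into 3 partitions.
        System.out.println(partitionSizeInteger(1, 10, 3));     // 4
        System.out.println(partitionSizeCeil(1, 10, 3));        // 4
        System.out.println(partitionSizeAsCommented(1, 10, 3)); // 3.3333333333333335
    }
}
```

Keeping the integer form avoids floating-point rounding for large bounds. The comment would then need parentheses around the quotient to describe it correctly.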
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302357869 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. + */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; Review comment: I'd prefer `connector.username`, which is clearer.
I don't think we need to align with the parameter name in the JDBC URL. [Hive](https://cwiki.apache.org/confluence/display/Hive/JdbcStorageHandler) also uses `username`.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302356136 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; Review comment: Move all of them to `JDBCValidator`.
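This sketch shows what the suggested move could look like. It assumes the key names proposed elsewhere in this review are adopted (`connector.table`, `connector.username`, `connector.lookup.cache.max-rows`, `connector.write.flush.max-rows`), although those names were still under discussion. The class body, the `missingRequired` helper, and the choice of URL and table as the only mandatory keys are all hypothetical. To stay self-contained, the check runs on a plain `Map` rather than Flink's `DescriptorProperties`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder for the JDBC connector property keys, using the names
// proposed in this review; not the merged Flink class.
final class JDBCValidator {

    // common options
    static final String CONNECTOR_URL = "connector.url";
    static final String CONNECTOR_TABLE = "connector.table";
    static final String CONNECTOR_DRIVER = "connector.driver";
    static final String CONNECTOR_USERNAME = "connector.username";
    static final String CONNECTOR_PASSWORD = "connector.password";

    // lookup and write limits, expressed in rows
    static final String CONNECTOR_LOOKUP_CACHE_MAX_ROWS = "connector.lookup.cache.max-rows";
    static final String CONNECTOR_WRITE_FLUSH_MAX_ROWS = "connector.write.flush.max-rows";
    // ... the remaining scan/lookup/write keys would move here as well

    private JDBCValidator() {}

    // Returns the required keys that are absent from the given properties
    // (assumption: only the URL and the table name are mandatory).
    static List<String> missingRequired(Map<String, String> properties) {
        List<String> missing = new ArrayList<>();
        for (String key : Arrays.asList(CONNECTOR_URL, CONNECTOR_TABLE)) {
            if (!properties.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(CONNECTOR_URL, "jdbc:mysql://localhost:3306/example"); // placeholder URL
        System.out.println(missingRequired(props)); // [connector.table]
    }
}
```

With the keys in one place, the factory's `supportedProperties()` and the validator can both reference the same constants, so a rename like `max-entries` to `max-rows` only happens once.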
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302357572 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. + */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; Review comment: I'd prefer `connector.table-name` or `connector.table`, because `dbtable` suggests that a database name is also required.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302359068 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE, "jdbc"); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_DBTABLE); + properties.add(CONNECTOR_USER); + properties.add(CONNECTOR_PASSWORD); + + // scan partition options + properties.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302359017 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE, "jdbc"); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_DBTABLE); + properties.add(CONNECTOR_USER); + properties.add(CONNECTOR_PASSWORD); + + // scan partition options + properties.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302355652 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java ## @@ -74,12 +118,56 @@ public static Builder builder() { return new Builder(); } + private JDBCInputFormat getInputFormat() { + JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(options.getDriverName()) + .setDBUrl(options.getDbURL()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setRowTypeInfo(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames())); + + String query = options.getDialect().getSelectFromStatement( + options.getTableName(), returnType.getFieldNames(), new String[0]); + if (scanOptions != null) { + long lowerBound = scanOptions.getPartitionLowerBound(); + long upperBound = scanOptions.getPartitionUpperBound(); + long numPartitions = scanOptions.getNumPartitions(); + // partitionSize = Math.ceil(upperBound - lowerBound + 1) / numPartitions; + // the following is equivalent + long partitionSize = (upperBound - lowerBound + numPartitions) / numPartitions; + builder = builder.setParametersProvider(new NumericBetweenParametersProvider( + partitionSize, lowerBound, upperBound)); + query += " WHERE " + scanOptions.getPartitionColumnName() + " >= ? " + " AND " + Review comment: Quotes are missing around the column names.
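To illustrate the quoting point in the comment above, here is a minimal, hypothetical Java sketch (not the PR's code) that builds the partitioned query with every identifier quoted and derives the split ranges with the same integer ceiling trick as `partitionSize` in the diff: ceil(n / k) == (n + k - 1) / k with n = upperBound - lowerBound + 1. It assumes ANSI double-quote identifier quoting; dialects such as MySQL use backticks, so a real implementation would presumably delegate quoting to the `JDBCDialect`. The names `PartitionedScanSketch`, `quoteIdentifier`, `partitionedQuery` and `ranges` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical sketch of a partitioned JDBC scan; not Flink's JDBCTableSource. */
public class PartitionedScanSketch {

    // Assumption: ANSI double quotes; embedded quotes are escaped by doubling them.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    // ceil((upper - lower + 1) / num) using integer arithmetic only.
    static long partitionSize(long lowerBound, long upperBound, long numPartitions) {
        return (upperBound - lowerBound + numPartitions) / numPartitions;
    }

    static String partitionedQuery(String table, String[] fields, String partitionColumn) {
        StringJoiner columns = new StringJoiner(", ");
        for (String field : fields) {
            columns.add(quoteIdentifier(field));
        }
        String column = quoteIdentifier(partitionColumn);
        // Both placeholders are bound once per split, e.g. by a parameters provider.
        return "SELECT " + columns + " FROM " + quoteIdentifier(table)
            + " WHERE " + column + " >= ? AND " + column + " <= ?";
    }

    /** Inclusive [start, end] pairs covering lowerBound..upperBound. */
    static List<long[]> ranges(long lowerBound, long upperBound, long numPartitions) {
        long size = partitionSize(lowerBound, upperBound, numPartitions);
        List<long[]> result = new ArrayList<>();
        for (long start = lowerBound; start <= upperBound; start += size) {
            result.add(new long[] {start, Math.min(start + size - 1, upperBound)});
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(partitionedQuery("books", new String[] {"id", "title"}, "id"));
        for (long[] range : ranges(1001, 1010, 3)) {
            System.out.println(range[0] + ".." + range[1]);
        }
    }
}
```

For example, `ranges(1001, 1010, 3)` yields 1001..1004, 1005..1008 and 1009..1010; the last split is simply shorter.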
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302358314 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory, + StreamTableSinkFactory> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; Review comment: I would like to rename "scan" to "read" to align with "write". How about using:
```
connector.read.partition.column
connector.read.partition.lower-bound
connector.read.partition.upper-bound
connector.read.partition.num
```
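A sketch of how the proposed `read` keys might be declared, together with a hypothetical all-or-nothing check, since the four partition options are only meaningful together. The constant names and the validation rule are assumptions for illustration; they are not part of the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the proposed "read" option keys; not the PR's code. */
public class ReadOptionKeysSketch {

    public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column";
    public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound";
    public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound";
    public static final String CONNECTOR_READ_PARTITION_NUM = "connector.read.partition.num";

    static final List<String> PARTITION_KEYS = Arrays.asList(
        CONNECTOR_READ_PARTITION_COLUMN,
        CONNECTOR_READ_PARTITION_LOWER_BOUND,
        CONNECTOR_READ_PARTITION_UPPER_BOUND,
        CONNECTOR_READ_PARTITION_NUM);

    /**
     * Assumed rule: either none or all partition keys are set.
     * Returns true when a partitioned read is fully configured.
     */
    static boolean validatePartitionOptions(Map<String, String> properties) {
        long present = PARTITION_KEYS.stream().filter(properties::containsKey).count();
        if (present != 0 && present != PARTITION_KEYS.size()) {
            throw new IllegalArgumentException(
                "Either all or none of " + PARTITION_KEYS + " must be set, but found " + present + ".");
        }
        return present == PARTITION_KEYS.size();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(CONNECTOR_READ_PARTITION_COLUMN, "id");
        props.put(CONNECTOR_READ_PARTITION_LOWER_BOUND, "1");
        props.put(CONNECTOR_READ_PARTITION_UPPER_BOUND, "100");
        props.put(CONNECTOR_READ_PARTITION_NUM, "4");
        System.out.println(validatePartitionOptions(props));
    }
}
```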
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302355984 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; Review comment: Unused import.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302360021 ## File path: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryITCase.java ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.runtime.utils.StreamITCase; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test for {@link JDBCTableSource} and {@link JDBCUpsertTableSink} created + * by {@link JDBCTableSourceSinkFactory}. 
+ */ +public class JDBCTableSourceSinkFactoryITCase extends JDBCTestBase { + + @Test + public void testJDBCScanSource() throws Exception { + testJDBCScanSourceImpl(false); + } + + @Test + public void testJDBCScanSourceWithParallelism() throws Exception { + testJDBCScanSourceImpl(true); + } + + private void testJDBCScanSourceImpl(boolean useParallelism) throws Exception { + Map properties = getBasicProperties(INPUT_TABLE); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + if (useParallelism) { + properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_PARTITION_COLUMN, "id"); + properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_LOWER_BOUND, TEST_DATA[0].id.toString()); + properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_UPPER_BOUND, TEST_DATA[TEST_DATA.length - 1].id.toString()); + properties.put(JDBCTableSourceSinkFactory.CONNECTOR_SCAN_NUM_PARTITIONS, "5"); + + env.setParallelism(TEST_DATA.length / 2 - 1); + } + + final StreamTableSource source = TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + + DataStream resultSet = tEnv.toAppendStream(tEnv.fromTableSource(source), Row.class); + resultSet.addSink(new StreamITCase.StringSink<>()); + env.execute(); + + List expected = new ArrayList<>(); + for (TestEntry entry : TEST_DATA) { + expected.add(entry.id + "," + entry.title + "," + entry.author + "," + entry.price + "," + entry.qty); + } + StreamITCase.compareWithList(expected); + StreamITCase.clear(); + } + + @Test + public void testJDBCScanSourceWithProjection() throws Exception { + Map properties = getBasicProperties(INPUT_TABLE); + + final StreamTableSource source = TableFactoryService.find(StreamTableSourceFactory.class, properties) + .createStreamTableSource(properties); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnviron
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302360647 ## File path: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryITCase.java ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.runtime.utils.StreamITCase; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test for {@link JDBCTableSource} and {@link JDBCUpsertTableSink} created + * by {@link JDBCTableSourceSinkFactory}. + */ +public class JDBCTableSourceSinkFactoryITCase extends JDBCTestBase { Review comment: TableFactory implementations are verified with unit tests (UT). We want to avoid introducing IT cases as much as possible. You can refer to `KafkaTableSourceSinkFactoryTest` as an example.
[GitHub] [flink] wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source
wuchong commented on a change in pull request #9029: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source URL: https://github.com/apache/flink/pull/9029#discussion_r302356473 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect; +import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.TimeLength; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}. 
+ */ +public class JDBCTableSourceSinkFactory implements + StreamTableSourceFactory, + StreamTableSinkFactory> { + + public static final String CONNECTOR_URL = "connector.url"; + public static final String CONNECTOR_DBTABLE = "connector.dbtable"; + public static final String CONNECTOR_DRIVER = "connector.driver"; + public static final String CONNECTOR_USER = "connector.user"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + + public static final String CONNECTOR_SCAN_PARTITION_COLUMN = "connector.scan.partition-column"; + public static final String CONNECTOR_SCAN_LOWER_BOUND = "connector.scan.lower-bound"; + public static final String CONNECTOR_SCAN_UPPER_BOUND = "connector.scan.upper-bound"; + public static final String CONNECTOR_SCAN_NUM_PARTITIONS = "connector.scan.num-partitions"; + + public static final String CONNECTOR_LOOKUP_CACHE_MAX_ENTRIES = "connector.lookup.cache.max-entries"; + public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; + public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; + + public static final String CONNECTOR_WRITE_FLUSH_MAX_ENTRIES = "connector.write.flush.max-size"; + public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; + public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; + + @Override + public Map requiredContext() { + Map context = new HashMap<>(); + context.put(CONNECTOR_TYPE, "jdbc"); // jdbc + context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility + return context; + } + + @Override + public List supportedProperties() { + List properties = new ArrayList<>(); + + // common options + properties.add(CONNECTOR_DRIVER); + properties.add(CONNECTOR_URL); + properties.add(CONNECTOR_DBTABLE); + properties.add(CONNECTOR_USER); + properties.add(CONNECTOR_PASSWORD); + + // scan partition options + properties.
[GitHub] [flink] flinkbot commented on issue #9021: [hostfix][runtime] Make checkpoints injection ordered with stop-with-savepoint
flinkbot commented on issue #9021: [hostfix][runtime] Make checkpoints injection ordered with stop-with-savepoint URL: https://github.com/apache/flink/pull/9021#issuecomment-510323564 CI report for commit abed4b5678a2f09b3bb729bd62b5264e56b55b9f: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118634788)
[GitHub] [flink] xuefuz commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
xuefuz commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager URL: https://github.com/apache/flink/pull/8920#discussion_r302357374 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; + +/** + * A factory to create {@link FunctionDefinition} based on string-based properties. + * See also {@link TableFactory} for more information. + */ +public interface FunctionDefinitionFactory extends TableFactory { + + /** +* Creates a {@link FunctionDefinition} from given {@link CatalogFunction}. 
+* +* @param name name of the {@link CatalogFunction} +* @param catalogFunction the catalog function +* @param constantArguments arguments of a function call (only literal arguments +* are passed, nulls for non-literal ones) +* @param argTypes types of arguments +* @return a {@link FunctionDefinition} +*/ + FunctionDefinition createFunctionDefinition( + String name, CatalogFunction catalogFunction, Object[] constantArguments, DataType[] argTypes); Review comment: Currently, HiveSource/Sink needs the database name and table name to access the Hive metastore directly and fetch the additional information needed to read/write a Hive table, which is why HiveTableFactory provides an ObjectPath. Note that CatalogTable itself doesn't carry this information.
[GitHub] [flink] flinkbot commented on issue #9047: [FLINK-13109][docs-zh]Translate "Restart Strategies" page into Chinese
flinkbot commented on issue #9047: [FLINK-13109][docs-zh]Translate "Restart Strategies" page into Chinese URL: https://github.com/apache/flink/pull/9047#issuecomment-510321288 CI report for commit 4eeb4fcf1480bd43e12be51879bc5b4ce901b5e9: SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/118634731)
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#discussion_r302356552 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ## @@ -186,6 +203,22 @@ public void recycle(MemorySegment segment) { if (segment != null) { segments.add(segment); } + + if (segments.size() >= numberOfSegmentsToRequest) { + break; + } + + if (!deadline.hasTimeLeft()) { + throw new IOException(String.format("Insufficient number of network buffers: " + Review comment: Many thanks for the comments. I prefer the second approach, since the timeout case needs a different exception message. I have extracted the construction of the message and changed the exception message for the request timeout.
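The bounded wait discussed in this thread can be sketched in plain Java. This is a hypothetical stand-in, not Flink's `NetworkBufferPool`: `byte[]` replaces `MemorySegment`, a `System.nanoTime()` deadline replaces Flink's `Deadline`, and the class, method names and message text are invented. It shows the two points from the review: waiting is bounded by a deadline, and the timeout gets its own exception message.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for requesting exclusive buffers with a timeout; not Flink API. */
public class SegmentRequestSketch {

    private final BlockingQueue<byte[]> availableSegments;
    private final int numberOfSegmentsToRequest;
    private final Duration requestSegmentsTimeout;

    public SegmentRequestSketch(int totalSegments, int segmentSize,
            int numberOfSegmentsToRequest, Duration requestSegmentsTimeout) {
        this.availableSegments = new ArrayBlockingQueue<>(totalSegments);
        for (int i = 0; i < totalSegments; i++) {
            availableSegments.add(new byte[segmentSize]);
        }
        this.numberOfSegmentsToRequest = numberOfSegmentsToRequest;
        this.requestSegmentsTimeout = requestSegmentsTimeout;
    }

    /** Takes numberOfSegmentsToRequest segments, or fails once the deadline has passed. */
    public List<byte[]> requestSegments() throws IOException {
        List<byte[]> segments = new ArrayList<>(numberOfSegmentsToRequest);
        long deadlineNanos = System.nanoTime() + requestSegmentsTimeout.toNanos();
        try {
            while (segments.size() < numberOfSegmentsToRequest) {
                long remainingNanos = deadlineNanos - System.nanoTime();
                if (remainingNanos <= 0) {
                    // Dedicated message for the timeout case.
                    throw new IOException(String.format(
                        "Timed out after %d ms requesting %d exclusive buffers (got %d).",
                        requestSegmentsTimeout.toMillis(), numberOfSegmentsToRequest, segments.size()));
                }
                // Block at most until the deadline instead of waiting forever.
                byte[] segment = availableSegments.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
            return segments;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            recycle(segments);
            throw new IOException("Interrupted while requesting exclusive buffers.", e);
        } catch (IOException e) {
            // Give back partially acquired segments so other requesters can use them.
            recycle(segments);
            throw e;
        }
    }

    public void recycle(List<byte[]> segments) {
        availableSegments.addAll(segments);
    }

    public int available() {
        return availableSegments.size();
    }
}
```

Returning partially acquired segments on failure is a design choice of this sketch, so that a failed request does not keep other requesters waiting.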
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#discussion_r302354310 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ## @@ -186,6 +203,22 @@ public void recycle(MemorySegment segment) { if (segment != null) { segments.add(segment); } + Review comment: Agreed that the method is too long; I have moved this part into a separate `tryRedistributeBuffers` method.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#discussion_r302353092 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ## @@ -177,7 +193,8 @@ public void recycle(MemorySegment segment) { final List segments = new ArrayList<>(numberOfSegmentsToRequest); try { - while (segments.size() < numberOfSegmentsToRequest) { + final Deadline deadline = Deadline.now().plus(Duration.ofMillis(requestSegmentsTimeoutInMillis)); Review comment: Many thanks for the tip; changed to `fromNow`.
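`Deadline.fromNow(duration)` is presumably just a one-call form of `Deadline.now().plus(duration)`. The minimal stand-in below is an assumption that mirrors the method names used in this thread (`now`, `plus`, `fromNow`, `hasTimeLeft`); it is not Flink's source. It stores a point on the `System.nanoTime()` clock and compares by subtraction, which stays correct even if the raw values wrap around.

```java
import java.time.Duration;

/** Minimal stand-in with the method names used in the thread; not Flink's implementation. */
public final class DeadlineSketch {

    private final long timeNanos; // point in time on the System.nanoTime() clock

    private DeadlineSketch(long timeNanos) {
        this.timeNanos = timeNanos;
    }

    public static DeadlineSketch now() {
        return new DeadlineSketch(System.nanoTime());
    }

    public DeadlineSketch plus(Duration other) {
        return new DeadlineSketch(timeNanos + other.toNanos());
    }

    /** Same result as now().plus(duration), in a single call. */
    public static DeadlineSketch fromNow(Duration duration) {
        return now().plus(duration);
    }

    public Duration timeLeft() {
        return Duration.ofNanos(timeNanos - System.nanoTime());
    }

    public boolean hasTimeLeft() {
        return timeNanos - System.nanoTime() > 0;
    }
}
```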
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#discussion_r302353455 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ## @@ -73,17 +75,31 @@ private final int numberOfSegmentsToRequest; + + private final long requestSegmentsTimeoutInMillis; + + public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, int numberOfSegmentsToRequest) { + this(numberOfSegmentsToAllocate, segmentSize, numberOfSegmentsToRequest, Integer.MAX_VALUE); + } + /** * Allocates all {@link MemorySegment} instances managed by this pool. */ - public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, int numberOfSegmentsToRequest) { + public NetworkBufferPool( + int numberOfSegmentsToAllocate, + int segmentSize, + int numberOfSegmentsToRequest, + long requestSegmentsTimeoutInMillis) { Review comment: Agreed that an encapsulated object is clearer; I changed the related variables to `Duration` in `NettyShuffleEnvironmentConfiguration`, `NetworkBufferPool` and `NettyShuffleEnvironmentBuilder`.
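Carrying the timeout as a `java.time.Duration` keeps the unit in the type rather than in the parameter name (`...InMillis`). Below is a hypothetical sketch of the resulting constructor pair. It keeps the "effectively unbounded" default from the quoted diff (`Integer.MAX_VALUE`, here interpreted as milliseconds); the class name and the positivity check are assumptions.

```java
import java.time.Duration;

/** Hypothetical sketch of Duration-typed constructor overloads; not Flink's NetworkBufferPool. */
public class BufferPoolSettingsSketch {

    // Assumption: same default as the quoted diff, Integer.MAX_VALUE milliseconds.
    static final Duration DEFAULT_REQUEST_SEGMENTS_TIMEOUT = Duration.ofMillis(Integer.MAX_VALUE);

    private final int numberOfSegmentsToAllocate;
    private final int segmentSize;
    private final int numberOfSegmentsToRequest;
    private final Duration requestSegmentsTimeout;

    public BufferPoolSettingsSketch(int numberOfSegmentsToAllocate, int segmentSize, int numberOfSegmentsToRequest) {
        this(numberOfSegmentsToAllocate, segmentSize, numberOfSegmentsToRequest, DEFAULT_REQUEST_SEGMENTS_TIMEOUT);
    }

    public BufferPoolSettingsSketch(
            int numberOfSegmentsToAllocate,
            int segmentSize,
            int numberOfSegmentsToRequest,
            Duration requestSegmentsTimeout) {
        // Assumed validation: a zero or negative timeout would fail every request immediately.
        if (requestSegmentsTimeout == null || requestSegmentsTimeout.isNegative() || requestSegmentsTimeout.isZero()) {
            throw new IllegalArgumentException("The timeout for requesting exclusive buffers must be positive.");
        }
        this.numberOfSegmentsToAllocate = numberOfSegmentsToAllocate;
        this.segmentSize = segmentSize;
        this.numberOfSegmentsToRequest = numberOfSegmentsToRequest;
        this.requestSegmentsTimeout = requestSegmentsTimeout;
    }

    public int getNumberOfSegmentsToRequest() {
        return numberOfSegmentsToRequest;
    }

    public Duration getRequestSegmentsTimeout() {
        return requestSegmentsTimeout;
    }
}
```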
[GitHub] [flink] flinkbot commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner
flinkbot commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner URL: https://github.com/apache/flink/pull/9056#issuecomment-510317452 CI report for commit e73503e4d0c3a07cc440bff8b0d62eefcb4834ec: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118634699)
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
gaoyunhaii commented on a change in pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#discussion_r302353092 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ## @@ -177,7 +193,8 @@ public void recycle(MemorySegment segment) { final List segments = new ArrayList<>(numberOfSegmentsToRequest); try { - while (segments.size() < numberOfSegmentsToRequest) { + final Deadline deadline = Deadline.now().plus(Duration.ofMillis(requestSegmentsTimeoutInMillis)); Review comment: Changed to `fromNow`.
[jira] [Created] (FLINK-13201) Unstable sql time udf test
Kurt Young created FLINK-13201: -- Summary: Unstable sql time udf test Key: FLINK-13201 URL: https://issues.apache.org/jira/browse/FLINK-13201 Project: Flink Issue Type: Test Components: Table SQL / Planner Affects Versions: 1.9.0 Reporter: Kurt Young Assignee: Jingsong Lee org.apache.flink.table.runtime.batch.sql.CalcITCase:testTimeUDF occasionally fails when all Scala tests are run through the IDE. The output is: {code:java} java.lang.AssertionError: Results do not match for query: SELECT dateFunc(a), localDateFunc(a), dateFunc(b), localDateFunc(b), timeFunc(c), localTimeFunc(c), timeFunc(d), localTimeFunc(d), timestampFunc(e), datetimeFunc(e), timestampFunc(f), datetimeFunc(f) FROM MyTable Results == Correct Result - 1 == == Actual Result - 1 == !1984-07-12,1984-07-12,1984-07-12,1984-07-12,08:03:09,08:03:09,08:03:09,08:03:09,2019-09-19 08:03:09.0,2019-09-19T08:03:09,2019-09-19 08:03:09.0,2019-09-19T08:03:09 1984-07-11,1984-07-12,1984-07-11,1984-07-12,00:03:09,08:03:09,08:03:09,16:03:09,2019-09-19 00:03:09.0,2019-09-19T08:03:09,2019-09-19 08:03:09.0,2019-09-19T16:03:09 Plan: == Abstract Syntax Tree == LogicalProject(EXPR$0=[dateFunc($0)], EXPR$1=[localDateFunc($0)], EXPR$2=[dateFunc($1)], EXPR$3=[localDateFunc($1)], EXPR$4=[timeFunc($2)], EXPR$5=[localTimeFunc($2)], EXPR$6=[timeFunc($3)], EXPR$7=[localTimeFunc($3)], EXPR$8=[timestampFunc($4)], EXPR$9=[datetimeFunc($4)], EXPR$10=[timestampFunc($5)], EXPR$11=[datetimeFunc($5)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) == Optimized Logical Plan == Calc(select=[dateFunc(a) AS EXPR$0, localDateFunc(a) AS EXPR$1, dateFunc(b) AS EXPR$2, localDateFunc(b) AS EXPR$3, timeFunc(c) AS EXPR$4, localTimeFunc(c) AS EXPR$5, timeFunc(d) AS EXPR$6, localTimeFunc(d) AS EXPR$7, timestampFunc(e) AS EXPR$8, datetimeFunc(e) AS EXPR$9, timestampFunc(f) AS EXPR$10, datetimeFunc(f) AS EXPR$11]) +- BoundedStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, f]) {code} -- 
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #9068: [FLINK-13195] Add create table support for SqlClient
flinkbot commented on issue #9068: [FLINK-13195] Add create table support for SqlClient URL: https://github.com/apache/flink/pull/9068#issuecomment-510314838 CI report for commit 829e8aef5c2ef6c2263998f00cf73448ff4a518c: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118634619)
[GitHub] [flink] lirui-apache commented on issue #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
lirui-apache commented on issue #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink URL: https://github.com/apache/flink/pull/9067#issuecomment-510313753 Updated to address comments. @xuefuz @bowenli86 please take a look, thanks.
[GitHub] [flink] flinkbot commented on issue #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor
flinkbot commented on issue #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor URL: https://github.com/apache/flink/pull/9057#issuecomment-510313483 CI report for commit b616282cb875778a7a5af22a2783eaaf48104908: FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/118634660)
[GitHub] [flink] bowenli86 commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
bowenli86 commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink URL: https://github.com/apache/flink/pull/9067#discussion_r302349765 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java ## @@ -235,10 +220,44 @@ public void testInsertIntoStaticPartition() throws Exception { // make sure new partition is created assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size()); - CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec)); - String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION); - verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1); + verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName)); + + hiveCatalog.dropTable(tablePath, false); + } + + @Test + public void testInsertOverwrite() throws Exception { + String dbName = "default"; + String tblName = "dest"; + RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0); + ObjectPath tablePath = new ObjectPath(dbName, tblName); + + ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1); + BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); + + // write some data and verify + List<Row> toWrite = generateRecords(5); + tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo)); + + CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath); + tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table)); + tableEnv.sql("select * from src").insertInto("destSink"); + execEnv.execute(); + + verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName)); + + // write some data to overwrite existing data and verify + toWrite = generateRecords(3); Review comment: yeah, right, my bad...
[GitHub] [flink] godfreyhe commented on issue #9039: [FLINK-13170][table-planner] Planner should get table factory from ca…
godfreyhe commented on issue #9039: [FLINK-13170][table-planner] Planner should get table factory from ca… URL: https://github.com/apache/flink/pull/9039#issuecomment-510311997 Thanks for this PR @lirui-apache. `StreamPlanner` in the flink-table-planner module should also be updated, just like `PlannerBase` in the flink-table-planner-blink module. Could you also add some tests for the blink planner?
[GitHub] [flink] lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink URL: https://github.com/apache/flink/pull/9067#discussion_r302348621 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -198,6 +199,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE tablePath = (ObjectPath) in.readObject(); partitionToWriter = new HashMap<>(); tableProperties = (Properties) in.readObject(); + hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion()); Review comment: `open` is called on the TM but not on the JM, where `finalizeGlobal` is performed. That's why I initialize `hiveVersion` in both `readObject` and the constructor (to be safe). I'll serialize this field instead of getting it from `jobConf`.
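The plan to serialize `hiveVersion` instead of re-reading it from `jobConf` can be illustrated with plain Java serialization. In the sketch below, the class `VersionedOutputFormat` is hypothetical and only mirrors the general shape of `HiveTableOutputFormat`. The version string is an ordinary serialized field, so it survives the round trip even in a process where `open()` never runs. Only transient runtime state is rebuilt in `readObject`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical output format; only the serialization shape is modeled. */
final class VersionedOutputFormat implements Serializable {
    private static final long serialVersionUID = 1L;

    // Regular (non-transient) field: written with the object, so it is available
    // after deserialization even where open() is never called.
    private final String hiveVersion;

    // Runtime-only state: not serialized, rebuilt in readObject().
    private transient Map<String, Object> partitionToWriter;

    VersionedOutputFormat(String hiveVersion) {
        this.hiveVersion = hiveVersion;
        this.partitionToWriter = new HashMap<>();
    }

    String getHiveVersion() {
        return hiveVersion;
    }

    boolean hasWriterMap() {
        return partitionToWriter != null;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restores hiveVersion
        partitionToWriter = new HashMap<>();
    }

    /** Serializes and deserializes a copy, roughly what shipping the format to another process does. */
    static VersionedOutputFormat roundTrip(VersionedOutputFormat original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (VersionedOutputFormat) in.readObject();
        }
    }
}
```

Serializing the value also makes the deserialized object independent of whatever configuration lookup was in effect when it was read back.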
[GitHub] [flink] lincoln-lil commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
lincoln-lil commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#discussion_r302347191 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The upsert sink for HBase. + * + * This class leverage {@link BufferedMutator} to buffer multiple + * {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to cluster. 
+ * The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes}, + * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}. + */ +public class HBaseUpsertSinkFunction + extends RichSinkFunction<Tuple2<Boolean, Row>> + implements CheckpointedFunction, BufferedMutator.ExceptionListener { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(HBaseUpsertSinkFunction.class); + + private final String hTableName; + private final HBaseTableSchema schema; + private final byte[] serializedConfig; + + private final long bufferFlushMaxSizeInBytes; + private final long bufferFlushMaxMutations; + private final long bufferFlushIntervalMillis; + + private transient HBaseReadWriteHelper helper; + + private transient Connection connection; + private transient BufferedMutator mutator; + + private transient ScheduledExecutorService executor; + private transient ScheduledFuture scheduledFuture; + private transient AtomicLong numPendingRequests; + + private transient volatile boolean closed = false; + + /** +* This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable} +* was thrown. +* +* Errors will be checked and rethrown before processing each input element, and when the sink is closed. +*/ + private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); + + public HBaseUpsertSinkFunction( + String hTableName, + HBaseTableSchema schema, + org.apache.hadoop.conf.Configuration conf, + long bufferFlushMaxSizeInBytes, + long bufferFlushMaxMutations,
[GitHub] [flink] lincoln-lil commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
lincoln-lil commented on a change in pull request #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#discussion_r302347476 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The upsert sink for HBase. + * + * This class leverage {@link BufferedMutator} to buffer multiple + * {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to cluster. 
+ * The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes}, + * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}. + */ +public class HBaseUpsertSinkFunction + extends RichSinkFunction<Tuple2<Boolean, Row>> + implements CheckpointedFunction, BufferedMutator.ExceptionListener { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(HBaseUpsertSinkFunction.class); + + private final String hTableName; + private final HBaseTableSchema schema; + private final byte[] serializedConfig; + + private final long bufferFlushMaxSizeInBytes; + private final long bufferFlushMaxMutations; + private final long bufferFlushIntervalMillis; + + private transient HBaseReadWriteHelper helper; + + private transient Connection connection; + private transient BufferedMutator mutator; + + private transient ScheduledExecutorService executor; + private transient ScheduledFuture scheduledFuture; + private transient AtomicLong numPendingRequests; + + private transient volatile boolean closed = false; + + /** +* This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable} +* was thrown. +* +* Errors will be checked and rethrown before processing each input element, and when the sink is closed. +*/ + private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); + + public HBaseUpsertSinkFunction( + String hTableName, + HBaseTableSchema schema, + org.apache.hadoop.conf.Configuration conf, + long bufferFlushMaxSizeInBytes, + long bufferFlushMaxMutations,
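The Javadoc of this sink describes three things:

* buffering mutations;
* flushing them by size, by count or on a timer;
* storing asynchronous errors in an `AtomicReference` to rethrow later.

The sketch below is dependency-free and covers only the count and interval triggers; size-based flushing is omitted. It assumes a hypothetical `BufferingSink` whose `flusher` callback stands in for HBase's `BufferedMutator`. None of these names are Flink or HBase APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical buffering sink; the flusher stands in for an HBase BufferedMutator. */
final class BufferingSink<T> implements AutoCloseable {
    private final int bufferFlushMaxMutations;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();
    // Set by the background flush thread; checked before each element and on close.
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
    private final ScheduledExecutorService executor;

    BufferingSink(int bufferFlushMaxMutations, long bufferFlushIntervalMillis, Consumer<List<T>> flusher) {
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        this.flusher = flusher;
        if (bufferFlushIntervalMillis > 0) {
            this.executor = Executors.newSingleThreadScheduledExecutor();
            this.executor.scheduleWithFixedDelay(
                this::flushQuietly, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
        } else {
            this.executor = null; // interval flushing disabled
        }
    }

    /** Buffers one element and flushes when the count threshold is reached. */
    synchronized void invoke(T value) {
        checkErrorAndRethrow();
        buffer.add(value);
        if (buffer.size() >= bufferFlushMaxMutations) {
            flush();
        }
    }

    synchronized void flush() {
        checkErrorAndRethrow();
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Runs on the timer thread: errors are recorded instead of being lost in the executor.
    private void flushQuietly() {
        try {
            flush();
        } catch (Throwable t) {
            failureThrowable.compareAndSet(null, t);
        }
    }

    private void checkErrorAndRethrow() {
        Throwable cause = failureThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in the buffering sink.", cause);
        }
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdownNow();
        }
        flush();
    }
}
```

The timer thread cannot throw into the task thread directly. Recording the failure and rethrowing it on the next `invoke` or on `close` therefore matches the intent stated in the `failureThrowable` Javadoc.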
[GitHub] [flink] zjuwangg commented on issue #9037: [FLINK-13157]reeanble unit test read complext type of HiveInputFormatTest
zjuwangg commented on issue #9037: [FLINK-13157]reeanble unit test read complext type of HiveInputFormatTest URL: https://github.com/apache/flink/pull/9037#issuecomment-510309575 > > The latest changes look good to me. It would be good if we can revert unnecessary changes to the indentation. (This might be taken care of at commit time.) > > Yes, as @xuefuz mentioned, please revert any unnecessary changes in the future. I reverted them for you this time. > > Ran tests for Hive 2.3.4 and 1.2.1 locally. Merging. Thanks, I will take care of this in the future.
[GitHub] [flink] wuchong commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.
wuchong commented on issue #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module. URL: https://github.com/apache/flink/pull/8294#issuecomment-510307375 How about changing `addParameters` to `addConfigs` ? Maybe config is more appropriate than parameter here.
[GitHub] [flink] danny0405 commented on issue #9068: [FLINK-13195] Add create table support for SqlClient
danny0405 commented on issue #9068: [FLINK-13195] Add create table support for SqlClient URL: https://github.com/apache/flink/pull/9068#issuecomment-510307115 @twalthr How about we cache a `catalogName -> DDLs` mapping in the `SessionContext`, just like we cache the `ViewEntry`, and then recover the tables from these DDLs every time we switch to a new session?
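The proposal above caches executed DDLs per catalog so they can be replayed when a new session is created. The sketch below shows such a cache and its replay step. The class `DdlCachingSession` is hypothetical and is not the SQL client's `SessionContext`. The `executeDdl` callback stands in for whatever would actually run each statement.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the SQL client's SessionContext; only the DDL cache is modeled. */
final class DdlCachingSession {
    // catalogName -> DDL statements, each list kept in execution order
    private final Map<String, List<String>> ddlsByCatalog = new LinkedHashMap<>();

    /** Remembers a successfully executed DDL for the given catalog. */
    void recordDdl(String catalogName, String ddl) {
        ddlsByCatalog.computeIfAbsent(catalogName, name -> new ArrayList<>()).add(ddl);
    }

    List<String> getDdls(String catalogName) {
        return Collections.unmodifiableList(ddlsByCatalog.getOrDefault(catalogName, Collections.emptyList()));
    }

    /** Re-executes every cached DDL, e.g. against the table environment of a new session. */
    void replay(BiConsumer<String, String> executeDdl) {
        ddlsByCatalog.forEach((catalogName, ddls) -> ddls.forEach(ddl -> executeDdl.accept(catalogName, ddl)));
    }
}
```

Keying by catalog preserves the order of statements within each catalog but not the global order across catalogs. If a DDL in one catalog could depend on a table in another, a single ordered list of (catalog, DDL) pairs would be the safer structure.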
[jira] [Comment Edited] (FLINK-13181) Add a constructor function to CsvTableSink
[ https://issues.apache.org/jira/browse/FLINK-13181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882608#comment-16882608 ] hehuiyuan edited comment on FLINK-13181 at 7/11/19 2:46 AM: Good idea. Using `Builder` is more convenient. was (Author: hehuiyuan): Using `Builder` is more convenient. > Add a constructor function to CsvTableSink > -- > > Key: FLINK-13181 > URL: https://issues.apache.org/jira/browse/FLINK-13181 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: hehuiyuan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Add a constructor function for parameters : > @param path The output path to write the Table to. > @param fieldDelim The field delimiter > @param writeMode The write mode to specify whether existing files > are overwritten or not.
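The comment above suggests a `Builder` instead of another constructor overload for `path`, `fieldDelim` and `writeMode`. The sketch below shows that pattern on a hypothetical `CsvSinkSettings` class, which is not Flink's `CsvTableSink` API. Three details are assumptions made only for illustration:

* the enum reuses the constant names of Flink's `FileSystem.WriteMode`;
* the default delimiter is `","`;
* the default write mode is `NO_OVERWRITE`.

```java
import java.util.Objects;

/** Hypothetical settings object built with a Builder; not Flink's CsvTableSink API. */
final class CsvSinkSettings {
    /** Mirrors the constant names of Flink's FileSystem.WriteMode. */
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    private final String path;
    private final String fieldDelim;
    private final WriteMode writeMode;

    private CsvSinkSettings(Builder builder) {
        this.path = builder.path;
        this.fieldDelim = builder.fieldDelim;
        this.writeMode = builder.writeMode;
    }

    static Builder builder(String path) {
        return new Builder(path);
    }

    String getPath() { return path; }
    String getFieldDelim() { return fieldDelim; }
    WriteMode getWriteMode() { return writeMode; }

    static final class Builder {
        private final String path;                             // required
        private String fieldDelim = ",";                       // assumed default
        private WriteMode writeMode = WriteMode.NO_OVERWRITE;  // assumed default

        private Builder(String path) {
            this.path = Objects.requireNonNull(path, "path");
        }

        Builder fieldDelim(String fieldDelim) {
            this.fieldDelim = Objects.requireNonNull(fieldDelim, "fieldDelim");
            return this;
        }

        Builder writeMode(WriteMode writeMode) {
            this.writeMode = Objects.requireNonNull(writeMode, "writeMode");
            return this;
        }

        CsvSinkSettings build() {
            return new CsvSinkSettings(this);
        }
    }
}
```

With only `path` required, callers set just the options they need, e.g. `CsvSinkSettings.builder("/tmp/out").writeMode(CsvSinkSettings.WriteMode.OVERWRITE).build()`. This avoids adding another constructor for every combination of optional parameters.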
[GitHub] [flink] flinkbot commented on issue #9072: [FLINK-11630] Wait for the termination of all running Tasks when shutting down TaskExecutor
flinkbot commented on issue #9072: [FLINK-11630] Wait for the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/9072#issuecomment-510306664 CI report for commit cd5ad8d23046c1025f7f9865e60fc3d048fd1f85: SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/118634580)
[jira] [Commented] (FLINK-13181) Add a constructor function to CsvTableSink
[ https://issues.apache.org/jira/browse/FLINK-13181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882608#comment-16882608 ] hehuiyuan commented on FLINK-13181: --- Using `Builder` is more convenient. > Add a constructor function to CsvTableSink > -- > > Key: FLINK-13181 > URL: https://issues.apache.org/jira/browse/FLINK-13181 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: hehuiyuan >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Add a constructor function for parameters : > @param path The output path to write the Table to. > @param fieldDelim The field delimiter > @param writeMode The write mode to specify whether existing files > are overwritten or not.
[GitHub] [flink] flinkbot commented on issue #9076: [FLINK-13200][Table SQL / Planner] Improve the generated code for if statements
flinkbot commented on issue #9076: [FLINK-13200][Table SQL / Planner] Improve the generated code for if statements URL: https://github.com/apache/flink/pull/9076#issuecomment-510306052 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-13200) Improve the generated code for if statements
[ https://issues.apache.org/jira/browse/FLINK-13200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13200: --- Labels: pull-request-available (was: ) > Improve the generated code for if statements > > > Key: FLINK-13200 > URL: https://issues.apache.org/jira/browse/FLINK-13200 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > > In the generated code, we often see code snippets like this: > > if (true) { > acc$6.setNullAt(1); > } else { > acc$6.setField(1, ((int) -1));; > } > Such code hurts readability and increases the code size, making > it more costly to compile and to transfer over the network. > > In this issue, we remove such useless if conditions.
[GitHub] [flink] liyafan82 opened a new pull request #9076: [FLINK-13200][Table SQL / Planner] Improve the generated code for if statements
liyafan82 opened a new pull request #9076: [FLINK-13200][Table SQL / Planner] Improve the generated code for if statements URL: https://github.com/apache/flink/pull/9076 ## What is the purpose of the change In the generated code, we often see code snippets like this: if (true){ acc$6.setNullAt(1); } else { acc$6.setField(1, ((int) -1));; } Such code hurts readability and increases the code size, making it more costly to compile and to transfer over the network. In this issue, we remove such useless if conditions. ## Brief change log - Add method CodeGenUtils#getSimplifiedIfConditionCode for processing the if statements ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
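The simplification described in this PR can be sketched as a code-generation helper. When the condition of an `if` is the literal `true` or `false`, the helper emits only the branch that would be taken. The name `IfCodeSimplifier.ifElse` and its string-based signature are hypothetical and are not the actual signature of `CodeGenUtils#getSimplifiedIfConditionCode`.

```java
/** Hypothetical code-generation helper; not Flink's CodeGenUtils API. */
final class IfCodeSimplifier {
    private IfCodeSimplifier() {}

    /** Emits an if/else statement, folding it away when the condition is a boolean literal. */
    static String ifElse(String condition, String thenCode, String elseCode) {
        String c = condition.trim();
        if (c.equals("true")) {
            return thenCode;   // else branch is dead code
        }
        if (c.equals("false")) {
            return elseCode;   // then branch is dead code
        }
        return "if (" + c + ") {\n  " + thenCode + "\n} else {\n  " + elseCode + "\n}";
    }
}
```

Only literal conditions are folded, and any other condition is emitted unchanged. Dropping the braces changes variable scoping if a branch declares locals. Wrapping the kept branch in `{ ... }` would avoid name clashes in that case.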
[GitHub] [flink] wuchong commented on issue #9074: [FLINK-13198][core] Introduce TimeLength in configuration package
wuchong commented on issue #9074: [FLINK-13198][core] Introduce TimeLength in configuration package URL: https://github.com/apache/flink/pull/9074#issuecomment-510304773 cc @zentol, I think you may also want to have a look at this.
[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package
wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package URL: https://github.com/apache/flink/pull/9074#discussion_r302340591 ## File path: flink-core/src/test/java/org/apache/flink/configuration/TimeLengthTest.java ## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.flink.configuration.TimeLength.TimeUnit.MINUTES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link MemorySize} class. 
+ */ +public class TimeLengthTest { + + @Test + public void testUnitConversion() { + final TimeLength zero = new TimeLength(0); + assertEquals(0, zero.getMilliseconds()); + assertEquals(0, zero.getSeconds()); + assertEquals(0, zero.getMinutes()); + assertEquals(0, zero.getHours()); + + final TimeLength ms = new TimeLength(955); + assertEquals(955, ms.getMilliseconds()); + assertEquals(0, ms.getSeconds()); + assertEquals(0, ms.getMinutes()); + assertEquals(0, ms.getHours()); + + final TimeLength secs = new TimeLength(18500); + assertEquals(18500, secs.getMilliseconds()); + assertEquals(18, secs.getSeconds()); + assertEquals(0, secs.getMinutes()); + assertEquals(0, secs.getHours()); + + final TimeLength mins = new TimeLength(6 + 18500); + assertEquals(6 + 18500, mins.getMilliseconds()); + assertEquals(60 + 18, mins.getSeconds()); + assertEquals(1, mins.getMinutes()); + assertEquals(0, mins.getHours()); + + final TimeLength hrs = new TimeLength(3600 * 1000 + 999); + assertEquals(3600 * 1000 + 999, hrs.getMilliseconds()); + assertEquals(3600, hrs.getSeconds()); + assertEquals(60, hrs.getMinutes()); + assertEquals(1, hrs.getHours()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalid() { + new TimeLength(-1); + } + + @Test + public void testStandardUtils() throws IOException { + final TimeLength ms = new TimeLength(1234567890L); + final TimeLength cloned = CommonTestUtils.createCopySerializable(ms); + + assertEquals(ms, cloned); + assertEquals(ms.hashCode(), cloned.hashCode()); + assertEquals(ms.toString(), cloned.toString()); + } + + @Test + public void testParseMilliseconds() { + assertEquals(1234, TimeLength.parse("1234").getMilliseconds()); + assertEquals(1234, TimeLength.parse("1234ms").getMilliseconds()); + assertEquals(1234, TimeLength.parse("1234 ms").getMilliseconds()); + } + + @Test + public void testParseSeconds() { + assertEquals(667766, TimeLength.parse("667766s").getSeconds()); + assertEquals(667766, TimeLength.parse("667766 
s").getSeconds()); + assertEquals(667766, TimeLength.parse("667766s").getSeconds()); Review comment: The same as L94?
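The tests above expect the following behaviour:

* Values are stored in milliseconds.
* Coarser units are derived by truncating division, so 955 ms is 0 s and 18500 ms is 18 s.
* Negative lengths are rejected.
* `parse` accepts a number followed by an optional unit suffix.

Below is a minimal standalone sketch of those semantics. It is not the `TimeLength` class from the pull request. The class name `TimeLengthSketch` is hypothetical. The unit set (`ms`, `s`, `min`, `h`) and the rule that a bare number means milliseconds are assumptions read off the test cases.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch (not Flink's TimeLength): a non-negative duration stored in ms. */
final class TimeLengthSketch {

    // A number, optional whitespace, then an optional lowercase unit suffix.
    private static final Pattern FORMAT = Pattern.compile("(\\d+)\\s*([a-z]*)");

    private final long milliseconds;

    TimeLengthSketch(long milliseconds) {
        if (milliseconds < 0) {
            throw new IllegalArgumentException("time length must be >= 0");
        }
        this.milliseconds = milliseconds;
    }

    long getMilliseconds() { return milliseconds; }

    // Coarser units use truncating integer division, e.g. 955 ms -> 0 s.
    long getSeconds() { return milliseconds / 1000; }

    long getMinutes() { return milliseconds / (60 * 1000); }

    long getHours() { return milliseconds / (60 * 60 * 1000); }

    /** Parses "1234", "1234ms", "1234 ms", "5s", "5 min", "5h"; a bare number means ms. */
    static TimeLengthSketch parse(String text) {
        Matcher m = FORMAT.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("cannot parse time length: " + text);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "":
            case "ms":
                return new TimeLengthSketch(value);
            case "s":
                return new TimeLengthSketch(Math.multiplyExact(value, 1_000L));
            case "min":
                return new TimeLengthSketch(Math.multiplyExact(value, 60_000L));
            case "h":
                return new TimeLengthSketch(Math.multiplyExact(value, 3_600_000L));
            default:
                throw new IllegalArgumentException("unknown time unit: " + m.group(2));
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("667766 s").getSeconds());
        System.out.println(new TimeLengthSketch(18500).getSeconds());
    }
}
```

Under this truncating rule, 6 + 18500 ms (that is, 18506 ms) is 18 s and 0 min. The `mins` case in the test expects 60 + 18 s and 1 min. That case therefore appears to assume an input of 60 * 1000 + 18500 ms instead.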
[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package
wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package URL: https://github.com/apache/flink/pull/9074#discussion_r302341272 ## File path: flink-core/src/test/java/org/apache/flink/configuration/TimeLengthTest.java ## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.flink.configuration.TimeLength.TimeUnit.MINUTES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link MemorySize} class. 
+ */ +public class TimeLengthTest { + + @Test + public void testUnitConversion() { + final TimeLength zero = new TimeLength(0); + assertEquals(0, zero.getMilliseconds()); + assertEquals(0, zero.getSeconds()); + assertEquals(0, zero.getMinutes()); + assertEquals(0, zero.getHours()); + + final TimeLength ms = new TimeLength(955); + assertEquals(955, ms.getMilliseconds()); + assertEquals(0, ms.getSeconds()); + assertEquals(0, ms.getMinutes()); + assertEquals(0, ms.getHours()); + + final TimeLength secs = new TimeLength(18500); + assertEquals(18500, secs.getMilliseconds()); + assertEquals(18, secs.getSeconds()); + assertEquals(0, secs.getMinutes()); + assertEquals(0, secs.getHours()); + + final TimeLength mins = new TimeLength(6 + 18500); + assertEquals(6 + 18500, mins.getMilliseconds()); + assertEquals(60 + 18, mins.getSeconds()); + assertEquals(1, mins.getMinutes()); + assertEquals(0, mins.getHours()); + + final TimeLength hrs = new TimeLength(3600 * 1000 + 999); + assertEquals(3600 * 1000 + 999, hrs.getMilliseconds()); + assertEquals(3600, hrs.getSeconds()); + assertEquals(60, hrs.getMinutes()); + assertEquals(1, hrs.getHours()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalid() { + new TimeLength(-1); + } + + @Test + public void testStandardUtils() throws IOException { + final TimeLength ms = new TimeLength(1234567890L); + final TimeLength cloned = CommonTestUtils.createCopySerializable(ms); + + assertEquals(ms, cloned); + assertEquals(ms.hashCode(), cloned.hashCode()); + assertEquals(ms.toString(), cloned.toString()); + } + + @Test + public void testParseMilliseconds() { + assertEquals(1234, TimeLength.parse("1234").getMilliseconds()); + assertEquals(1234, TimeLength.parse("1234ms").getMilliseconds()); + assertEquals(1234, TimeLength.parse("1234 ms").getMilliseconds()); + } + + @Test + public void testParseSeconds() { + assertEquals(667766, TimeLength.parse("667766s").getSeconds()); + assertEquals(667766, TimeLength.parse("667766 
s").getSeconds()); + assertEquals(667766, TimeLength.parse("667766s").getSeconds()); + } + + @Test + public void testParseMinutes() { + assertEquals(7657623, TimeLength.parse("7657623min").getMinutes()); + assertEquals(7657623, TimeLength.parse("7657623 min").getMinutes()); + assertEquals(7657623, TimeLength.parse("7657623min").getMinutes()); + } + + @Test + public void testPraseHours() { + assertEquals(987654, TimeLength.parse("987654h").getHours()); + assertEquals(987654, TimeLength.parse("987654 h").getHours()); + assertEquals(987654, TimeLength.parse("987654h").getHours()); Review comment: The same as L108?
[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package
wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package URL: https://github.com/apache/flink/pull/9074#discussion_r302341200 ## File path: flink-core/src/test/java/org/apache/flink/configuration/TimeLengthTest.java ## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.flink.configuration.TimeLength.TimeUnit.MINUTES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link MemorySize} class. 
+ */ +public class TimeLengthTest { + + @Test + public void testUnitConversion() { + final TimeLength zero = new TimeLength(0); + assertEquals(0, zero.getMilliseconds()); + assertEquals(0, zero.getSeconds()); + assertEquals(0, zero.getMinutes()); + assertEquals(0, zero.getHours()); + + final TimeLength ms = new TimeLength(955); + assertEquals(955, ms.getMilliseconds()); + assertEquals(0, ms.getSeconds()); + assertEquals(0, ms.getMinutes()); + assertEquals(0, ms.getHours()); + + final TimeLength secs = new TimeLength(18500); + assertEquals(18500, secs.getMilliseconds()); + assertEquals(18, secs.getSeconds()); + assertEquals(0, secs.getMinutes()); + assertEquals(0, secs.getHours()); + + final TimeLength mins = new TimeLength(6 + 18500); + assertEquals(6 + 18500, mins.getMilliseconds()); + assertEquals(60 + 18, mins.getSeconds()); + assertEquals(1, mins.getMinutes()); + assertEquals(0, mins.getHours()); + + final TimeLength hrs = new TimeLength(3600 * 1000 + 999); + assertEquals(3600 * 1000 + 999, hrs.getMilliseconds()); + assertEquals(3600, hrs.getSeconds()); + assertEquals(60, hrs.getMinutes()); + assertEquals(1, hrs.getHours()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalid() { + new TimeLength(-1); + } + + @Test + public void testStandardUtils() throws IOException { + final TimeLength ms = new TimeLength(1234567890L); + final TimeLength cloned = CommonTestUtils.createCopySerializable(ms); + + assertEquals(ms, cloned); + assertEquals(ms.hashCode(), cloned.hashCode()); + assertEquals(ms.toString(), cloned.toString()); + } + + @Test + public void testParseMilliseconds() { + assertEquals(1234, TimeLength.parse("1234").getMilliseconds()); + assertEquals(1234, TimeLength.parse("1234ms").getMilliseconds()); + assertEquals(1234, TimeLength.parse("1234 ms").getMilliseconds()); + } + + @Test + public void testParseSeconds() { + assertEquals(667766, TimeLength.parse("667766s").getSeconds()); + assertEquals(667766, TimeLength.parse("667766 
s").getSeconds()); + assertEquals(667766, TimeLength.parse("667766s").getSeconds()); + } + + @Test + public void testParseMinutes() { + assertEquals(7657623, TimeLength.parse("7657623min").getMinutes()); + assertEquals(7657623, TimeLength.parse("7657623 min").getMinutes()); + assertEquals(7657623, TimeLength.parse("7657623min").getMinutes()); Review comment: The same as L101?
[GitHub] [flink] wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package
wuchong commented on a change in pull request #9074: [FLINK-13198][core] Introduce TimeLength in configuration package URL: https://github.com/apache/flink/pull/9074#discussion_r302341162 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java ## @@ -540,13 +541,33 @@ public TableSchema getTableSchema(String key) { }); } + /** +* Returns a Flink {@link TimeLength} under the given key if it exists. +*/ + public Optional<TimeLength> getOptionalTimeLength(String key) { + return optionalGet(key).map((value) -> { + try { + return TimeLength.parse(value, TimeLength.TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new ValidationException("Invalid time length value for key '" + key + "'.", e); + } + }); + } + /** * Returns a Flink {@link MemorySize} under the given existing key. */ public MemorySize getMemorySize(String key) { return getOptionalMemorySize(key).orElseThrow(exceptionSupplier(key)); } + /** +* Returns a Flink {@link TimeLength} under the given existing key. +*/ + public TimeLength getTimeLength(String key) { + return getOptionalTimeLength(key).orElseThrow(exceptionSupplier(key)); + } + Review comment: Please also add `validateTimeLength` similar to `validateMemorySize`.
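A `validateTimeLength` check would presumably mirror `validateMemorySize`: reject a missing required key and reject a value that does not parse. Below is a minimal standalone sketch of such a check over a plain `Map` of properties. The class name, the method signature and the `minMillis` lower bound are all hypothetical, not the real `DescriptorProperties` API. The sketch throws `IllegalArgumentException` where Flink would use `ValidationException`, and its parser only knows the units used in the tests (`ms`, `s`, `min`, `h`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of a time-length property validator (not the DescriptorProperties API). */
final class TimeLengthValidatorSketch {

    // A number, optional whitespace, then an optional unit; no unit means milliseconds.
    private static final Pattern FORMAT = Pattern.compile("(\\d+)\\s*(ms|s|min|h)?");

    /** Returns the value in milliseconds, or throws if it is malformed. */
    static long parseMillis(String key, String value) {
        Matcher m = FORMAT.matcher(value.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException(
                "Invalid time length value for key '" + key + "': " + value);
        }
        long amount = Long.parseLong(m.group(1));
        String unit = m.group(2) == null ? "ms" : m.group(2);
        switch (unit) {
            case "s": return Math.multiplyExact(amount, 1_000L);
            case "min": return Math.multiplyExact(amount, 60_000L);
            case "h": return Math.multiplyExact(amount, 3_600_000L);
            default: return amount; // "ms" or no unit
        }
    }

    /** Validates the property under key: present unless optional, parseable, and >= minMillis. */
    static void validateTimeLength(
            Map<String, String> properties, String key, boolean isOptional, long minMillis) {
        String value = properties.get(key);
        if (value == null) {
            if (!isOptional) {
                throw new IllegalArgumentException("Could not find required property '" + key + "'.");
            }
            return; // optional and absent: nothing to check
        }
        long millis = parseMillis(key, value);
        if (millis < minMillis) {
            throw new IllegalArgumentException("Property '" + key + "' must be at least "
                + minMillis + " ms but was " + millis + " ms.");
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("example.interval", "2s"); // hypothetical key
        validateTimeLength(props, "example.interval", false, 1);
        System.out.println("valid");
    }
}
```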
[GitHub] [flink] godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner
godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner URL: https://github.com/apache/flink/pull/9056#issuecomment-510304178 > > > Oops.. I had only validated the dependency in the flink-table module. I will re-validate it in the whole project. > > > > > > Maybe also find a way to verify the Python Table API with the blink planner. > > OK, I will fix it. @dianfu told me that flink-python does not currently support the blink planner.
[GitHub] [flink] flinkbot commented on issue #9030: [FLINK-13123] Align Stop/Cancel Commands in CLI and REST Interface and Improve Documentation
flinkbot commented on issue #9030: [FLINK-13123] Align Stop/Cancel Commands in CLI and REST Interface and Improve Documentation URL: https://github.com/apache/flink/pull/9030#issuecomment-510303704 CI report for commit 9a4bb16fa7220a166c29701e65c90f0b11b6dfc4: SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/118626872)
[GitHub] [flink] lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
lirui-apache commented on a change in pull request #9067: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink URL: https://github.com/apache/flink/pull/9067#discussion_r302342236 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java ## @@ -235,10 +220,44 @@ public void testInsertIntoStaticPartition() throws Exception { // make sure new partition is created assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size()); - CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec)); - String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION); - verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1); + verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName)); + + hiveCatalog.dropTable(tablePath, false); + } + + @Test + public void testInsertOverwrite() throws Exception { + String dbName = "default"; + String tblName = "dest"; + RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0); + ObjectPath tablePath = new ObjectPath(dbName, tblName); + + ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1); + BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); + + // write some data and verify + List toWrite = generateRecords(5); + tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo)); + + CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath); + tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table)); + tableEnv.sql("select * from src").insertInto("destSink"); + execEnv.execute(); + + verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName)); + + // write some data to overwrite existing data and verify + toWrite = generateRecords(3); Review comment: No. 
If we don't set overwrite, we'll get 8 rows instead of 3.
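Here is the arithmetic behind that answer. The test first writes 5 rows and then 3 more. Appending leaves 5 + 3 = 8 rows, while overwriting leaves only the 3 new rows. Below is a toy in-memory model of the two modes. `ToyTable` and its `records` helper are hypothetical and do not model Hive or `HiveTableSink`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy table contrasting INSERT INTO (append) with INSERT OVERWRITE. */
final class ToyTable {

    private final List<String> rows = new ArrayList<>();

    /** Appends newRows, or replaces all existing rows when overwrite is set. */
    void insert(List<String> newRows, boolean overwrite) {
        if (overwrite) {
            rows.clear(); // overwrite drops previously written data first
        }
        rows.addAll(newRows);
    }

    int size() { return rows.size(); }

    /** Generates n dummy rows. */
    static List<String> records(int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add("row-" + i);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyTable appended = new ToyTable();
        appended.insert(records(5), false);
        appended.insert(records(3), false);

        ToyTable overwritten = new ToyTable();
        overwritten.insert(records(5), false);
        overwritten.insert(records(3), true);

        System.out.println(appended.size() + " vs " + overwritten.size()); // 8 vs 3
    }
}
```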
[jira] [Created] (FLINK-13200) Improve the generated code for if statements
Liya Fan created FLINK-13200: Summary: Improve the generated code for if statements Key: FLINK-13200 URL: https://issues.apache.org/jira/browse/FLINK-13200 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Liya Fan Assignee: Liya Fan In the generated code, we often see code snippets like this: if (true) { acc$6.setNullAt(1); } else { acc$6.setField(1, ((int) -1));; } Such code hurts readability and increases the code size, making it more costly to compile and to transfer over the network. In this issue, we remove such useless if conditions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
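Such dead branches can be avoided at generation time. When the condition is the literal `true` or `false`, the generator emits only the branch that can run. Below is a minimal sketch of that idea. `IfFolding.emitIfElse` is a hypothetical helper, not the Flink planner's code generator, and it only recognises the literal strings `"true"` and `"false"`.

```java
/** Hypothetical helper that folds literal if-conditions while generating Java source. */
final class IfFolding {

    /** Emits an if/else, or only the reachable branch when the condition is a literal. */
    static String emitIfElse(String condition, String thenCode, String elseCode) {
        String c = condition.trim();
        if (c.equals("true")) {
            return thenCode; // else branch is unreachable, drop it
        }
        if (c.equals("false")) {
            return elseCode; // then branch is unreachable, drop it
        }
        return "if (" + c + ") {\n  " + thenCode + "\n} else {\n  " + elseCode + "\n}";
    }

    public static void main(String[] args) {
        // Prints only "acc$6.setNullAt(1);" for the snippet from the issue.
        System.out.println(
            emitIfElse("true", "acc$6.setNullAt(1);", "acc$6.setField(1, ((int) -1));"));
    }
}
```

Emitting the surviving branch without braces assumes it declares no local variables that could clash with surrounding code. Wrapping it in `{ }` would keep the original scoping.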
[jira] [Commented] (FLINK-13163) Support execution of batch jobs with fewer slots than requested
[ https://issues.apache.org/jira/browse/FLINK-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882597#comment-16882597 ] Zhu Zhu commented on FLINK-13163: - Hi [~kkrugler], the input splits problem discussed above may cause more regression when a single task fails over, but it should not increase the chance that a task failover happens. To find out why increasing source parallelism leads to more failures, I think we need to check the exception that caused the failure. We can discuss it in another mail thread, since it may not be related to this JIRA. > Support execution of batch jobs with fewer slots than requested > --- > > Key: FLINK-13163 > URL: https://issues.apache.org/jira/browse/FLINK-13163 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Jeff Zhang >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.9.0 > > > Flink should be able to execute batch jobs with fewer slots than requested in > a sequential manner. > At the moment, however, we register for every slot request a timeout which > fires after {{slot.request.timeout}} to fail the slot request. Moreover, we > fail the slot request if the {{ResourceManager}} fails to allocate new > resources or if the slot request times out on the {{ResourceManager}}. This > kind of behavior makes sense if we know that we need all requested slots so > that we fail early if it cannot be fulfilled. > However, for batch jobs it is not strictly required that all slot requests > get fulfilled. It is enough to have at least one slot for every requested > {{ResourceProfile}} (the set of slots (available + allocated) must contain a > slot which can fulfill a slot request). If this is the case, then we should > not fail the slot request but instead wait until the slot gets assigned to > the request. If there is no such slot, then we should still time out the > request.
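The rule proposed in the description can be phrased as a predicate evaluated when a batch slot request's timeout fires. If some slot in the pool, available or allocated, could fulfil the request's resource profile, the request keeps waiting. Otherwise it is failed. Below is a simplified sketch of that predicate. The `Profile` class, which holds only CPU cores and memory, is a hypothetical stand-in for Flink's `ResourceProfile`, and `shouldFailOnTimeout` is not an actual Flink method.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the "keep waiting if some slot could fit" rule for batch slot requests. */
final class BatchSlotTimeoutSketch {

    /** Simplified stand-in for a resource profile: CPU cores and memory in MB. */
    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** True if a slot with this profile is large enough for the request. */
        boolean canFulfill(Profile request) {
            return cpuCores >= request.cpuCores && memoryMb >= request.memoryMb;
        }
    }

    /** Called when the request's timeout fires; allSlots holds available and allocated slots. */
    static boolean shouldFailOnTimeout(Profile request, List<Profile> allSlots) {
        for (Profile slot : allSlots) {
            if (slot.canFulfill(request)) {
                return false; // a fitting slot exists; wait until it is assigned to us
            }
        }
        return true; // no slot could ever fulfil the request: time it out
    }

    public static void main(String[] args) {
        List<Profile> slots = Arrays.asList(new Profile(1.0, 1024), new Profile(2.0, 4096));
        System.out.println(shouldFailOnTimeout(new Profile(2.0, 2048), slots)); // false
        System.out.println(shouldFailOnTimeout(new Profile(4.0, 2048), slots)); // true
    }
}
```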
[GitHub] [flink] xintongsong commented on issue #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots.
xintongsong commented on issue #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots. URL: https://github.com/apache/flink/pull/8846#issuecomment-510301993 Thank you for the comments, @StephanEwen, @KurtYoung. This truly is an imperfect temporary solution. If you both think we should not have this for 1.9, I respect your opinions and will not insist on this one. I think we still need to work on #8841 to make sure that slot sharing works properly with fine-grained resource profiles, if we decide that batch jobs need slot sharing to reduce the memory waste.
[GitHub] [flink] hehuiyuan commented on a change in pull request #9054: [FLINK-13183][Table]Add a PrintTableSink
hehuiyuan commented on a change in pull request #9054: [FLINK-13183][Table]Add a PrintTableSink URL: https://github.com/apache/flink/pull/9054#discussion_r302340288 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/PrintTableSink.java ## @@ -0,0 +1,63 @@ +package org.apache.flink.table.sinks; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.table.utils.TableConnectorUtils; + +/** + * A simple {@link TableSink} to emit data to the standard output stream. + */ +public class PrintTableSink implements BatchTableSink , AppendStreamTableSink { + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + @Override + public void emitDataSet(DataSet dataSet) { + try { + dataSet.print(); + } catch (Exception e) { + e.printStackTrace(); Review comment: ok
[jira] [Assigned] (FLINK-12858) Potentially not properly working Flink job in case of stop-with-savepoint failure
[ https://issues.apache.org/jira/browse/FLINK-12858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yelun reassigned FLINK-12858: - Assignee: yelun > Potentially not properly working Flink job in case of stop-with-savepoint > failure > - > > Key: FLINK-12858 > URL: https://issues.apache.org/jira/browse/FLINK-12858 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Alex >Assignee: yelun >Priority: Minor > > The current implementation of stop-with-savepoint (FLINK-11458) would lock the > thread (on {{syncSavepointLatch}}) that carries > {{StreamTask.performCheckpoint()}}. For non-source tasks, this thread is > implied to be the task's main thread (stop-with-savepoint deliberately stops > any activity in the task's main thread). > Unlocking happens either when the task is cancelled or when the corresponding > checkpoint is acknowledged. > It's possible that other downstream tasks of the same Flink job "soft" fail > the checkpoint/savepoint for various reasons (for example, due to max > buffered bytes {{BarrierBuffer.checkSizeLimit()}}). In such a case, the > JM would be notified of the checkpoint abortion. But it looks like the > checkpoint coordinator would handle such an abortion as usual and assume that > the Flink job continues running.
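The hazard in the description can be reproduced in miniature with a `java.util.concurrent.CountDownLatch`. If only the acknowledgement and cancellation paths count the latch down, an abort notification leaves the waiting thread blocked. The sketch below is a hypothetical model, not Flink's `StreamTask` code. Only the field name `syncSavepointLatch` is borrowed from the issue. It uses a bounded `await` so that the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a task thread parked on a stop-with-savepoint latch. */
final class SavepointLatchSketch {

    private final CountDownLatch syncSavepointLatch = new CountDownLatch(1);

    void onCheckpointAcknowledged() { syncSavepointLatch.countDown(); }

    void onTaskCancelled() { syncSavepointLatch.countDown(); }

    void onCheckpointAborted() {
        // Mirrors the reported gap: nothing releases the latch on abort.
    }

    /** True once the latch has been released. */
    boolean isReleased() { return syncSavepointLatch.getCount() == 0; }

    /** Returns true if the waiting thread was released within the timeout. */
    boolean awaitRelease(long timeoutMillis) throws InterruptedException {
        return syncSavepointLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        SavepointLatchSketch aborted = new SavepointLatchSketch();
        aborted.onCheckpointAborted();
        System.out.println(aborted.awaitRelease(100)); // false: still blocked

        SavepointLatchSketch acked = new SavepointLatchSketch();
        acked.onCheckpointAcknowledged();
        System.out.println(acked.awaitRelease(100)); // true
    }
}
```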
[jira] [Commented] (FLINK-13150) defaultCatalogName and defaultDatabaseName in TableEnvImpl is not updated after they are updated in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882587#comment-16882587 ] Jeff Zhang commented on FLINK-13150: [~xuefuz] It happens when I register a table. Here's sample code to reproduce this issue. The root cause seems to be that when I call tEnv.useCatalog, the defaultCatalogName in TableEnvImpl is not changed, so when I register a table, it is still registered in GenericInMemoryCatalog instead of HiveCatalog.
{code:scala}
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

val hiveCatalog = new HiveCatalog("hive", "default", "/Users/jzhang/Java/lib/apache-hive-2.3.4-bin/conf", "2.3.4");
tEnv.registerCatalog("hive", hiveCatalog)
tEnv.useCatalog("hive")

val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))

// register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, 'word, 'frequency)

// run a SQL query on the Table and retrieve the result as a new Table
val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
table.toDataSet[WC].print()
{code}
> defaultCatalogName and defaultDatabaseName in TableEnvImpl is not updated > after they are updated in TableEnvironment > > > Key: FLINK-13150 > URL: https://issues.apache.org/jira/browse/FLINK-13150 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Jeff Zhang >Assignee: Xuefu Zhang >Priority: Major > > defaultCatalogName and defaultDatabaseName in TableEnvImpl are initialized > when it is created and never changed, even when they are updated in > TableEnvironment. > This causes a problem: we may register a table to the wrong catalog after > we change the defaultCatalogName and defaultDatabaseName.
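The root cause described in the comment is a stale copy. The environment captures the default catalog name once at construction. A later `useCatalog` call changes the source of truth but not the cached field. Below is a hypothetical Java model of that pattern, together with the straightforward alternative of reading the current catalog on every registration. The class names and the `"default_catalog"` value are illustrative only, not Flink's `TableEnvImpl` or `CatalogManager`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a cached default-catalog name going stale. */
final class StaleCatalogSketch {

    /** Source of truth for the current catalog; each catalog maps table name -> definition. */
    static final class CatalogManager {
        private String currentCatalog;
        final Map<String, Map<String, String>> catalogs = new HashMap<>();

        CatalogManager(String initialCatalog) {
            this.currentCatalog = initialCatalog;
            catalogs.put(initialCatalog, new HashMap<>());
        }

        void registerCatalog(String name) { catalogs.put(name, new HashMap<>()); }

        void useCatalog(String name) { currentCatalog = name; }

        String getCurrentCatalog() { return currentCatalog; }
    }

    /** Buggy: copies the default catalog name once and never refreshes it. */
    static final class CachingEnv {
        private final CatalogManager manager;
        private final String defaultCatalogName;

        CachingEnv(CatalogManager manager) {
            this.manager = manager;
            this.defaultCatalogName = manager.getCurrentCatalog(); // goes stale after useCatalog
        }

        void registerTable(String table) {
            manager.catalogs.get(defaultCatalogName).put(table, "table");
        }
    }

    /** Alternative: asks the manager for the current catalog on every registration. */
    static final class LiveEnv {
        private final CatalogManager manager;

        LiveEnv(CatalogManager manager) { this.manager = manager; }

        void registerTable(String table) {
            manager.catalogs.get(manager.getCurrentCatalog()).put(table, "table");
        }
    }

    public static void main(String[] args) {
        CatalogManager m = new CatalogManager("default_catalog");
        CachingEnv env = new CachingEnv(m);
        m.registerCatalog("hive");
        m.useCatalog("hive");
        env.registerTable("WordCount");
        System.out.println(m.catalogs.get("hive").containsKey("WordCount")); // false
    }
}
```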
[GitHub] [flink] godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner
godfreyhe commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner URL: https://github.com/apache/flink/pull/9056#issuecomment-510297306 > > Oops.. I had only validated the dependency in the flink-table module. I will re-validate it in the whole project. > > Maybe also find a way to verify the Python Table API with the blink planner. OK, I will fix it.
[GitHub] [flink] flinkbot commented on issue #8418: [FLINK-12491][docs][configuration] Fix incorrect javadoc for path sep…
flinkbot commented on issue #8418: [FLINK-12491][docs][configuration] Fix incorrect javadoc for path sep… URL: https://github.com/apache/flink/pull/8418#issuecomment-510296832 CI report for commit df6d3f5acd084931c16fe8564cfd8532d3889111: SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/118620366)
[GitHub] [flink] KurtYoung commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner
KurtYoung commented on issue #9056: [FLINK-13185] [sql-parser][table-planner] Bump Calcite dependency to 1.20.0 in sql parser & flink planner URL: https://github.com/apache/flink/pull/9056#issuecomment-510295851 > Oops.. I had only validated the dependency in the flink-table module. I will re-validate it in the whole project. Maybe also find a way to verify the Python Table API with the blink planner.
[GitHub] [flink] flinkbot commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
flinkbot commented on issue #9075: [FLINK-10245][hbase] Add an upsert table sink factory for HBase URL: https://github.com/apache/flink/pull/9075#issuecomment-510295340 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier