[flink] 03/03: [FLINK-12978][table] Support LookupableTableSource for CsvTableSource
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c5ccbfef31af1db7eebd18f275a4f47dab65c855 Author: Jark Wu AuthorDate: Wed Jul 3 10:37:45 2019 +0800 [FLINK-12978][table] Support LookupableTableSource for CsvTableSource --- .../apache/flink/table/sources/CsvTableSource.java | 123 - .../runtime/batch/sql/TableSourceITCase.scala | 31 +- .../runtime/stream/sql/TableSourceITCase.scala | 30 + .../apache/flink/table/util/testTableSources.scala | 47 +++- 4 files changed, 228 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java index 160bc9a..1cd49ae 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java @@ -24,15 +24,23 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.CsvInputFormat; import org.apache.flink.api.java.io.RowCsvInputFormat; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; +import 
java.util.Map; import java.util.Objects; import java.util.stream.IntStream; @@ -41,7 +49,8 @@ import java.util.stream.IntStream; * (logically) unlimited number of fields. */ public class CsvTableSource - implements StreamTableSource, BatchTableSource, ProjectableTableSource { + implements StreamTableSource, BatchTableSource, LookupableTableSource, + ProjectableTableSource { private final CsvInputFormatConfig config; @@ -178,6 +187,21 @@ public class CsvTableSource } @Override + public TableFunction getLookupFunction(String[] lookupKeys) { + return new CsvLookupFunction(config, lookupKeys); + } + + @Override + public AsyncTableFunction getAsyncLookupFunction(String[] lookupKeys) { + throw new UnsupportedOperationException("CSV do not support async lookup"); + } + + @Override + public boolean isAsyncEnabled() { + return false; + } + + @Override public String explainSource() { String[] fields = config.getSelectedFieldNames(); return "CsvTableSource(read fields: " + String.join(", ", fields) + ")"; @@ -321,6 +345,103 @@ public class CsvTableSource } + // + // private utilities + // + + /** +* LookupFunction to support lookup in CsvTableSource. 
+*/ + public static class CsvLookupFunction extends TableFunction { + private static final long serialVersionUID = 1L; + + private final CsvInputFormatConfig config; + + private final List sourceKeys = new ArrayList<>(); + private final List targetKeys = new ArrayList<>(); + private final Map> dataMap = new HashMap<>(); + + CsvLookupFunction(CsvInputFormatConfig config, String[] lookupKeys) { + this.config = config; + + List fields = Arrays.asList(config.getSelectedFieldNames()); + for (int i = 0; i < lookupKeys.length; i++) { + sourceKeys.add(i); + int targetIdx = fields.indexOf(lookupKeys[i]); + assert targetIdx != -1; + targetKeys.add(targetIdx); + } + } + + @Override + public TypeInformation getResultType() { + return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames()); + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context);
[flink] branch master updated (1123ded -> c5ccbfe)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 1123ded [FLINK-13070][table-planner-blink] Remove TableImpl from blink planner and use api.internal.TableImpl instead new 635cd55 [FLINK-12977][table] Port CsvTableSource to api-java-bridge new 981a54d [FLINK-12977][table] Port CsvTableSink to api-java-bridge new c5ccbfe [FLINK-12978][table] Support LookupableTableSource for CsvTableSource The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: flink-python/pyflink/table/sinks.py| 14 +- .../org/apache/flink/table/sinks/CsvTableSink.java | 187 +++ .../apache/flink/table/sources/CsvTableSource.java | 552 + .../flink/table/api/StreamTableEnvironment.scala | 2 +- .../runtime/batch/sql/TableSourceITCase.scala | 53 +- .../runtime/stream/sql/TableSourceITCase.scala | 54 +- .../apache/flink/table/util/testTableSources.scala | 90 .../apache/flink/table/sinks/CsvTableSink.scala| 140 -- .../flink/table/sources/CsvTableSource.scala | 364 -- .../runtime/batch/table/TableSinkITCase.scala | 2 +- .../runtime/stream/sql/TableSourceITCase.scala | 2 +- .../flink/table/runtime/utils/CommonTestData.scala | 25 +- 12 files changed, 951 insertions(+), 534 deletions(-) create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
[flink] 02/03: [FLINK-12977][table] Port CsvTableSink to api-java-bridge
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 981a54d6718753ca5b2cd68259b72b4ecea9008a Author: Gen Luo AuthorDate: Wed Jun 26 11:46:53 2019 +0800 [FLINK-12977][table] Port CsvTableSink to api-java-bridge --- flink-python/pyflink/table/sinks.py| 14 +- .../org/apache/flink/table/sinks/CsvTableSink.java | 187 + .../apache/flink/table/sinks/CsvTableSink.scala| 140 --- .../runtime/batch/table/TableSinkITCase.scala | 2 +- 4 files changed, 193 insertions(+), 150 deletions(-) diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py index 4aa968f..deb5b45 100644 --- a/flink-python/pyflink/table/sinks.py +++ b/flink-python/pyflink/table/sinks.py @@ -57,24 +57,20 @@ class CsvTableSink(TableSink): and :data:`WriteMode.OVERWRITE`. """ -def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1, +def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=-1, write_mode=None): # type: (list[str], list[DataType], str, str, int, int) -> None gateway = get_gateway() if write_mode == WriteMode.NO_OVERWRITE: -j_write_mode = gateway.jvm.scala.Option.apply( - gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE) +j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE elif write_mode == WriteMode.OVERWRITE: -j_write_mode = gateway.jvm.scala.Option.apply( - gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE) +j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE elif write_mode is None: -j_write_mode = gateway.jvm.scala.Option.empty() +j_write_mode = None else: raise Exception('Unsupported write_mode: %s' % write_mode) -j_some_field_delimiter = gateway.jvm.scala.Option.apply(field_delimiter) -j_some_num_files = gateway.jvm.scala.Option.apply(num_files) j_csv_table_sink = 
gateway.jvm.CsvTableSink( -path, j_some_field_delimiter, j_some_num_files, j_write_mode) +path, field_delimiter, num_files, j_write_mode) j_field_names = utils.to_jarray(gateway.jvm.String, field_names) j_field_types = utils.to_jarray(gateway.jvm.TypeInformation, [_to_java_type(field_type) for field_type in field_types]) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java new file mode 100644 index 000..f950f4d --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +/** + * A simple {@link TableSink} to emit data as CSV files. + */ +public class CsvTableSink implements BatchTableSink, AppendStreamTableSink { + private String path; + private String fieldDelim; + private int numFiles = -1; + private FileSystem.WriteMode writeMode; + + private String[] fieldNames;
[flink] 01/03: [FLINK-12977][table] Port CsvTableSource to api-java-bridge
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 635cd554e94e2c87858785f7e1227aa804eedc5e Author: Gen Luo AuthorDate: Tue Jun 25 19:40:48 2019 +0800 [FLINK-12977][table] Port CsvTableSource to api-java-bridge This closes #8872 --- .../apache/flink/table/sources/CsvTableSource.java | 431 + .../flink/table/api/StreamTableEnvironment.scala | 2 +- .../runtime/batch/sql/TableSourceITCase.scala | 22 +- .../runtime/stream/sql/TableSourceITCase.scala | 24 +- .../apache/flink/table/util/testTableSources.scala | 45 +++ .../flink/table/sources/CsvTableSource.scala | 364 - .../runtime/stream/sql/TableSourceITCase.scala | 2 +- .../flink/table/runtime/utils/CommonTestData.scala | 25 +- 8 files changed, 532 insertions(+), 383 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java new file mode 100644 index 000..160bc9a --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Objects; +import java.util.stream.IntStream; + +/** + * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource + implements StreamTableSource, BatchTableSource, ProjectableTableSource { + + private final CsvInputFormatConfig config; + + /** +* A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with +* a (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. +*/ + public CsvTableSource(String path, String[] fieldNames, TypeInformation[] fieldTypes) { + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, + null, false, null, false); + } + + /** +* A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with +* a (logically) unlimited number of fields. +* +* @param pathThe path to the CSV file. 
+* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. +* @param fieldDelim The field delimiter, "," by default. +* @param lineDelim The row delimiter, "\n" by default. +* @param quoteCharacter An optional quote character for String values, null by default. +* @param ignoreFirstLine Flag to ignore the first line, false by default. +* @param ignoreComments An optional prefix to indicate comments, null by default. +* @param lenient Flag to skip records with parse error instead to fail, false by +*default. +*/ +
[flink] branch master updated: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 36a938a [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode 36a938a is described below commit 36a938a45d5db46bb9ec4234fbdde6758a422143 Author: Aitozi <1059789...@qq.com> AuthorDate: Thu May 16 00:07:55 2019 +0800 [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode --- docs/monitoring/metrics.md | 10 + docs/monitoring/metrics.zh.md | 10 + .../io/network/NettyShuffleEnvironment.java| 3 +- ...geGauge.java => AbstractBuffersUsageGauge.java} | 29 +- .../metrics/CreditBasedInputBuffersUsageGauge.java | 51 .../metrics/ExclusiveBuffersUsageGauge.java| 57 .../network/metrics/FloatingBuffersUsageGauge.java | 61 + .../network/metrics/InputBufferPoolUsageGauge.java | 35 ++- .../network/metrics/NettyShuffleMetricFactory.java | 24 +- .../partition/consumer/RemoteInputChannel.java | 8 + .../consumer/InputBuffersMetricsTest.java | 292 + 11 files changed, 545 insertions(+), 35 deletions(-) diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 86dc121..211ccfa 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1045,6 +1045,16 @@ Thus, in order to infer the metric identifier: Gauge + inputFloatingBuffersUsage + An estimate of the floating input buffers usage, dedicated for credit-based mode. + Gauge + + + inputExclusiveBuffersUsage + An estimate of the exclusive input buffers usage, dedicated for credit-based mode. + Gauge + + outPoolUsage An estimate of the output buffers usage.
Gauge diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md index aaed606..44b4806 100644 --- a/docs/monitoring/metrics.zh.md +++ b/docs/monitoring/metrics.zh.md @@ -1044,6 +1044,16 @@ Thus, in order to infer the metric identifier: Gauge + inputFloatingBuffersUsage + An estimate of the floating input buffers usage, dediciated for credit-based mode. + Gauge + + + inputExclusiveBuffersUsage + An estimate of the exclusive input buffers usage, dediciated for credit-based mode. + Gauge + + outPoolUsage An estimate of the output buffers usage. Gauge diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java index 17fb2cc..bbe0833 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java @@ -226,7 +226,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment { +public abstract class AbstractBuffersUsageGauge implements Gauge { - private final SingleInputGate[] inputGates; + protected final SingleInputGate[] inputGates; - public InputBufferPoolUsageGauge(SingleInputGate[] inputGates) { + @VisibleForTesting + public abstract int calculateUsedBuffers(SingleInputGate inputGate); + + @VisibleForTesting + public abstract int calculateTotalBuffers(SingleInputGate inputGate); + + AbstractBuffersUsageGauge(SingleInputGate[] inputGates) { this.inputGates = inputGates; } @Override public Float getValue() { int usedBuffers = 0; - int bufferPoolSize = 0; + int totalBuffers = 0; for (SingleInputGate inputGate : inputGates) { - BufferPool bufferPool = inputGate.getBufferPool(); - if (bufferPool != null) { - usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers(); - bufferPoolSize += bufferPool.getNumBuffers(); - } + usedBuffers += calculateUsedBuffers(inputGate); + 
totalBuffers += calculateTotalBuffers(inputGate); } - if (bufferPoolSize != 0) { - return ((float) usedBuffers) / bufferPoolSize; + if (totalBuffers != 0) { + return ((float) usedBuffers) / totalBuffers; } else { return 0.0f; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
[flink] 01/02: [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflict
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit d1b22b56a92eef26998555dc35371447593afc8c Author: JingsongLi AuthorDate: Tue Jul 2 09:56:38 2019 +0800 [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflit --- .../flink/table/calcite/FlinkRelBuilder.scala | 4 ++-- .../codegen/agg/AggsHandlerCodeGenerator.scala | 14 ++-- .../agg/batch/HashWindowCodeGenerator.scala| 4 ++-- .../agg/batch/SortWindowCodeGenerator.scala| 4 ++-- .../codegen/agg/batch/WindowCodeGenerator.scala| 4 ++-- ...nce.scala => ExestingFieldFieldReference.scala} | 2 +- ...perties.scala => plannerWindowProperties.scala} | 24 - .../flink/table/plan/logical/groupWindows.scala| 8 +++ .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 4 ++-- .../plan/metadata/FlinkRelMdUniqueGroups.scala | 4 ++-- .../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 4 ++-- .../nodes/calcite/LogicalWindowAggregate.scala | 8 +++ .../table/plan/nodes/calcite/WindowAggregate.scala | 6 +++--- .../logical/FlinkLogicalWindowAggregate.scala | 4 ++-- .../batch/BatchExecHashWindowAggregate.scala | 4 ++-- .../batch/BatchExecHashWindowAggregateBase.scala | 4 ++-- .../batch/BatchExecLocalHashWindowAggregate.scala | 4 ++-- .../batch/BatchExecLocalSortWindowAggregate.scala | 4 ++-- .../batch/BatchExecSortWindowAggregate.scala | 4 ++-- .../batch/BatchExecSortWindowAggregateBase.scala | 4 ++-- .../batch/BatchExecWindowAggregateBase.scala | 6 +++--- .../stream/StreamExecGroupWindowAggregate.scala| 6 +++--- .../logical/LogicalWindowAggregateRuleBase.scala | 8 +++ .../plan/rules/logical/WindowPropertiesRule.scala | 20 ++--- .../flink/table/plan/util/AggregateUtil.scala | 20 - .../flink/table/plan/util/FlinkRelMdUtil.scala | 4 ++-- .../flink/table/plan/util/RelExplainUtil.scala | 4 ++-- .../flink/table/sources/TableSourceUtil.scala | 25 +++--- 
.../table/sources/tsextractors/ExistingField.scala | 4 ++-- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 16 +++--- 30 files changed, 120 insertions(+), 111 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala index 4683424..8b5cf8a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.calcite import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory} -import org.apache.flink.table.expressions.WindowProperty +import org.apache.flink.table.expressions.PlannerWindowProperty import org.apache.flink.table.operations.QueryOperation import org.apache.flink.table.plan.QueryOperationConverter import org.apache.flink.table.runtime.rank.{RankRange, RankType} @@ -111,7 +111,7 @@ object FlinkRelBuilder { * * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]]. 
*/ - case class NamedWindowProperty(name: String, property: WindowProperty) + case class PlannerNamedWindowProperty(name: String, property: PlannerWindowProperty) def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() { def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder = diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala index f77810b..bf40b04 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala @@ -58,7 +58,7 @@ class AggsHandlerCodeGenerator( /** window properties like window_start and window_end, only used in window aggregates */ private var namespaceClassName: String = _ - private var windowProperties: Seq[WindowProperty] = Seq() + private var windowProperties: Seq[PlannerWindowProperty] = Seq() private var hasNamespace: Boolean = false /** Aggregates informations */ @@ -182,7 +182,7 @@ class AggsHandlerCodeGenerator( * Adds window properties such as window_start, window_end */ private def initialWindowProperties( - windowProperties: Seq[WindowProperty], + windowProperties:
[flink] branch master updated (d81ac48 -> 886b01d)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d81ac48 [hotfix][docs] remove duplicate `to` in state doc new d1b22b5 [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflit new 886b01d [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../expressions/PlannerTypeInferenceUtilImpl.java | 142 .../table/functions/sql/FlinkSqlOperatorTable.java | 1 + .../table/api/ExpressionParserException.scala} | 0 .../flink/table/calcite/FlinkRelBuilder.scala | 4 +- .../flink/table/calcite/FlinkTypeFactory.scala | 21 +- .../codegen/agg/AggsHandlerCodeGenerator.scala | 14 +- .../agg/batch/HashWindowCodeGenerator.scala| 4 +- .../agg/batch/SortWindowCodeGenerator.scala| 4 +- .../codegen/agg/batch/WindowCodeGenerator.scala| 4 +- ...nce.scala => ExestingFieldFieldReference.scala} | 2 +- .../flink/table/expressions/ExpressionBridge.scala | 0 .../flink/table/expressions/InputTypeSpec.scala| 0 .../table/expressions/PlannerExpression.scala | 0 .../expressions/PlannerExpressionConverter.scala | 836 + .../expressions/PlannerExpressionParserImpl.scala | 726 ++ .../table/expressions/PlannerExpressionUtils.scala | 0 .../flink/table/expressions/aggregations.scala | 439 +++ .../flink/table/expressions/arithmetic.scala | 165 .../org/apache/flink/table/expressions/call.scala | 326 .../org/apache/flink/table/expressions/cast.scala | 59 ++ .../flink/table/expressions/collection.scala | 235 ++ .../flink/table/expressions/comparison.scala | 242 ++ .../apache/flink/table/expressions/composite.scala | 0 
.../flink/table/expressions/fieldExpression.scala | 253 +++ .../flink/table/expressions/hashExpressions.scala | 124 +++ .../apache/flink/table/expressions/literals.scala | 139 .../org/apache/flink/table/expressions/logic.scala | 109 +++ .../flink/table/expressions/mathExpressions.scala | 532 + .../apache/flink/table/expressions/ordering.scala | 0 .../flink/table/expressions/overOffsets.scala | 0 .../apache/flink/table/expressions/package.scala | 0 ...perties.scala => plannerWindowProperties.scala} | 24 +- .../table/expressions/stringExpressions.scala | 585 ++ .../apache/flink/table/expressions/subquery.scala | 95 +++ .../apache/flink/table/expressions/symbols.scala | 0 .../org/apache/flink/table/expressions/time.scala | 369 + .../flink/table/expressions/windowProperties.scala | 73 +- .../functions/utils/UserDefinedFunctionUtils.scala | 8 + .../org/apache/flink/table/plan/TreeNode.scala | 115 +++ .../flink/table/plan/logical/groupWindows.scala| 8 +- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 4 +- .../plan/metadata/FlinkRelMdUniqueGroups.scala | 4 +- .../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 4 +- .../nodes/calcite/LogicalWindowAggregate.scala | 8 +- .../table/plan/nodes/calcite/WindowAggregate.scala | 6 +- .../logical/FlinkLogicalWindowAggregate.scala | 4 +- .../batch/BatchExecHashWindowAggregate.scala | 4 +- .../batch/BatchExecHashWindowAggregateBase.scala | 4 +- .../batch/BatchExecLocalHashWindowAggregate.scala | 4 +- .../batch/BatchExecLocalSortWindowAggregate.scala | 4 +- .../batch/BatchExecSortWindowAggregate.scala | 4 +- .../batch/BatchExecSortWindowAggregateBase.scala | 4 +- .../batch/BatchExecWindowAggregateBase.scala | 6 +- .../stream/StreamExecGroupWindowAggregate.scala| 6 +- .../logical/LogicalWindowAggregateRuleBase.scala | 8 +- .../plan/rules/logical/WindowPropertiesRule.scala | 20 +- .../flink/table/plan/util/AggregateUtil.scala | 20 +- .../flink/table/plan/util/FlinkRelMdUtil.scala | 4 +- 
.../flink/table/plan/util/RelExplainUtil.scala | 4 +- .../flink/table/plan/util/RexNodeExtractor.scala | 9 +- .../flink/table/sources/TableSourceUtil.scala | 25 +- .../table/sources/tsextractors/ExistingField.scala | 4 +- .../flink/table/typeutils/TypeInfoCheckUtils.scala | 277 +++ .../flink/table/validate/ValidationResult.scala| 0 .../flink/table/expressions/KeywordParseTest.scala | 62 ++ .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 16 +- .../table/plan/util/RexNodeExtractorTest.scala | 195 ++--- 67 files changed, 6114 insertions(+), 254 deletions(-) create mode 100644
[flink] branch master updated (886b01d -> 1123ded)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 886b01d [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner add 1123ded [FLINK-13070][table-planner-blink] Remove TableImpl from blink planner and use api.internal.TableImpl instead No new revisions were added by this update. Summary of changes: .../apache/flink/table/api/TableEnvironment.scala | 35 +++- .../org/apache/flink/table/api/TableImpl.scala | 212 - .../table/api/java/StreamTableEnvironment.scala| 4 +- .../table/api/scala/StreamTableEnvironment.scala | 6 +- .../flink/table/api/scala/TableConversions.scala | 7 +- .../plan/stream/sql/join/WindowJoinTest.scala | 2 +- .../table/runtime/utils/BatchTableEnvUtil.scala| 4 +- .../flink/table/runtime/utils/BatchTestBase.scala | 3 +- .../table/runtime/utils/StreamTableEnvUtil.scala | 4 +- .../flink/table/runtime/utils/TableUtil.scala | 5 +- .../apache/flink/table/util/TableTestBase.scala| 6 +- 11 files changed, 50 insertions(+), 238 deletions(-) delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
[flink-web] 01/03: Add 1.8.0 to list of past releases
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 8f20040fdff9730de4c7a15aeb86cd118c7ac77e Author: Aljoscha Krettek AuthorDate: Wed Jul 3 09:34:25 2019 +0200 Add 1.8.0 to list of past releases --- downloads.md | 1 + 1 file changed, 1 insertion(+) diff --git a/downloads.md b/downloads.md index 340e291..bcf19f6 100644 --- a/downloads.md +++ b/downloads.md @@ -177,6 +177,7 @@ All Flink releases are available via [https://archive.apache.org/dist/flink/](ht ### Flink - Flink 1.8.1 - 2019-07-02 ([Source](https://archive.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-src.tgz), [Binaries](https://archive.apache.org/dist/flink/flink-1.8.1/), [Docs]({{site.DOCS_BASE_URL}}flink-docs-release-1.8/), [Javadocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.8/api/java), [ScalaDocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.8/api/scala/index.html)) +- Flink 1.8.0 - 2019-04-09 ([Source](https://archive.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-src.tgz), [Binaries](https://archive.apache.org/dist/flink/flink-1.8.0/), [Docs]({{site.DOCS_BASE_URL}}flink-docs-release-1.8/), [Javadocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.8/api/java), [ScalaDocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.8/api/scala/index.html)) - Flink 1.7.2 - 2019-02-15 ([Source](https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-src.tgz), [Binaries](https://archive.apache.org/dist/flink/flink-1.7.2/), [Docs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/), [Javadocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/api/java), [ScalaDocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/api/scala/index.html)) - Flink 1.7.1 - 2018-12-21 ([Source](https://archive.apache.org/dist/flink/flink-1.7.1/flink-1.7.1-src.tgz), [Binaries](https://archive.apache.org/dist/flink/flink-1.7.1/), [Docs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/), 
[Javadocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/api/java), [ScalaDocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/api/scala/index.html)) - Flink 1.7.0 - 2018-11-30 ([Source](https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-src.tgz), [Binaries](https://archive.apache.org/dist/flink/flink-1.7.0/), [Docs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/), [Javadocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/api/java), [ScalaDocs]({{site.DOCS_BASE_URL}}flink-docs-release-1.7/api/scala/index.html))
[flink-web] 03/03: Rebuild website
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 703ffbe4cd98b8a5a06149fd2470c7e930f12715 Author: Aljoscha Krettek AuthorDate: Wed Jul 3 09:35:40 2019 +0200 Rebuild website --- content/downloads.html | 1 + 1 file changed, 1 insertion(+) diff --git a/content/downloads.html b/content/downloads.html index 32d8750..d3b94e2 100644 --- a/content/downloads.html +++ b/content/downloads.html @@ -401,6 +401,7 @@ main Flink release: Flink Flink 1.8.1 - 2019-07-02 (https://archive.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-src.tgz;>Source, https://archive.apache.org/dist/flink/flink-1.8.1/;>Binaries, https://ci.apache.org/projects/flink/flink-docs-release-1.8/;>Docs, https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java;>Javadocs, https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/scala/index.html;>ScalaDocs) + Flink 1.8.0 - 2019-04-09 (https://archive.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-src.tgz;>Source, https://archive.apache.org/dist/flink/flink-1.8.0/;>Binaries, https://ci.apache.org/projects/flink/flink-docs-release-1.8/;>Docs, https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java;>Javadocs, https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/scala/index.html;>ScalaDocs) Flink 1.7.2 - 2019-02-15 (https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-src.tgz;>Source, https://archive.apache.org/dist/flink/flink-1.7.2/;>Binaries, https://ci.apache.org/projects/flink/flink-docs-release-1.7/;>Docs, https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java;>Javadocs, https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/scala/index.html;>ScalaDocs) Flink 1.7.1 - 2018-12-21 (https://archive.apache.org/dist/flink/flink-1.7.1/flink-1.7.1-src.tgz;>Source, https://archive.apache.org/dist/flink/flink-1.7.1/;>Binaries, 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/;>Docs, https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java;>Javadocs, https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/scala/index.html;>ScalaDocs) Flink 1.7.0 - 2018-11-30 (https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-src.tgz;>Source, https://archive.apache.org/dist/flink/flink-1.7.0/;>Binaries, https://ci.apache.org/projects/flink/flink-docs-release-1.7/;>Docs, https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java;>Javadocs, https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/scala/index.html;>ScalaDocs)
[flink-web] 02/03: Remove .jekyll-metadata, it was added by mistake
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit c957af5d332ca3abbbe36f86154af58b51931bf2 Author: Aljoscha Krettek AuthorDate: Wed Jul 3 09:35:08 2019 +0200 Remove .jekyll-metadata, it was added by mistake --- .jekyll-metadata | Bin 92656 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/.jekyll-metadata b/.jekyll-metadata deleted file mode 100644 index 74b3891..0000000 Binary files a/.jekyll-metadata and /dev/null differ
[flink-web] branch asf-site updated (947ce88 -> 703ffbe)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 947ce88 Rebuild website new 8f20040 Add 1.8.0 to list of past releases new c957af5 Remove .jekyll-metadata, it was added by mistake new 703ffbe Rebuild website The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .jekyll-metadata | Bin 92656 -> 0 bytes content/downloads.html | 1 + downloads.md | 1 + 3 files changed, 2 insertions(+) delete mode 100644 .jekyll-metadata
[flink] branch master updated: [FLINK-13077][python] Fix the failed test in CatalogPartitionAPICompletenessTests caused by the lack of "getComment" method. (#8968)
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new bd5ca42 [FLINK-13077][python] Fix the failed test in CatalogPartitionAPICompletenessTests caused by the lack of "getComment" method. (#8968) bd5ca42 is described below commit bd5ca420f752a16e9e81a4eda5cc4e23bfad1679 Author: WeiZhong94 <44194288+weizhon...@users.noreply.github.com> AuthorDate: Wed Jul 3 19:40:50 2019 +0800 [FLINK-13077][python] Fix the failed test in CatalogPartitionAPICompletenessTests caused by the lack of "getComment" method. (#8968) This close #8968 --- flink-python/pyflink/table/catalog.py | 9 + 1 file changed, 9 insertions(+) diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index 2748d77..6c6480c 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -736,6 +736,15 @@ class CatalogPartition(object): else: return None +def get_comment(self): +""" +Get comment of the partition. + +:return: Comment of the partition. +:rtype: str +""" +return self._j_catalog_partition.getComment() + class CatalogFunction(object): """
[flink] branch master updated: [hotfix][43w5] Fix logging argument
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7cfdcd5 [hotfix][43w5] Fix logging argument 7cfdcd5 is described below commit 7cfdcd50ed2a43679d794494cf2594727b76bcbb Author: leesf <490081...@qq.com> AuthorDate: Wed Jul 3 21:21:40 2019 +0800 [hotfix][43w5] Fix logging argument --- .../src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index 486fcff..d94e367 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -451,7 +451,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync { static void createUploadDir(final Path uploadDir, final Logger log, final boolean initialCreation) throws IOException { if (!Files.exists(uploadDir)) { if (initialCreation) { - log.info("Upload directory {} does not exist. " + uploadDir); + log.info("Upload directory {} does not exist. ", uploadDir); } else { log.warn("Upload directory {} has been deleted externally. " + "Previously uploaded files are no longer available.", uploadDir);
[flink] branch master updated: [hotfix][table] remove @PublicEvolving annotation from AbstractCatalog as it's not supported to be public
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2b1a125 [hotfix][table] remove @PublicEvolving annotation from AbstractCatalog as it's not supported to be public 2b1a125 is described below commit 2b1a125ea7e95e7eae247064cdb4b55f1bd46118 Author: bowen.li AuthorDate: Wed Jul 3 11:05:22 2019 -0700 [hotfix][table] remove @PublicEvolving annotation from AbstractCatalog as it's not supported to be public This PR removes @PublicEvolving annotation from AbstractCatalog. @PublicEvolving should be on the Catalog interface (which it already is), and marking AbstractCatalog as @PublicEvolving brings us extra burden on maintaining its compatibility. This closes #8975. --- .../src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java index 7f5707e..d1534dd 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java @@ -18,7 +18,6 @@ package org.apache.flink.table.catalog; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.StringUtils; import static org.apache.flink.util.Preconditions.checkArgument; @@ -26,7 +25,6 @@ import static org.apache.flink.util.Preconditions.checkArgument; /** * Abstract class for catalogs. */ -@PublicEvolving public abstract class AbstractCatalog implements Catalog { private final String catalogName; private final String defaultDatabase;
[flink] 02/04: [hotfix][runtime, tests] Remove ComponentMainThreadExecutor interface from ManuallyTriggeredScheduledExecutor
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9a0c965aaac9b4c786decdb4a76d7167520e9776 Author: Gary Yao AuthorDate: Thu Jun 27 17:05:28 2019 +0200 [hotfix][runtime, tests] Remove ComponentMainThreadExecutor interface from ManuallyTriggeredScheduledExecutor --- .../runtime/concurrent/ManuallyTriggeredScheduledExecutor.java | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java index dd307c4..870cda8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java @@ -39,7 +39,7 @@ import java.util.concurrent.TimeoutException; /** * Simple {@link ScheduledExecutor} implementation for testing purposes. */ -public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor, ComponentMainThreadExecutor { +public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor { private final Executor executorDelegate; private final ArrayDeque queuedRunnables = new ArrayDeque<>(); @@ -148,10 +148,6 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor, Co return scheduledTask; } - @Override - public void assertRunningInMainThread() { - } - private static final class ScheduledTask implements ScheduledFuture { private final Callable callable;
[flink] 01/04: [hotfix][runtime] Fix checkstyle violations in FailoverStrategyLoader
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c98a6b92ed00c9643e61f8663ef55a2649e8278b Author: Gary Yao AuthorDate: Mon Jul 1 10:21:39 2019 +0200 [hotfix][runtime] Fix checkstyle violations in FailoverStrategyLoader --- .../runtime/executiongraph/failover/FailoverStrategyLoader.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java index 8b6fa6e..eb003b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java @@ -28,14 +28,14 @@ import org.slf4j.Logger; import javax.annotation.Nullable; /** - * A utility class to load failover strategies from the configuration. + * A utility class to load failover strategies from the configuration. */ public class FailoverStrategyLoader { - /** Config name for the {@link RestartAllStrategy} */ + /** Config name for the {@link RestartAllStrategy}. */ public static final String FULL_RESTART_STRATEGY_NAME = "full"; - /** Config name for the {@link RestartIndividualStrategy} */ + /** Config name for the {@link RestartIndividualStrategy}. */ public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual"; /** Config name for the {@link RestartPipelinedRegionStrategy} */
[flink] 04/04: [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6a682ff7bfa6f0073759ec716b09bf6e06a3d36b Author: Gary Yao AuthorDate: Wed Jul 3 20:20:34 2019 +0200 [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling Implement adapter (AdaptedRestartPipelinedRegionStrategyNG) that adapts org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy to the legacy failover strategy interface (org.apache.flink.runtime.executiongraph.failover.FailoverStrategy). The new AdaptedRestartPipelinedRegionStrategyNG is chosen if config option jobmanager.execution.failover-strategy is set to "region". The legacy behavior can be enabled by setting the config option to "region-legacy". This closes #8922. --- .../runtime/executiongraph/ExecutionGraph.java | 156 +-- .../runtime/executiongraph/SchedulingUtils.java| 218 ++ .../AdaptedRestartPipelinedRegionStrategyNG.java | 316 ++ .../failover/FailoverStrategyLoader.java | 8 +- .../runtime/scheduler/ExecutionVertexVersion.java | 45 ++ .../scheduler/ExecutionVertexVersioner.java| 76 ...egionStrategyNGAbortPendingCheckpointsTest.java | 171 ...inedRegionStrategyNGConcurrentFailoverTest.java | 284 + ...startPipelinedRegionStrategyNGFailoverTest.java | 459 + .../executiongraph/ExecutionGraphRestartTest.java | 108 - .../PipelinedFailoverRegionBuildingTest.java | 2 +- ...RestartPipelinedRegionStrategyBuildingTest.java | 4 +- .../scheduler/ExecutionVertexVersionerTest.java| 121 ++ 13 files changed, 1807 insertions(+), 161 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index ce65b68..0840c00 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; -import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; @@ -65,9 +64,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; -import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter; @@ -108,16 +104,13 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; import 
java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -952,11 +945,11 @@ public class ExecutionGraph implements AccessExecutionGraph { switch (scheduleMode) { case LAZY_FROM_SOURCES: - newSchedulingFuture = scheduleLazy(slotProvider); + newSchedulingFuture = scheduleLazy(); break; case EAGER: - newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout); + newSchedulingFuture = scheduleEager();
[flink] branch master updated (6624562 -> 6a682ff)
This is an automated email from the ASF dual-hosted git repository. gary pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6624562 [FLINK-13043] [Library / CEP] Fix the bug of parsing Dewey number from string new c98a6b9 [hotfix][runtime] Fix checkstyle violations in FailoverStrategyLoader new 9a0c965 [hotfix][runtime, tests] Remove ComponentMainThreadExecutor interface from ManuallyTriggeredScheduledExecutor new ba12e9a [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor new 6a682ff [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../ComponentMainThreadExecutorServiceAdapter.java | 60 ++- .../runtime/executiongraph/ExecutionGraph.java | 156 +-- .../runtime/executiongraph/SchedulingUtils.java| 218 ++ .../AdaptedRestartPipelinedRegionStrategyNG.java | 316 ++ .../failover/FailoverStrategyLoader.java | 14 +- .../runtime/scheduler/ExecutionVertexVersion.java | 26 +- .../scheduler/ExecutionVertexVersioner.java| 76 .../ManuallyTriggeredScheduledExecutor.java| 6 +- ...egionStrategyNGAbortPendingCheckpointsTest.java | 171 ...inedRegionStrategyNGConcurrentFailoverTest.java | 284 + ...startPipelinedRegionStrategyNGFailoverTest.java | 459 + .../executiongraph/ExecutionGraphRestartTest.java | 108 - .../PipelinedFailoverRegionBuildingTest.java | 2 +- ...RestartPipelinedRegionStrategyBuildingTest.java | 4 +- .../scheduler/ExecutionVertexVersionerTest.java| 121 ++ 15 files changed, 1836 insertions(+), 185 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java copy flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/DummyTableSinkFactory.java => flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java (58%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java
[flink] 03/04: [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ba12e9af4f6af3bff1ae3298e9aaaf1edbdff744 Author: Gary Yao AuthorDate: Wed Jul 3 20:19:16 2019 +0200 [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor --- .../ComponentMainThreadExecutorServiceAdapter.java | 60 ++ 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java index dbd94ab..c71675a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java @@ -18,31 +18,73 @@ package org.apache.flink.runtime.concurrent; -import javax.annotation.Nonnull; +import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Adapter class for a {@link ScheduledExecutorService} which shall be used as a + * Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a * {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the * main thread of the executor. 
*/ -public class ComponentMainThreadExecutorServiceAdapter - extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor { +public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainThreadExecutor { + + private final ScheduledExecutor scheduledExecutor; /** A runnable that should assert that the current thread is the expected main thread. */ - @Nonnull private final Runnable mainThreadCheck; public ComponentMainThreadExecutorServiceAdapter( - @Nonnull ScheduledExecutorService scheduledExecutorService, - @Nonnull Runnable mainThreadCheck) { - super(scheduledExecutorService); - this.mainThreadCheck = mainThreadCheck; + final ScheduledExecutorService scheduledExecutorService, + final Runnable mainThreadCheck) { + this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck); + } + + public ComponentMainThreadExecutorServiceAdapter( + final ScheduledExecutor scheduledExecutorService, + final Thread mainThread) { + this(scheduledExecutorService, () -> MainThreadValidatorUtil.isRunningInExpectedThread(mainThread)); + } + + private ComponentMainThreadExecutorServiceAdapter( + final ScheduledExecutor scheduledExecutor, + final Runnable mainThreadCheck) { + this.scheduledExecutor = checkNotNull(scheduledExecutor); + this.mainThreadCheck = checkNotNull(mainThreadCheck); } @Override public void assertRunningInMainThread() { mainThreadCheck.run(); } + + @Override + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + return scheduledExecutor.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) { + return scheduledExecutor.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, 
period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + return scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void execute(final Runnable command) { + scheduledExecutor.execute(command); + } }
[flink] 07/07: [FLINK-13043] [Library / CEP] Fix the bug of parsing Dewey number from string
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6624562982c9d57bebba8cb4b574b8ed28640a0d Author: liyafan82 AuthorDate: Mon Jul 1 18:59:40 2019 +0800 [FLINK-13043] [Library / CEP] Fix the bug of parsing Dewey number from string This closes #8936 --- .../src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java | 6 -- .../src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java | 5 + 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java index 6141bf2..de41ef1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -175,9 +175,9 @@ public class DeweyNumber implements Serializable { public static DeweyNumber fromString(final String deweyNumberString) { String[] splits = deweyNumberString.split("\\."); - if (splits.length == 0) { + if (splits.length == 1) { return new DeweyNumber(Integer.parseInt(deweyNumberString)); - } else { + } else if (splits.length > 0) { int[] deweyNumber = new int[splits.length]; for (int i = 0; i < splits.length; i++) { @@ -185,6 +185,8 @@ public class DeweyNumber implements Serializable { } return new DeweyNumber(deweyNumber); + } else { + throw new IllegalArgumentException("Failed to parse " + deweyNumberString + " as a Dewey number"); } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java index e28e77d..bf07294 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java +++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java @@ -55,4 +55,9 @@ public class DeweyNumberTest extends TestLogger { assertFalse(startAddStage.isCompatibleWith(increaseAddStage)); assertFalse(startAddStageIncreased.isCompatibleWith(startAddStageIncreasedAddStage)); } + + @Test(expected = IllegalArgumentException.class) + public void testZeroSplitsDeweyNumber() { + DeweyNumber.fromString("."); + } }
[flink] 05/07: [FLINK-13017] [docs] Do not mount local $HOME into docs docker environment
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7a8333b18503cc2414bb405dced91d4b83260a6d Author: Nico Kruber AuthorDate: Thu Jun 27 14:39:33 2019 +0200 [FLINK-13017] [docs] Do not mount local $HOME into docs docker environment This closes #8917 --- docs/docker/run.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/docker/run.sh b/docs/docker/run.sh index 5598c0a..bd02178 100755 --- a/docs/docker/run.sh +++ b/docs/docker/run.sh @@ -31,12 +31,10 @@ if [ "$(uname -s)" == "Linux" ]; then USER_NAME=${SUDO_USER:=$USER} USER_ID=$(id -u "${USER_NAME}") GROUP_ID=$(id -g "${USER_NAME}") - LOCAL_HOME="/home/${USER_NAME}" else # boot2docker uid and gid USER_NAME=$USER USER_ID=1000 GROUP_ID=50 - LOCAL_HOME="/Users/${USER_NAME}" fi docker build -t "${IMAGE_NAME}-${USER_NAME}" - <
[flink] 03/07: [FLINK-12840] [core] Fix network utils to work with ipv6 correctly
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3854552ceefd2b2b9c0e2a9b6152a7fcb69153fe Author: potseluev AuthorDate: Fri Jun 14 01:04:44 2019 +0300 [FLINK-12840] [core] Fix network utils to work with ipv6 correctly - Fixes problems around akka configuration parsing with some IPv6 literals - Fixes an issue with address parsing and validation with some Ipv6 literals This closes #8734 --- flink-core/src/main/java/org/apache/flink/util/NetUtils.java | 6 ++ .../src/test/java/org/apache/flink/util/NetUtilsTest.java | 6 ++ .../main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala | 6 +++--- .../scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala | 11 +++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java index 447e8a9..1a4bb7a 100644 --- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java @@ -134,6 +134,12 @@ public class NetUtils { host = InetAddress.getLoopbackAddress().getHostAddress(); } else { host = host.trim().toLowerCase(); + if (host.startsWith("[") && host.endsWith("]")) { + String address = host.substring(1, host.length() - 1); + if (IPAddressUtil.isIPv6LiteralAddress(address)) { + host = address; + } + } } // normalize and valid address diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java index 04faa67..ffd9e5e 100644 --- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java @@ -193,6 +193,12 @@ public class NetUtilsTest { Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port)); 
} { + // [IPv6] + String host = "[2001:0db8:85a3:::8a2e:0370:7334]"; + int port = 42; + Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port)); + } + { // Hostnames String host = "somerandomhostname"; int port = 99; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala old mode 100644 new mode 100755 index 14563e9..e9b11f7 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -547,7 +547,7 @@ object AkkaUtils { } else { // if bindAddress is null or empty, then leave bindAddress unspecified. Akka will pick // InetAddress.getLocalHost.getHostAddress -"\"\"" +"" } val hostnameConfigString = @@ -556,8 +556,8 @@ object AkkaUtils { | remote { |netty { | tcp { - |hostname = $effectiveHostname - |bind-hostname = $bindAddress + |hostname = "$effectiveHostname" + |bind-hostname = "$bindAddress" | } |} | } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index c051f7a..8c970a0 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -209,4 +209,15 @@ class AkkaUtilsTest akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-max") .should(equal(maxThreads)) } + + test("getAkkaConfig should work with ipv6 addresses") { +val ipv6AddressString = "2001:db8:10:11:12:ff00:42:8329" +val configuration = new Configuration() +val port = 1234 + +val akkaConfig = AkkaUtils.getAkkaConfig(configuration, ipv6AddressString, port) + +akkaConfig.getString("akka.remote.netty.tcp.hostname") should + 
equal(NetUtils.unresolvedHostToNormalizedString(ipv6AddressString)) + } }
[flink] branch master updated (2b1a125 -> 6624562)
This is an automated email from the ASF dual-hosted git repository. sewen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2b1a125 [hotfix][table] remove @PublicEvolving annotation from AbstractCatalog as it's not supported to be public new 085a7ba [FLINK-12974] [build] Bump checkstyle to 8.14 new b5f4f32 [hotfix] [docs] Update IDE setup instructions for latest IntelliJ IDEA new 3854552 [FLINK-12840] [core] Fix network utils to work with ipv6 correctly new 5c9caa7 [hotfix] [examples] Distinguish 'netcat' arguments for Linux and Windows in the SocketWIndowWordcount JavaDocs new 7a8333b [FLINK-13017] [docs] Do not mount local $HOME into docs docker environment new a125e2a [FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types new 6624562 [FLINK-13043] [Library / CEP] Fix the bug of parsing Dewey number from string The 7 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: docs/docker/run.sh | 3 --- docs/flinkDev/ide_setup.md | 25 +- docs/flinkDev/ide_setup.zh.md | 23 + .../cassandra/AbstractCassandraTupleSink.java | 17 +++- .../connectors/cassandra/CassandraSink.java| 13 ++ .../cassandra/CassandraSinkBaseConfig.java | 27 +-- .../cassandra/CassandraConnectorITCase.java| 30 ++ .../main/java/org/apache/flink/util/NetUtils.java | 6 + .../java/org/apache/flink/util/NetUtilsTest.java | 6 + .../examples/socket/SocketWindowWordCount.java | 2 +- .../examples/socket/SocketWindowWordCount.scala| 2 +- .../java/org/apache/flink/cep/nfa/DeweyNumber.java | 6 +++-- .../org/apache/flink/cep/nfa/DeweyNumberTest.java | 5 .../org/apache/flink/runtime/akka/AkkaUtils.scala | 6 ++--- .../apache/flink/runtime/akka/AkkaUtilsTest.scala | 11 pom.xml| 2 +- 16 files changed, 147 insertions(+), 37 deletions(-) mode change 100644 => 100755 flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
[flink] 01/07: [FLINK-12974] [build] Bump checkstyle to 8.14
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 085a7ba64bc1325717bd5f882d308eb6b3dd9ee5 Author: ifndef-SleePy AuthorDate: Tue Jun 25 17:06:59 2019 +0800 [FLINK-12974] [build] Bump checkstyle to 8.14 This closes #8870 --- docs/flinkDev/ide_setup.md | 2 +- pom.xml| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/flinkDev/ide_setup.md b/docs/flinkDev/ide_setup.md index 3dfc905..c6b0cd2 100644 --- a/docs/flinkDev/ide_setup.md +++ b/docs/flinkDev/ide_setup.md @@ -87,7 +87,7 @@ IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin. 1. Install the "Checkstyle-IDEA" plugin from the IntelliJ plugin repository. 2. Configure the plugin by going to Settings -> Other Settings -> Checkstyle. 3. Set the "Scan Scope" to "Only Java sources (including tests)". -4. Select _8.12_ in the "Checkstyle Version" dropdown and click apply. **This step is important, +4. Select _8.14_ in the "Checkstyle Version" dropdown and click apply. **This step is important, don't skip it!** 5. In the "Configuration File" pane, add a new configuration using the plus icon: 1. Set the "Description" to "Flink". diff --git a/pom.xml b/pom.xml index 2b3fa48..0357c7a 100644 --- a/pom.xml +++ b/pom.xml @@ -1608,7 +1608,7 @@ under the License. com.puppycrawl.tools checkstyle - 8.12 + 8.14
[flink] 06/07: [FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit a125e2a11444847920911927955be2d119716838 Author: ozan AuthorDate: Wed Jun 12 21:35:54 2019 +0900 [FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types This closes #8714 --- .../cassandra/AbstractCassandraTupleSink.java | 17 +++- .../connectors/cassandra/CassandraSink.java| 13 ++ .../cassandra/CassandraSinkBaseConfig.java | 27 +-- .../cassandra/CassandraConnectorITCase.java| 30 ++ 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java index 5e1fcca..3cf8aa4 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.cassandra; import org.apache.flink.configuration.Configuration; +import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.google.common.util.concurrent.ListenableFuture; @@ -31,6 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture; public abstract class AbstractCassandraTupleSink extends CassandraSinkBase { private final String insertQuery; private transient PreparedStatement ps; + private final boolean ignoreNullFields; public AbstractCassandraTupleSink( String insertQuery, @@ -39,6 +41,7 @@ public abstract class AbstractCassandraTupleSink extends CassandraSinkBase extends 
CassandraSinkBase send(IN value) { Object[] fields = extract(value); - return session.executeAsync(ps.bind(fields)); + return session.executeAsync(bind(fields)); + } + + private BoundStatement bind(Object[] fields) { + BoundStatement bs = ps.bind(fields); + if (ignoreNullFields) { + for (int i = 0; i < fields.length; i++) { + if (fields[i] == null) { + bs.unset(i); + } + } + } + return bs; } protected abstract Object[] extract(IN record); diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java index 128e5e0..2a8ebff 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java @@ -399,6 +399,19 @@ public class CassandraSink { } /** +* Enables ignoring null values, treats null values as unset and avoids writing null fields +* and creating tombstones. +* +* This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called. +* +* @return this builder +*/ + public CassandraSinkBuilder enableIgnoreNullFields() { + this.configBuilder.setIgnoreNullFields(true); + return this; + } + + /** * Finalizes the configuration of this sink. 
* * @return finalized sink diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java index cb8d904..d48f973 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java @@ -38,6 +38,12 @@ public final class CassandraSinkBaseConfig implements Serializable { */ public static final Duration
[flink] 02/07: [hotfix] [docs] Update IDE setup instructions for latest IntelliJ IDEA
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b5f4f327f6537f1ec796d35c7ee1f2a65dab3565 Author: Hugo Da Cruz Louro AuthorDate: Wed Jun 26 23:46:31 2019 -0700 [hotfix] [docs] Update IDE setup instructions for latest IntelliJ IDEA This closes #8908 --- docs/flinkDev/ide_setup.md| 23 --- docs/flinkDev/ide_setup.zh.md | 23 --- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/docs/flinkDev/ide_setup.md b/docs/flinkDev/ide_setup.md index c6b0cd2..c27e479 100644 --- a/docs/flinkDev/ide_setup.md +++ b/docs/flinkDev/ide_setup.md @@ -49,7 +49,7 @@ git clone https://github.com/apache/flink.git A brief guide on how to set up IntelliJ IDEA IDE for development of the Flink core. As Eclipse is known to have issues with mixed Scala and Java projects, more and more contributors are migrating to IntelliJ IDEA. -The following documentation describes the steps to setup IntelliJ IDEA 2016.2.5 +The following documentation describes the steps to setup IntelliJ IDEA 2019.1.3 ([https://www.jetbrains.com/idea/download/](https://www.jetbrains.com/idea/download/)) with the Flink sources. @@ -66,19 +66,20 @@ to enable support for Scala projects and files: ### Importing Flink -1. Start IntelliJ IDEA and choose "Import Project" -2. Select the root folder of the Flink repository +1. Start IntelliJ IDEA and choose "New -> Project from Existing Sources" +2. Select the root folder of the cloned Flink repository 3. Choose "Import project from external model" and select "Maven" -4. Leave the default options and click on "Next" until you hit the SDK section. -5. If there is no SDK, create a one with the "+" sign top left, - then click "JDK", select your JDK home directory and click "OK". - Otherwise simply select your SDK. -6. Continue by clicking "Next" again and finish the import. +4. 
Leave the default options and successively click "Next" until you reach the SDK section. +5. If there is no SDK listed, create one using the "+" sign on the top left. + Select "JDK", choose the JDK home directory and click "OK". + Select the most suiting JDK version. NOTE: A good rule of thumb is to select + the JDK version matching the active Maven profile. +6. Continue by clicking "Next" until finishing the import. 7. Right-click on the imported Flink project -> Maven -> Generate Sources and Update Folders. Note that this will install Flink libraries in your local Maven repository, - i.e. "/home/*-your-user-*/.m2/repository/org/apache/flink/". - Alternatively, `mvn clean package -DskipTests` also creates the necessary - files for the IDE to work with but without installing libraries. + located by default at "/home/$USER/.m2/repository/org/apache/flink/". + Alternatively, `mvn clean package -DskipTests` also creates the files necessary + for the IDE to work but without installing the libraries. 8. Build the Project (Build -> Make Project) ### Checkstyle For Java diff --git a/docs/flinkDev/ide_setup.zh.md b/docs/flinkDev/ide_setup.zh.md index a893d66..1177c6a 100644 --- a/docs/flinkDev/ide_setup.zh.md +++ b/docs/flinkDev/ide_setup.zh.md @@ -49,7 +49,7 @@ git clone https://github.com/apache/flink.git A brief guide on how to set up IntelliJ IDEA IDE for development of the Flink core. As Eclipse is known to have issues with mixed Scala and Java projects, more and more contributors are migrating to IntelliJ IDEA. -The following documentation describes the steps to setup IntelliJ IDEA 2016.2.5 +The following documentation describes the steps to setup IntelliJ IDEA 2019.1.3 ([https://www.jetbrains.com/idea/download/](https://www.jetbrains.com/idea/download/)) with the Flink sources. @@ -66,19 +66,20 @@ to enable support for Scala projects and files: ### Importing Flink -1. Start IntelliJ IDEA and choose "Import Project" -2. 
Select the root folder of the Flink repository +1. Start IntelliJ IDEA and choose "New -> Project from Existing Sources" +2. Select the root folder of the cloned Flink repository 3. Choose "Import project from external model" and select "Maven" -4. Leave the default options and click on "Next" until you hit the SDK section. -5. If there is no SDK, create a one with the "+" sign top left, - then click "JDK", select your JDK home directory and click "OK". - Otherwise simply select your SDK. -6. Continue by clicking "Next" again and finish the import. +4. Leave the default options and successively click "Next" until you reach the SDK section. +5. If there is no SDK listed, create one using the "+" sign on the top left. + Select "JDK", choose the JDK home directory and click "OK". + Select the most suiting JDK version. NOTE: A good rule of thumb is to select + the JDK version matching the active Maven profile. +6. Continue by clicking "Next" until finishing the
[flink] 04/07: [hotfix] [examples] Distinguish 'netcat' arguments for Linux and Windows in the SocketWindowWordCount JavaDocs
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5c9caa7d2cc4d917898879ad9518c07e5cca5b49 Author: Jason AuthorDate: Mon Jun 3 14:25:30 2019 +0800 [hotfix] [examples] Distinguish 'netcat' arguments for Linux and Windows in the SocketWIndowWordcount JavaDocs This closes #8593. --- .../apache/flink/streaming/examples/socket/SocketWindowWordCount.java | 2 +- .../flink/streaming/scala/examples/socket/SocketWindowWordCount.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java index 646a74a..921ab3a 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java @@ -33,7 +33,7 @@ import org.apache.flink.util.Collector; * The easiest way to try this out is to open a text server (at port 12345) * using the netcat tool via * - * nc -l 12345 + * nc -l 12345 on Linux or nc -l -p 12345 on Windows * * and run this example with the hostname and the port as arguments. 
*/ diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala index e607f61..bdb1561 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.windowing.time.Time * The easiest way to try this out is to open a text sever (at port 12345) * using the ''netcat'' tool via * {{{ - * nc -l 12345 + * nc -l 12345 on Linux or nc -l -p 12345 on Windows * }}} * and run this example with the hostname and the port as arguments.. */
[flink-web] 02/02: Rebuild website
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit a3942debf36970e62ac5374e25ca6ead4fca1028 Author: Jark Wu AuthorDate: Thu Jul 4 10:48:26 2019 +0800 Rebuild website --- .gitignore| 1 + content/zh/index.html | 5 + content/zh/poweredby.html | 4 3 files changed, 10 insertions(+) diff --git a/.gitignore b/.gitignore index 1702d90..3a73be0 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ /.rubydeps .idea *.iml +.jekyll-metadata \ No newline at end of file diff --git a/content/zh/index.html b/content/zh/index.html index a49e123..bbf0b60 100644 --- a/content/zh/index.html +++ b/content/zh/index.html @@ -368,6 +368,11 @@ + + + + + diff --git a/content/zh/poweredby.html b/content/zh/poweredby.html index f73c073..275fa5d 100644 --- a/content/zh/poweredby.html +++ b/content/zh/poweredby.html @@ -217,6 +217,10 @@ King,Candy Crush Saga的创建者,使用 Flink 为数据科学团队提供实时分析仪表板。https://techblog.king.com/rbea-scalable-real-time-analytics-king/; target="_blank"> 阅读 King 的 Flink 实现 + + Klaviyo使用 Apache Flink 扩展其实时分析系统,该系统每秒对超过一百万个事件进行重复数据删除和聚合。https://klaviyo.tech/tagged/counting; target="_blank"> 阅读 Klaviyo 的实时分析 + + Lyft 使用 Flink 作为其流媒体平台的处理引擎,例如为机器学习持续生成特征。https://www.slideshare.net/SeattleApacheFlinkMeetup/streaminglyft-greg-fee-seattle-apache-flink-meetup-104398613; target="_blank"> 阅读更多关于 Lyft 的流媒体
[flink-web] 01/02: [FLINK-12833][docs-zh] Add Klaviyo to Chinese PoweredBy page
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit a89ff4efa0e0b3e9be23376e206c75a253b3ae40 Author: guanghui01.rong AuthorDate: Wed Jul 3 21:45:42 2019 +0800 [FLINK-12833][docs-zh] Add Klaviyo to Chinese PoweredBy page This closes #223 --- index.zh.md | 5 + poweredby.zh.md | 4 2 files changed, 9 insertions(+) diff --git a/index.zh.md b/index.zh.md index 789f8e6..52e8692 100644 --- a/index.zh.md +++ b/index.zh.md @@ -219,6 +219,11 @@ layout: base + + + + + diff --git a/poweredby.zh.md b/poweredby.zh.md index 636ce25..58fb24b 100644 --- a/poweredby.zh.md +++ b/poweredby.zh.md @@ -64,6 +64,10 @@ Apache Flink 为全球许多公司和企业的关键业务提供支持。在这 King,Candy Crush Saga的创建者,使用 Flink 为数据科学团队提供实时分析仪表板。https://techblog.king.com/rbea-scalable-real-time-analytics-king/; target='_blank'> 阅读 King 的 Flink 实现 + + Klaviyo使用 Apache Flink 扩展其实时分析系统,该系统每秒对超过一百万个事件进行重复数据删除和聚合。https://klaviyo.tech/tagged/counting; target='_blank'> 阅读 Klaviyo 的实时分析 + + Lyft 使用 Flink 作为其流媒体平台的处理引擎,例如为机器学习持续生成特征。https://www.slideshare.net/SeattleApacheFlinkMeetup/streaminglyft-greg-fee-seattle-apache-flink-meetup-104398613; target='_blank'> 阅读更多关于 Lyft 的流媒体
[flink-web] branch asf-site updated (703ffbe -> a3942de)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 703ffbe Rebuild website new a89ff4e [FLINK-12833][docs-zh] Add Klaviyo to Chinese PoweredBy page new a3942de Rebuild website The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .gitignore| 1 + content/zh/index.html | 5 + content/zh/poweredby.html | 4 index.zh.md | 5 + poweredby.zh.md | 4 5 files changed, 19 insertions(+)
[flink] branch master updated (6a682ff -> 0e4d4b4)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6a682ff [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling add 900f74d [hotfix][table-planner-blink] Fix boolean hashCode codegen add f7b164f [hotfix][table-planner-blink] Set shuffleMode to PartitionTransformation in BatchExecExchange add 960ae97 [FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in blink add 0e4d4b4 [FLINK-12959][table-planner-blink] Implement BatchExecHashJoin translateToPlanInternal No new revisions were added by this update. Summary of changes: .../apache/flink/table/codegen/CodeGenUtils.scala | 1 + .../table/codegen/LongHashJoinGenerator.scala | 17 ++- .../table/codegen/OperatorCodeGenerator.scala | 48 ++--- .../nodes/physical/batch/BatchExecExchange.scala | 34 +++--- .../nodes/physical/batch/BatchExecHashJoin.scala | 119 +++-- .../plan/nodes/physical/batch/BatchExecSort.scala | 4 - .../physical/batch/BatchExecSortMergeJoin.scala| 27 - .../flink/table/runtime/batch/sql/MiscITCase.scala | 6 +- .../runtime/batch/sql/agg/GroupingSetsITCase.scala | 3 +- .../table/runtime/batch/sql/join/JoinITCase.scala | 87 +-- .../flink/table/runtime/utils/BatchTestBase.scala | 1 + .../apache/flink/table/api/TableConfigOptions.java | 5 + .../flink/table/runtime/join/HashJoinOperator.java | 37 --- .../flink/table/runtime/join/HashJoinType.java | 17 +++ .../table/runtime/join/SortMergeJoinOperator.java | 17 +-- .../runtime/over/BufferDataOverWindowOperator.java | 5 +- .../table/runtime/sort/SortLimitOperator.java | 10 +- .../flink/table/runtime/sort/SortOperator.java | 11 +- .../table/runtime/sort/StreamSortOperator.java | 6 +- .../runtime/join/Int2HashJoinOperatorTest.java | 77 +++-- .../join/Int2SortMergeJoinOperatorTest.java| 6 +- .../runtime/join/String2HashJoinOperatorTest.java | 59 -- .../table/runtime/sort/StreamSortOperatorTest.java 
| 2 + 23 files changed, 366 insertions(+), 233 deletions(-)
[flink] branch master updated: [FLINK-11638][docs-zh] Translate "Savepoints" page into Chinese
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4ad19ef [FLINK-11638][docs-zh] Translate "Savepoints" page into Chines 4ad19ef is described below commit 4ad19eff713284c0f30c7dba3cd21095baf18d42 Author: XuQianJin-Stars AuthorDate: Sun Apr 28 17:40:35 2019 +0800 [FLINK-11638][docs-zh] Translate "Savepoints" page into Chines This closes #8300 --- docs/ops/state/savepoints.zh.md | 158 +++- 1 file changed, 75 insertions(+), 83 deletions(-) diff --git a/docs/ops/state/savepoints.zh.md b/docs/ops/state/savepoints.zh.md index c235344..e601e7f 100644 --- a/docs/ops/state/savepoints.zh.md +++ b/docs/ops/state/savepoints.zh.md @@ -25,32 +25,22 @@ under the License. * toc {:toc} -## What is a Savepoint? How is a Savepoint different from a Checkpoint? +## 什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同? -A Savepoint is a consistent image of the execution state of a streaming job, created via Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html). You can use Savepoints to stop-and-resume, fork, -or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a (relatively small) meta data file. The files on stable storage represent the net data of the job's execution state -image. The meta data file of a Savepoint contains (primarily) pointers to all files on stable storage that are part of the Savepoint, in form of absolute paths. +Savepoint 是依据 Flink [checkpointing 机制]({{ site.baseurl }}/zh/internals/stream_checkpointing.html)所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,...) 
上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。 -Attention: In order to allow upgrades between programs and Flink versions, it is important to check out the following section about assigning IDs to your operators. +注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。 +从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。 Checkpoint 的主要目的是为意外失败的作业提供恢复机制。 Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互。 作为一种恢复和定期触发的方法,Checkpoint 实现有两个设计目标:i)轻量级创建和 ii)尽可能快地恢复。 可能会利用某些特定的属性来达到这个,例如, 工作代码在执行尝试之间不会改变。 在用户终止作业后,通常会删除 Checkpoint(除非明确配置为保留的 Checkpoint)。 -Conceptually, Flink's Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. The primary purpose of Checkpoints is to provide a recovery mechanism in case of -unexpected job failures. A Checkpoint's lifecycle is managed by Flink, i.e. a Checkpoint is created, owned, and released by Flink - without user interaction. As a method of recovery and being periodically triggered, two main -design goals for the Checkpoint implementation are i) being as lightweight to create and ii) being as fast to restore from as possible. Optimizations towards those goals can exploit certain properties, e.g. that the job code -doesn't change between the execution attempts. Checkpoints are usually dropped after the job was terminated by the user (except if explicitly configured as retained Checkpoints). + 与此相反、Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。 例如,升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。 当然,Savepoint 必须在作业停止后继续存在。 从概念上讲,Savepoint 的生成,恢复成本可能更高一些,Savepoint 更多地关注可移植性和对前面提到的作业更改的支持。 -In contrast to all this, Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume. 
For example, this could be an update of your Flink version, changing your job graph, -changing parallelism, forking a second job like for a red/blue deployment, and so on. Of course, Savepoints must survive job termination. Conceptually, Savepoints can be a bit more expensive to produce and restore and focus -more on portability and support for the previously mentioned changes to the job. +除去这些概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式。然而,目前有一个例外,我们可能会在未来引入更多的差异。例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。 -Those conceptual differences aside, the current implementations of Checkpoints and Savepoints are basically using the same code and produce the same format. However, there is currently one exception from this, and we might -introduce more differences in the future. The exception are incremental checkpoints with the RocksDB state backend. They are using some RocksDB internal format instead of Flink’s native savepoint format. This makes them the -first instance of a more lightweight checkpointing mechanism, compared to Savepoints. +## 分配算子 ID -## Assigning