[GitHub] [flink] lirui-apache commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog
lirui-apache commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog URL: https://github.com/apache/flink/pull/8616#issuecomment-498946674 LGTM, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290584736 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Option[Boolean]): RelDataType = { Review comment: The original intention here is to convert all time indicators to a regular `TimestampType` in batch mode. Do you mean this conversion should happen outside?
[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290584058 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Option[Boolean]): RelDataType = { buildRelDataType( tableSchema.getFieldNames.toSeq, - tableSchema.getFieldTypes map { -case _: TimeIndicatorTypeInfo if isStreaming.isDefined && !isStreaming.get => - InternalTypes.TIMESTAMP -case tpe: TypeInformation[_] => createInternalTypeFromTypeInfo(tpe) + tableSchema.getFieldDataTypes.map(_.getLogicalType).map { +case t: TimestampType if isStreaming.isDefined && !isStreaming.get => + if (t.getKind == TimestampKind.REGULAR) t else new TimestampType(3) +case t => t }) } def buildRelDataType( Review comment: I'll change the name to `buildLogicalRowType`.
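A minimal sketch of the batch-mode conversion shown in the diff above, written in plain Java so that it stands alone. `SketchTimestamp`, `SketchTimestampKind` and `BatchTimeIndicatorSketch` are hypothetical stand-ins introduced only for illustration; they are not Flink's `TimestampType` or `TimestampKind`, and the nullable `Boolean` parameter merely imitates the `Option[Boolean]` in the Scala signature.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the kinds of timestamp a field can carry.
enum SketchTimestampKind { REGULAR, ROWTIME, PROCTIME }

// Hypothetical stand-in for a timestamp type; NOT Flink's TimestampType.
final class SketchTimestamp {
    final SketchTimestampKind kind;
    final int precision;

    SketchTimestamp(SketchTimestampKind kind, int precision) {
        this.kind = kind;
        this.precision = precision;
    }
}

final class BatchTimeIndicatorSketch {
    // In batch mode (isStreaming is known and false), every time indicator
    // is replaced by a regular TIMESTAMP(3). Regular timestamps are kept
    // as they are. When streaming, or when the mode is unknown (null),
    // nothing is changed.
    static List<SketchTimestamp> normalize(List<SketchTimestamp> fields, Boolean isStreaming) {
        boolean batch = isStreaming != null && !isStreaming;
        List<SketchTimestamp> result = new ArrayList<>();
        for (SketchTimestamp t : fields) {
            if (batch && t.kind != SketchTimestampKind.REGULAR) {
                result.add(new SketchTimestamp(SketchTimestampKind.REGULAR, 3));
            } else {
                result.add(t);
            }
        }
        return result;
    }
}
```

Whether this step belongs inside `buildLogicalRowType` or in its callers is the open question in the thread; the sketch only isolates the rule itself.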
[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290582852 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -406,78 +416,76 @@ object FlinkTypeFactory { case _ => false } - def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match { -case BOOLEAN => InternalTypes.BOOLEAN -case TINYINT => InternalTypes.BYTE -case SMALLINT => InternalTypes.SHORT -case INTEGER => InternalTypes.INT -case BIGINT => InternalTypes.LONG -case FLOAT => InternalTypes.FLOAT -case DOUBLE => InternalTypes.DOUBLE -case VARCHAR | CHAR => InternalTypes.STRING -case VARBINARY | BINARY => InternalTypes.BINARY -case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, relDataType.getScale) - -// time indicators -case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] => - val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType] - if (indicator.isEventTime) { -InternalTypes.ROWTIME_INDICATOR - } else { -InternalTypes.PROCTIME_INDICATOR - } - -// temporal types -case DATE => InternalTypes.DATE -case TIME => InternalTypes.TIME -case TIMESTAMP => InternalTypes.TIMESTAMP -case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MONTHS -case typeName if DAY_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MILLIS - -case NULL => - throw new TableException( -"Type NULL is not supported. Null values must have a supported type.") - -// symbol for special flags e.g. 
TRIM's BOTH, LEADING, TRAILING -// are represented as Enum -case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]]) - -// extract encapsulated Type -case ANY if relDataType.isInstanceOf[GenericRelDataType] => - val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType] - genericRelDataType.genericType - -case ROW if relDataType.isInstanceOf[RowRelDataType] => - val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType] - compositeRelDataType.rowType - -case ROW if relDataType.isInstanceOf[RelRecordType] => - val relRecordType = relDataType.asInstanceOf[RelRecordType] - new RowSchema(relRecordType).internalType - -case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] => - val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType] - multisetRelDataType.multisetType - -case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] => - val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType] - arrayRelDataType.arrayType - -case MAP if relDataType.isInstanceOf[MapRelDataType] => - val mapRelDataType = relDataType.asInstanceOf[MapRelDataType] - mapRelDataType.mapType + def toLogicalType(relDataType: RelDataType): LogicalType = { +val logicalType = relDataType.getSqlTypeName match { + case BOOLEAN => new BooleanType() + case TINYINT => new TinyIntType() + case SMALLINT => new SmallIntType() + case INTEGER => new IntType() + case BIGINT => new BigIntType() + case FLOAT => new FloatType() + case DOUBLE => new DoubleType() + case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH) + case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH) + case DECIMAL => new DecimalType(relDataType.getPrecision, relDataType.getScale) + + // time indicators + case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] => +val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType] +if (indicator.isEventTime) { + new TimestampType(true, TimestampKind.ROWTIME, 3) +} else { + new TimestampType(true, 
TimestampKind.PROCTIME, 3) +} -case _@t => - throw new TableException(s"Type is not supported: $t") + // temporal types + case DATE => new DateType() + case TIME => new TimeType() + case TIMESTAMP => new TimestampType(3) + case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => +DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType + case typeName if DAY_INTERVAL_TYPES.contains(typeName) => +DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType + + case NULL => +throw new TableException( + "Type NULL is not supported. Null values must have a supported type.") + + // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING + // are represented as Enum + case SYMBOL => toPlannerLogicalType(PlannerTypeConversions.toDataType(classOf[Enum[_]])) + + // extract encapsulated Type + case ANY if
[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290579108 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java ## @@ -47,7 +44,7 @@ private UnresolvedReferenceExpression sum = new UnresolvedReferenceExpression("sum"); private UnresolvedReferenceExpression count = new UnresolvedReferenceExpression("count"); - public abstract TypeInformation getSumType(); + public abstract DataType getSumType(); Review comment: See @twalthr's response in @wuchong's first thread: >The user should not be confronted to two different type stacks. That's why the description in DataType calls it "hints", a user should always use DataType. If he adds a hint where it is not appropriate, we just don't need to use those hints.
[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.
KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module. URL: https://github.com/apache/flink/pull/8294#discussion_r290578406 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ## @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.graph.{StreamGraph, StreamGraphGenerator} import org.apache.flink.streaming.api.transformations.StreamTransformation +import org.apache.flink.table.api.PlannerConfigImpl Review comment: If TableEnv is unified, we actually can't see `PlannerConfigImpl`
[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290577508 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -41,87 +40,112 @@ import scala.collection.JavaConverters._ import scala.collection.mutable /** - * Flink specific type factory that represents the interface between Flink's [[InternalType]] + * Flink specific type factory that represents the interface between Flink's [[LogicalType]] * and Calcite's [[RelDataType]]. */ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) { // NOTE: for future data types it might be necessary to // override more methods of RelDataTypeFactoryImpl - private val seenTypes = mutable.HashMap[(InternalType, Boolean), RelDataType]() + private val seenTypes = mutable.HashMap[LogicalType, RelDataType]() - def createTypeFromInternalType( - tp: InternalType, - isNullable: Boolean): RelDataType = { + def createTypeFromLogicalType(t: LogicalType, isNullable: Boolean): RelDataType = { Review comment: OK, I'll replace it with `createTypeFromLogicalType(t.copy(nullable))`
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290576941 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java ## @@ -150,7 +150,7 @@ public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws E final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); - final ShuffleEnvironment networkEnv = new NetworkEnvironmentBuilder().build(); + final ShuffleEnvironment networkEnv = new NettyShuffleEnvironmentBuilder().build(); Review comment: networkEnv -> shuffleEnvironment?
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290576205 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup. +* +* @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. +*/ + int start() throws IOException; + + /** +* Shutdown the
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290565250 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -41,87 +40,112 @@ import scala.collection.JavaConverters._ import scala.collection.mutable /** - * Flink specific type factory that represents the interface between Flink's [[InternalType]] + * Flink specific type factory that represents the interface between Flink's [[LogicalType]] * and Calcite's [[RelDataType]]. */ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) { // NOTE: for future data types it might be necessary to // override more methods of RelDataTypeFactoryImpl - private val seenTypes = mutable.HashMap[(InternalType, Boolean), RelDataType]() + private val seenTypes = mutable.HashMap[LogicalType, RelDataType]() - def createTypeFromInternalType( - tp: InternalType, - isNullable: Boolean): RelDataType = { + def createTypeFromLogicalType(t: LogicalType, isNullable: Boolean): RelDataType = { Review comment: Since `LogicalType` already contains nullability information, I suggest we drop this interface, forcing all callers to pass nullability through `LogicalType`.
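The suggestion above can be sketched in plain Java roughly as follows. This is an illustration under stated assumptions, not the PR's code: `SketchLogicalType` and `SketchTypeFactory` are hypothetical stand-ins, and the cached value is a plain `String` rather than a Calcite `RelDataType`. The point is only that once nullability lives in the type, the factory needs a single-argument method and the cache can be keyed by the type alone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a logical type that carries its own nullability.
final class SketchLogicalType {
    final String name;
    final boolean nullable;

    SketchLogicalType(String name, boolean nullable) {
        this.name = name;
        this.nullable = nullable;
    }

    // Returns a copy of this type with a different nullability.
    SketchLogicalType copy(boolean newNullable) {
        return new SketchLogicalType(name, newNullable);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchLogicalType)) {
            return false;
        }
        SketchLogicalType other = (SketchLogicalType) o;
        return nullable == other.nullable && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, nullable);
    }
}

// Hypothetical stand-in for the type factory.
final class SketchTypeFactory {
    // Keyed by the type alone; nullability is already part of its equality.
    private final Map<SketchLogicalType, String> seenTypes = new HashMap<>();

    // Single entry point: there is no separate isNullable argument.
    String createTypeFromLogicalType(SketchLogicalType t) {
        return seenTypes.computeIfAbsent(t, k -> k.name + (k.nullable ? "" : " NOT NULL"));
    }

    int cachedTypes() {
        return seenTypes.size();
    }
}
```

A caller that previously passed a flag would instead write `factory.createTypeFromLogicalType(t.copy(false))`.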
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290575834 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ByteSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; +import org.apache.flink.api.common.typeutils.base.FloatSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.ShortSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryArray; +import org.apache.flink.table.dataformat.BinaryGeneric; +import org.apache.flink.table.dataformat.BinaryMap; +import org.apache.flink.table.dataformat.BinaryString; +import org.apache.flink.table.dataformat.Decimal; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TypeInformationAnyType; +import org.apache.flink.table.typeutils.BaseRowSerializer; +import org.apache.flink.table.typeutils.BinaryArraySerializer; +import org.apache.flink.table.typeutils.BinaryGenericSerializer; +import org.apache.flink.table.typeutils.BinaryMapSerializer; +import org.apache.flink.table.typeutils.BinaryStringSerializer; +import org.apache.flink.table.typeutils.DecimalSerializer; +import 
org.apache.flink.types.Row; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT; + +/** + * Utilities for {@link LogicalType} and {@link DataType}.. + */ +public class PlannerTypeUtils { Review comment: This is also a mix of multiple conversion methods with unclear purposes. We can at least move the conversion between `LogicalType` and `Class` into another dedicated class and explain when this kind of conversion happens.
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290575554 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeConversions.java ## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.MultisetTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TypeInformationAnyType; +import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.table.typeutils.BigDecimalTypeInfo; +import org.apache.flink.table.typeutils.BinaryStringTypeInfo; +import org.apache.flink.table.typeutils.DecimalTypeInfo; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Converters for {@link LogicalType} and {@link DataType}. 
+ */ +public class PlannerTypeConversions { Review comment: This class mixes conversions between `LogicalType`, `DataType` and `TypeInformation`. It would be better to split them into dedicated classes and, for each type of conversion, explain in which scenarios it happens, for example when we need to convert `LogicalType` to `TypeInformation`.
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290569286 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java ## @@ -47,7 +44,7 @@ private UnresolvedReferenceExpression sum = new UnresolvedReferenceExpression("sum"); private UnresolvedReferenceExpression count = new UnresolvedReferenceExpression("count"); - public abstract TypeInformation getSumType(); + public abstract DataType getSumType(); Review comment: Why not use `LogicalType` here? Do we want to allow `DeclarativeAggregateFunction` to choose the physical storage for the acc buffer?
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290567350 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala ## @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.schema -import org.apache.flink.table.`type`.ArrayType +import org.apache.flink.table.types.logical.ArrayType Review comment: Do we still need to keep our custom `ArrayRelDataType`? How about directly using Calcite's `ArraySqlType`? The same question applies to `MapRelDataType`, `MultisetRelDataType` and `RowRelDataType`.
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290568068 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -164,19 +188,19 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp * using FlinkTypeFactory * * @param fieldNames field names -* @param fieldTypes field types, every element is Flink's [[InternalType]] +* @param fieldTypes field types, every element is Flink's [[LogicalType]] * @param fieldNullables field nullable properties * @return a struct type with the input fieldNames, input fieldTypes, and system fields */ def buildLogicalRowType( fieldNames: Seq[String], - fieldTypes: Seq[InternalType], + fieldTypes: Seq[LogicalType], fieldNullables: Seq[Boolean]): RelDataType = { Review comment: This duplicates the nullability information; `LogicalType` already contains it. And I think `buildLogicalRowType` is also necessary.
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568801

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ##

@@ -406,78 +416,76 @@ object FlinkTypeFactory {
     case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match {
-    case BOOLEAN => InternalTypes.BOOLEAN
-    case TINYINT => InternalTypes.BYTE
-    case SMALLINT => InternalTypes.SHORT
-    case INTEGER => InternalTypes.INT
-    case BIGINT => InternalTypes.LONG
-    case FLOAT => InternalTypes.FLOAT
-    case DOUBLE => InternalTypes.DOUBLE
-    case VARCHAR | CHAR => InternalTypes.STRING
-    case VARBINARY | BINARY => InternalTypes.BINARY
-    case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, relDataType.getScale)
-
-    // time indicators
-    case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-      val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-      if (indicator.isEventTime) {
-        InternalTypes.ROWTIME_INDICATOR
-      } else {
-        InternalTypes.PROCTIME_INDICATOR
-      }
-
-    // temporal types
-    case DATE => InternalTypes.DATE
-    case TIME => InternalTypes.TIME
-    case TIMESTAMP => InternalTypes.TIMESTAMP
-    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MONTHS
-    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MILLIS
-
-    case NULL =>
-      throw new TableException(
-        "Type NULL is not supported. Null values must have a supported type.")
-
-    // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-    // are represented as Enum
-    case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-    // extract encapsulated Type
-    case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-      val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-      genericRelDataType.genericType
-
-    case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-      val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-      compositeRelDataType.rowType
-
-    case ROW if relDataType.isInstanceOf[RelRecordType] =>
-      val relRecordType = relDataType.asInstanceOf[RelRecordType]
-      new RowSchema(relRecordType).internalType
-
-    case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-      val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-      multisetRelDataType.multisetType
-
-    case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-      val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-      arrayRelDataType.arrayType
-
-    case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-      val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-      mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+    val logicalType = relDataType.getSqlTypeName match {
+      case BOOLEAN => new BooleanType()
+      case TINYINT => new TinyIntType()
+      case SMALLINT => new SmallIntType()
+      case INTEGER => new IntType()
+      case BIGINT => new BigIntType()
+      case FLOAT => new FloatType()
+      case DOUBLE => new DoubleType()
+      case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
+      case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH)

Review comment:
Why is the precision dropped and the max length used?
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568259

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ##

@@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Option[Boolean]): RelDataType = {
     buildRelDataType(
       tableSchema.getFieldNames.toSeq,
-      tableSchema.getFieldTypes map {
-        case _: TimeIndicatorTypeInfo if isStreaming.isDefined && !isStreaming.get =>
-          InternalTypes.TIMESTAMP
-        case tpe: TypeInformation[_] => createInternalTypeFromTypeInfo(tpe)
+      tableSchema.getFieldDataTypes.map(_.getLogicalType).map {
+        case t: TimestampType if isStreaming.isDefined && !isStreaming.get =>
+          if (t.getKind == TimestampKind.REGULAR) t else new TimestampType(3)
+        case t => t
       })
   }
 
   def buildRelDataType(

Review comment:
The purpose of this interface is unclear.
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568669

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ##

@@ -406,78 +416,76 @@ object FlinkTypeFactory {
     case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match {
-    case BOOLEAN => InternalTypes.BOOLEAN
-    case TINYINT => InternalTypes.BYTE
-    case SMALLINT => InternalTypes.SHORT
-    case INTEGER => InternalTypes.INT
-    case BIGINT => InternalTypes.LONG
-    case FLOAT => InternalTypes.FLOAT
-    case DOUBLE => InternalTypes.DOUBLE
-    case VARCHAR | CHAR => InternalTypes.STRING
-    case VARBINARY | BINARY => InternalTypes.BINARY
-    case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, relDataType.getScale)
-
-    // time indicators
-    case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-      val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-      if (indicator.isEventTime) {
-        InternalTypes.ROWTIME_INDICATOR
-      } else {
-        InternalTypes.PROCTIME_INDICATOR
-      }
-
-    // temporal types
-    case DATE => InternalTypes.DATE
-    case TIME => InternalTypes.TIME
-    case TIMESTAMP => InternalTypes.TIMESTAMP
-    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MONTHS
-    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MILLIS
-
-    case NULL =>
-      throw new TableException(
-        "Type NULL is not supported. Null values must have a supported type.")
-
-    // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-    // are represented as Enum
-    case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-    // extract encapsulated Type
-    case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-      val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-      genericRelDataType.genericType
-
-    case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-      val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-      compositeRelDataType.rowType
-
-    case ROW if relDataType.isInstanceOf[RelRecordType] =>
-      val relRecordType = relDataType.asInstanceOf[RelRecordType]
-      new RowSchema(relRecordType).internalType
-
-    case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-      val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-      multisetRelDataType.multisetType
-
-    case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-      val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-      arrayRelDataType.arrayType
-
-    case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-      val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-      mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+    val logicalType = relDataType.getSqlTypeName match {
+      case BOOLEAN => new BooleanType()
+      case TINYINT => new TinyIntType()
+      case SMALLINT => new SmallIntType()
+      case INTEGER => new IntType()
+      case BIGINT => new BigIntType()
+      case FLOAT => new FloatType()
+      case DOUBLE => new DoubleType()
+      case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)

Review comment:
I remember you want to disable all CHAR support.
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r290568945 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -406,78 +416,76 @@ object FlinkTypeFactory { case _ => false } - def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match { -case BOOLEAN => InternalTypes.BOOLEAN -case TINYINT => InternalTypes.BYTE -case SMALLINT => InternalTypes.SHORT -case INTEGER => InternalTypes.INT -case BIGINT => InternalTypes.LONG -case FLOAT => InternalTypes.FLOAT -case DOUBLE => InternalTypes.DOUBLE -case VARCHAR | CHAR => InternalTypes.STRING -case VARBINARY | BINARY => InternalTypes.BINARY -case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, relDataType.getScale) - -// time indicators -case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] => - val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType] - if (indicator.isEventTime) { -InternalTypes.ROWTIME_INDICATOR - } else { -InternalTypes.PROCTIME_INDICATOR - } - -// temporal types -case DATE => InternalTypes.DATE -case TIME => InternalTypes.TIME -case TIMESTAMP => InternalTypes.TIMESTAMP -case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MONTHS -case typeName if DAY_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MILLIS - -case NULL => - throw new TableException( -"Type NULL is not supported. Null values must have a supported type.") - -// symbol for special flags e.g. 
TRIM's BOTH, LEADING, TRAILING -// are represented as Enum -case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]]) - -// extract encapsulated Type -case ANY if relDataType.isInstanceOf[GenericRelDataType] => - val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType] - genericRelDataType.genericType - -case ROW if relDataType.isInstanceOf[RowRelDataType] => - val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType] - compositeRelDataType.rowType - -case ROW if relDataType.isInstanceOf[RelRecordType] => - val relRecordType = relDataType.asInstanceOf[RelRecordType] - new RowSchema(relRecordType).internalType - -case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] => - val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType] - multisetRelDataType.multisetType - -case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] => - val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType] - arrayRelDataType.arrayType - -case MAP if relDataType.isInstanceOf[MapRelDataType] => - val mapRelDataType = relDataType.asInstanceOf[MapRelDataType] - mapRelDataType.mapType + def toLogicalType(relDataType: RelDataType): LogicalType = { +val logicalType = relDataType.getSqlTypeName match { + case BOOLEAN => new BooleanType() + case TINYINT => new TinyIntType() + case SMALLINT => new SmallIntType() + case INTEGER => new IntType() + case BIGINT => new BigIntType() + case FLOAT => new FloatType() + case DOUBLE => new DoubleType() + case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH) + case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH) + case DECIMAL => new DecimalType(relDataType.getPrecision, relDataType.getScale) + + // time indicators + case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] => +val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType] +if (indicator.isEventTime) { + new TimestampType(true, TimestampKind.ROWTIME, 3) +} else { + new TimestampType(true, 
TimestampKind.PROCTIME, 3) +} -case _@t => - throw new TableException(s"Type is not supported: $t") + // temporal types + case DATE => new DateType() + case TIME => new TimeType() + case TIMESTAMP => new TimestampType(3) + case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => +DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType + case typeName if DAY_INTERVAL_TYPES.contains(typeName) => +DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType + + case NULL => +throw new TableException( + "Type NULL is not supported. Null values must have a supported type.") + + // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING + // are represented as Enum + case SYMBOL => toPlannerLogicalType(PlannerTypeConversions.toDataType(classOf[Enum[_]])) + + // extract encapsulated Type + case ANY if
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290575983 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup. +* +* @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. +*/ + int start() throws IOException; + + /** +* Shutdown the
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290567916

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ##

@@ -147,12 +171,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
     *
     * @param fieldNames field names
-    * @param fieldTypes field types, every element is Flink's [[InternalType]]
+    * @param fieldTypes field types, every element is Flink's [[LogicalType]]
     * @return a struct type with the input fieldNames, input fieldTypes, and system fields
     */
   def buildLogicalRowType(

Review comment:
Why not just use `createTypeFromLogicalType` and pass in a `RowType`?
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568761

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ##

@@ -406,78 +416,76 @@ object FlinkTypeFactory {
     case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match {
-    case BOOLEAN => InternalTypes.BOOLEAN
-    case TINYINT => InternalTypes.BYTE
-    case SMALLINT => InternalTypes.SHORT
-    case INTEGER => InternalTypes.INT
-    case BIGINT => InternalTypes.LONG
-    case FLOAT => InternalTypes.FLOAT
-    case DOUBLE => InternalTypes.DOUBLE
-    case VARCHAR | CHAR => InternalTypes.STRING
-    case VARBINARY | BINARY => InternalTypes.BINARY
-    case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, relDataType.getScale)
-
-    // time indicators
-    case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-      val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-      if (indicator.isEventTime) {
-        InternalTypes.ROWTIME_INDICATOR
-      } else {
-        InternalTypes.PROCTIME_INDICATOR
-      }
-
-    // temporal types
-    case DATE => InternalTypes.DATE
-    case TIME => InternalTypes.TIME
-    case TIMESTAMP => InternalTypes.TIMESTAMP
-    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MONTHS
-    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MILLIS
-
-    case NULL =>
-      throw new TableException(
-        "Type NULL is not supported. Null values must have a supported type.")
-
-    // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-    // are represented as Enum
-    case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-    // extract encapsulated Type
-    case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-      val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-      genericRelDataType.genericType
-
-    case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-      val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-      compositeRelDataType.rowType
-
-    case ROW if relDataType.isInstanceOf[RelRecordType] =>
-      val relRecordType = relDataType.asInstanceOf[RelRecordType]
-      new RowSchema(relRecordType).internalType
-
-    case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-      val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-      multisetRelDataType.multisetType
-
-    case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-      val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-      arrayRelDataType.arrayType
-
-    case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-      val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-      mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+    val logicalType = relDataType.getSqlTypeName match {
+      case BOOLEAN => new BooleanType()
+      case TINYINT => new TinyIntType()
+      case SMALLINT => new SmallIntType()
+      case INTEGER => new IntType()
+      case BIGINT => new BigIntType()
+      case FLOAT => new FloatType()
+      case DOUBLE => new DoubleType()
+      case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)

Review comment:
For VARCHAR, I think we should respect the precision.
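One way to read the suggestion about respecting the precision is sketched below. This is only an illustration under stated assumptions: `VarCharLengthSketch`, `resolveLength` and `describeVarChar` are hypothetical names, the `-1` sentinel for "no precision declared" is an assumption of the sketch, and none of this is the code proposed in the pull request.

```java
// Illustrative sketch only: these names are hypothetical stand-ins,
// not the Flink or Calcite classes touched by the pull request.
final class VarCharLengthSketch {
    // Assumed sentinel meaning "no precision declared" in this sketch.
    static final int PRECISION_NOT_SPECIFIED = -1;
    // Assumed upper bound used when no length is declared.
    static final int MAX_LENGTH = Integer.MAX_VALUE;

    // Keeps a declared length and falls back to MAX_LENGTH only when none was given.
    static int resolveLength(int declaredPrecision) {
        if (declaredPrecision == PRECISION_NOT_SPECIFIED) {
            return MAX_LENGTH;
        }
        if (declaredPrecision < 1) {
            throw new IllegalArgumentException("length must be positive: " + declaredPrecision);
        }
        return declaredPrecision;
    }

    // Renders the resulting type so the preserved length is visible.
    static String describeVarChar(int declaredPrecision) {
        return "VARCHAR(" + resolveLength(declaredPrecision) + ")";
    }
}
```

In this sketch a column declared as `VARCHAR(20)` keeps its length of 20, whereas mapping every VARCHAR to the maximum length, as in the quoted line, would discard it.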
[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568189

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ##

@@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Option[Boolean]): RelDataType = {

Review comment:
It is not a good idea to build the type from `TableSchema` and to couple it with `isStreaming`.
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290575541 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup. +* +* @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. +*/ + int start() throws IOException; + + /** +* Shutdown the
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290575250 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup. +* +* @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. +*/ + int start() throws IOException; + + /** +* Shutdown the
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290575205 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup. +* +* @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. +*/ + int start() throws IOException; + + /** +* Shutdown the
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290575052 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup. +* +* @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. +*/ + int start() throws IOException; + + /** +* Shutdown the
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290574817 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup. +* +* @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. +*/ + int start() throws IOException; + + /** +* Shutdown the
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290574645 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment. + * + * When tha task is deployed and the input gates are created, it can happen that not all consumed partitions + * are known at that moment e.g. because their producers have not been started yet. + * Therefore, the {@link ShuffleEnvironment} provides a method {@code updatePartitionInfo} to update them + * externally, ouside of the task thread, when the producer becomes known. + */ +public interface ShuffleEnvironment { + + /** +* Starts the internal related services upon {@link TaskExecutor}'s startup.
Review comment: `Starts` -> `Start`, to keep the same form as the following methods.
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290574532 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. + * + * Input gate management. + * + * The interface implements a factory for the task input gates: {@code createInputGates}. + * The created gates are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the gates lifecycle from that moment.
Review comment: gates -> gates'
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290574315 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
+ * + * Partitions are released in the following cases: + * + * {@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time + * if 'the end of consumption' confirmation is being awaited implicitly + * or the partition is later released by {@code releasePartitions(Collection)}. + * {@code releasePartitions(Collection)} is called outside of the task thread, + * e.g. to manage the local resource lifecycle of external partitions which outlive the task production. + * + * The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}. Review comment: queried -> updated
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290573784 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. + * + * The interface contains method's to manage the lifecycle of the local shuffle environment: + * + * {@code start} is called when the {@link TaskExecutor} is being started. + * {@code shutdown} is called when the {@link TaskExecutor} is being stopped. + * + * + * Shuffle Input/Output management. + * + * Result partition management. + * + * The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}. + * The created writers are grouped per task and handed over to the task thread upon its startup. + * The task is responsible for the writers lifecycle from that moment. 
Review comment: writers -> writers'
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290573391 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. + * + * Service lifecycle management. Review comment: Remove `Service` here and keep only `Lifecycle management`.
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290573231 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data + * and buffers from created here {@link InputGate}s to read it. Review comment: Actually, the task does not request buffers from the `InputGate`; that is done by the network thread. Maybe: `and the task can read buffers from created here {@link InputGate}s`.
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290572726 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. + * The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data Review comment: remove next?
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290572426 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. + * + * Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s. Review comment: shuffle service -> shuffle environment
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290572212 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service locally on task executor. Review comment: shuffle service -> shuffle environment
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290572080 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java ## @@ -224,7 +224,7 @@ private static Task createTask( TestStreamTask.class.getName(), taskConfig); - NetworkEnvironment network = new NetworkEnvironmentBuilder().build(); + ShuffleEnvironment network = new NetworkEnvironmentBuilder().build(); Review comment: Rename `network` to `shuffleEnvironment`.
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290572037 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java ## @@ -295,7 +295,7 @@ private Task createTask(Class invokableClass) throw 0, mock(MemoryManager.class), mock(IOManager.class), - networkEnvironment, + shuffleEnvironment, Review comment: indentation issue here.
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290571983 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -902,7 +902,7 @@ public static Task createTask( PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); Executor executor = mock(Executor.class); - NetworkEnvironment network = new NetworkEnvironmentBuilder().build(); + ShuffleEnvironment network = new NetworkEnvironmentBuilder().build(); Review comment: Ditto: rename `network` to `shuffleEnvironment`.
[jira] [Commented] (FLINK-9465) Separate timeout for savepoint and checkpoint
[ https://issues.apache.org/jira/browse/FLINK-9465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856306#comment-16856306 ] vinoyang commented on FLINK-9465: - [~kien_truong] Regarding the savepoint timeout config option, I think it would be better to set it through the API. Currently, the timeout is configured by {{CheckpointConfig#setCheckpointTimeout}}. I plan to expose a setter for the savepoint timeout in {{CheckpointConfig}}, which can then be accessed through {{StreamExecutionEnvironment#getCheckpointConfig}}. IMO, the configuration file in some deployment environments (e.g. Standalone) is global for all jobs. In addition, exposing it through the CLI would not be conducive to consistent behavior. WDYT? > Separate timeout for savepoint and checkpoint > - > > Key: FLINK-9465 > URL: https://issues.apache.org/jira/browse/FLINK-9465 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.5.0 >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Major > > A savepoint can take much longer to perform than a checkpoint, especially > with incremental checkpoints enabled. This leads to a couple of problems: > * For our job, we currently have to set the checkpoint timeout much larger > than necessary, otherwise we would be unable to perform a savepoint. > * During rush hour, our cluster encounters a high rate of checkpoint > timeouts due to backpressure; however, we're unable to migrate to a larger > configuration, because savepoints also time out. > In my opinion, the timeout for savepoints should be configurable separately, > both in the config file and as a parameter to the savepoint command. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
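The proposal in the comment above (a savepoint timeout setter next to the existing checkpoint timeout setter) could look roughly like the following sketch. It is a stand-alone illustration, not Flink code: `SnapshotTimeoutConfig`, `setSavepointTimeout`, the `Duration` parameters and the fallback to the checkpoint timeout are all assumptions made for this example; only the name `setCheckpointTimeout` comes from the thread.

```java
import java.time.Duration;
import java.util.Objects;

/** Small driver showing how the hypothetical config below would be used. */
final class SavepointTimeoutSketch {
    public static void main(String[] args) {
        SnapshotTimeoutConfig config = new SnapshotTimeoutConfig()
            .setCheckpointTimeout(Duration.ofMinutes(2))
            .setSavepointTimeout(Duration.ofMinutes(30));
        System.out.println("checkpoint timeout: " + config.getCheckpointTimeout());
        System.out.println("savepoint timeout: " + config.getSavepointTimeout());
    }
}

/**
 * Hypothetical stand-in for a checkpoint configuration object.
 * This is NOT Flink's CheckpointConfig; setSavepointTimeout and the
 * fallback rule are assumptions made for this sketch.
 */
final class SnapshotTimeoutConfig {

    // Illustrative default only; not taken from the thread.
    private Duration checkpointTimeout = Duration.ofMinutes(10);

    // null means "no separate savepoint timeout configured".
    private Duration savepointTimeout;

    SnapshotTimeoutConfig setCheckpointTimeout(Duration timeout) {
        this.checkpointTimeout = requirePositive(timeout);
        return this;
    }

    SnapshotTimeoutConfig setSavepointTimeout(Duration timeout) {
        this.savepointTimeout = requirePositive(timeout);
        return this;
    }

    Duration getCheckpointTimeout() {
        return checkpointTimeout;
    }

    /** Falls back to the checkpoint timeout when no savepoint timeout was set. */
    Duration getSavepointTimeout() {
        return savepointTimeout != null ? savepointTimeout : checkpointTimeout;
    }

    private static Duration requirePositive(Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        if (timeout.isZero() || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be positive: " + timeout);
        }
        return timeout;
    }
}
```

Because the setting lives on a per-job config object in this sketch, it stays job-specific, which matches the concern raised above about a configuration file being global for all jobs in some deployments.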
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290571947 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java ## @@ -150,7 +150,7 @@ public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws E final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); - final NetworkEnvironment networkEnv = new NetworkEnvironmentBuilder().build(); + final ShuffleEnvironment networkEnv = new NetworkEnvironmentBuilder().build(); Review comment: Rename `networkEnv` to `shuffleEnvironment`.
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290571622 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -409,6 +382,7 @@ public void shutdown() { } } + @Override Review comment: ATM this method is only used for testing, so it does not seem reasonable to define an interface method just for tests. It would be better if the existing tests could be refactored so that they no longer rely on this method. I once considered removing this method for that reason.
[jira] [Updated] (FLINK-12734) remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-12734: --- Description: There are two improvements: 1. Remove the {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. {{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and the {{VolcanoPlanner}} used to optimize that tree are the same instance (see {{VolcanoPlanner}}#registerImpl), so we can use the planner instance in the {{RelNode}}'s cluster directly instead of the planner from the {{getVolcanoPlanner}} method in {{FlinkOptimizeContext}}. 2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}. In {{RelNodeBlock}}, only {{TableConfig}} is used. was: There are two improvements: 1. Remove the {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. {{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and the {{VolcanoPlanner}} used to optimize that tree are the same instance (see {{VolcanoPlanner}}#registerImpl), so we can use the planner instance in the {{RelNode}}'s cluster directly instead of {{getVolcanoPlanner}} from {{FlinkOptimizeContext}}. 2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}. In {{RelNodeBlock}}, only {{TableConfig}} is used. > remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock > does not depend on TableEnvironment > -- > > Key: FLINK-12734 > URL: https://issues.apache.org/jira/browse/FLINK-12734 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > There are two improvements: > 1. Remove the {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. > {{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and > the {{VolcanoPlanner}} used to optimize that tree are the same > instance 
(see {{VolcanoPlanner}}#registerImpl), > so we can use the planner instance in the {{RelNode}}'s cluster directly instead of > the planner from the {{getVolcanoPlanner}} method in {{FlinkOptimizeContext}}. > 2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}. > In {{RelNodeBlock}}, only {{TableConfig}} is used.
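The first improvement in the issue description above can be illustrated with a small stand-alone sketch. In Calcite, a node exposes its cluster through `RelNode#getCluster()` and the cluster exposes its planner through `RelOptCluster#getPlanner()`; the classes below (`SketchPlanner`, `SketchCluster`, `SketchNode`) are hypothetical stand-ins so that the example runs without Calcite on the classpath, and the identity check only mimics the same-instance constraint the description attributes to `VolcanoPlanner#registerImpl`.

```java
/** Driver: optimizes a tree using the planner reachable from the tree itself. */
final class PlannerFromClusterSketch {

    /** No planner parameter is needed: the node already knows its planner via its cluster. */
    static SketchNode optimize(SketchNode root) {
        return root.getCluster().getPlanner().optimize(root);
    }

    public static void main(String[] args) {
        SketchPlanner planner = new SketchPlanner();
        SketchNode root = new SketchNode(new SketchCluster(planner));
        System.out.println("optimized with own planner: " + (optimize(root) == root));
    }
}

/** Hypothetical stand-in for a planner; not a Calcite class. */
final class SketchPlanner {

    /** Mimics the constraint: refuse nodes that belong to another planner instance. */
    SketchNode optimize(SketchNode root) {
        if (root.getCluster().getPlanner() != this) {
            throw new IllegalArgumentException("node belongs to a different planner instance");
        }
        return root; // a real planner would return the optimized tree here
    }
}

/** Hypothetical stand-in for a cluster that owns exactly one planner. */
final class SketchCluster {
    private final SketchPlanner planner;

    SketchCluster(SketchPlanner planner) {
        this.planner = planner;
    }

    SketchPlanner getPlanner() {
        return planner;
    }
}

/** Hypothetical stand-in for a relational node that belongs to one cluster. */
final class SketchNode {
    private final SketchCluster cluster;

    SketchNode(SketchCluster cluster) {
        this.cluster = cluster;
    }

    SketchCluster getCluster() {
        return cluster;
    }
}
```

Since the only planner that can legally optimize a tree is the one the tree already references, a separate accessor on the context object adds no information, which is the argument for removing it.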
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290571356 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -322,19 +302,10 @@ private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGro buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates)); } - /** -* Update consuming gate with newly available partition. -* -* @param consumerID execution id of consumer to identify belonging to it gate. -* @param partitionInfo telling where the partition can be retrieved from -* @return {@code true} if the partition has been updated or {@code false} if the partition is not available anymore. -* @throws IOException IO problem by the update -* @throws InterruptedException potentially blocking operation was interrupted -* @throws IllegalStateException the input gate with the id from the partitionInfo is not found -*/ + @Override public boolean updatePartitionInfo( - ExecutionAttemptID consumerID, - PartitionInfo partitionInfo) throws IOException, InterruptedException { + ExecutionAttemptID consumerID, Review comment: ditto: keep previous indentation
[GitHub] [flink] flinkbot commented on issue #8619: [FLINK-12734] [table-planner-blink] remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment
flinkbot commented on issue #8619: [FLINK-12734] [table-planner-blink] remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment URL: https://github.com/apache/flink/pull/8619#issuecomment-498926318 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. ## Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12734) remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12734: --- Labels: pull-request-available (was: ) > remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock > does not depend on TableEnvironment > -- > > Key: FLINK-12734 > URL: https://issues.apache.org/jira/browse/FLINK-12734 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > > there are two improvements: > 1. remove {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. > {{VolcanoPlanner}} limits that the planer a {{RelNode}} tree belongs to and > the {{VolcanoPlanner}} used to optimize the {{RelNode}} tree should be same > instance. (see: {{VolcanoPlanner}}#registerImpl) > so, we can use planner instance in {{RelNode}}'s cluster directly instead of > {{getVolcanoPlanner}} from {{FlinkOptimizeContext}}. > 2. {{RelNodeBlock}} does not depend on {{TableEnvironment}} > In {{RelNodeBlock}}, only {{TableConfig}} is used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] godfreyhe opened a new pull request #8619: [FLINK-12734] [table-planner-blink] remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvir
godfreyhe opened a new pull request #8619: [FLINK-12734] [table-planner-blink] remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment
URL: https://github.com/apache/flink/pull/8619

## What is the purpose of the change

*Remove the getVolcanoPlanner method from FlinkOptimizeContext, and make RelNodeBlock independent of TableEnvironment.*

## Brief change log

- *Remove the getVolcanoPlanner method from FlinkOptimizeContext. VolcanoPlanner requires that the planner a RelNode tree belongs to and the VolcanoPlanner used to optimize that RelNode tree be the same instance (see: VolcanoPlanner#registerImpl), so we can use the planner instance in the RelNode's cluster directly instead of getVolcanoPlanner from FlinkOptimizeContext.*
- *RelNodeBlock does not depend on TableEnvironment.*

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
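The reasoning in the change log can be illustrated with a minimal, self-contained Java sketch. It is not Flink or Calcite code: `Planner`, `Cluster`, `Node`, and `Optimizer` are hypothetical stand-ins, and `register` only imitates the same-instance constraint the PR attributes to `VolcanoPlanner#registerImpl`. The point is that once a node can only be registered with the planner held by its own cluster, a separate planner getter on the optimize context carries no extra information.

```java
import java.util.ArrayList;
import java.util.List;

class PlannerFromClusterDemo {
    public static void main(String[] args) {
        Planner planner = new Planner();
        Node root = new Node(new Cluster(planner));
        // The optimizer finds the planner through the node itself.
        System.out.println(Optimizer.optimize(root) == planner); // prints "true"
    }
}

/** Hypothetical stand-in for a planner; not Calcite's VolcanoPlanner. */
final class Planner {
    private final List<Node> registered = new ArrayList<>();

    /** Imitates the same-instance constraint: a node must belong to this planner. */
    void register(Node node) {
        if (node.getCluster().getPlanner() != this) {
            throw new IllegalArgumentException("node belongs to a different planner");
        }
        registered.add(node);
    }

    int registeredCount() {
        return registered.size();
    }
}

/** Hypothetical stand-in for the cluster that owns the planner. */
final class Cluster {
    private final Planner planner;

    Cluster(Planner planner) {
        this.planner = planner;
    }

    Planner getPlanner() {
        return planner;
    }
}

/** Hypothetical stand-in for a relational node. */
final class Node {
    private final Cluster cluster;

    Node(Cluster cluster) {
        this.cluster = cluster;
    }

    Cluster getCluster() {
        return cluster;
    }
}

final class Optimizer {
    /** No context getter is needed: the planner is taken from the node's cluster. */
    static Planner optimize(Node root) {
        Planner planner = root.getCluster().getPlanner();
        planner.register(root);
        return planner;
    }
}
```

Passing a different planner through a context object could only ever trip the registration check, which is presumably why the getter can be dropped.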
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290570744 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -129,11 +128,11 @@ public static NetworkEnvironment fromConfiguration( maxJvmHeapMemory, localTaskManagerCommunication, taskManagerAddress); - return NetworkEnvironment.create(networkConfig, taskEventPublisher, metricGroup, ioManager); + return create(networkConfig, taskEventPublisher, metricGroup, ioManager); } public static NetworkEnvironment create( - NetworkEnvironmentConfiguration config, + NetworkEnvironmentConfiguration config, Review comment: See the changes for commit `[FLINK-11392][network] Introduce ShuffleEnvironment interface`, this indentation should not be changed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290570600 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -129,11 +128,11 @@ public static NetworkEnvironment fromConfiguration( maxJvmHeapMemory, localTaskManagerCommunication, taskManagerAddress); - return NetworkEnvironment.create(networkConfig, taskEventPublisher, metricGroup, ioManager); Review comment: This should have been done in the previous hotfix commit, the one that first introduced `fromConfiguration`.
[jira] [Commented] (FLINK-12733) Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856296#comment-16856296 ] vinoyang commented on FLINK-12733: -- [~klion26] I think it would be better to discuss and agree on the solution in FLINK-6755 before implementing it. cc [~aljoscha] [~till.rohrmann] > Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint > - > > Key: FLINK-12733 > URL: https://issues.apache.org/jira/browse/FLINK-12733 > Project: Flink > Issue Type: Sub-task >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12734) remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment
godfrey he created FLINK-12734: -- Summary: remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment Key: FLINK-12734 URL: https://issues.apache.org/jira/browse/FLINK-12734 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: godfrey he Assignee: godfrey he there are two improvements: 1. remove {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. {{VolcanoPlanner}} limits that the planer a {{RelNode}} tree belongs to and the {{VolcanoPlanner}} used to optimize the {{RelNode}} tree should be same instance. (see: {{VolcanoPlanner}}#registerImpl) so, we can use planner instance in {{RelNode}}'s cluster directly instead of {{getVolcanoPlanner}} from {{FlinkOptimizeContext}}. 2. {{RelNodeBlock}} does not depend on {{TableEnvironment}} In {{RelNodeBlock}}, only {{TableConfig}} is used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#discussion_r290561763 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ## @@ -388,6 +388,21 @@ public JobSubmissionResult run(FlinkPlan compiledPlan, public abstract CompletableFuture disposeSavepoint(String savepointPath) throws FlinkException; + /** +* Stops a program on Flink cluster whose job-manager is configured in this client's configuration. +* Stopping works only for streaming programs. Be aware, that the program might continue to run for +* a while after sending the stop command, because after sources stopped to emit data ll operators Review comment: ll operators -> all operators This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#discussion_r290567794 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java ## @@ -624,4 +624,58 @@ public void declineCheckpoint(final DeclineCheckpoint decline) { return savepointFuture.thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))); } + + @Override + public CompletableFuture stopWithCheckpoint(boolean advanceToEndOfEventTime) { Review comment: Please check my comments on `CheckpointCoordinator` about avoiding duplicated code.
[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#discussion_r290563567 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ## @@ -188,6 +191,8 @@ PYMODULE_OPTION.setRequired(false); PYMODULE_OPTION.setArgName("pyModule"); + + STOP_WITH_CHECKPOINT.setRequired(false); Review comment: It would be better if we could also support `CANCEL_WITH_CHECKPOINT`, analogous to `CANCEL_WITH_SAVEPOINT_OPTION`.
[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#discussion_r290567687 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -424,6 +424,27 @@ public boolean isShutdown() { } } + public CompletableFuture triggerSynchronousCheckpoint( Review comment: It seems to me that we could reuse the existing code of `triggerSynchronousSavepoint` by adding a flag that indicates whether a checkpoint or a savepoint is being triggered. The same applies to `LegacyScheduler#stopWithCheckpoint`.
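The refactoring suggested in this review comment could look roughly like the following Java sketch. It is an illustration under stated assumptions, not Flink's `CheckpointCoordinator`: the `SnapshotKind` enum, the `SnapshotCoordinator` class, the private `triggerSynchronousSnapshot` helper, and the string results are all hypothetical. Only the two public method names are taken from the thread.

```java
import java.util.concurrent.CompletableFuture;

class SynchronousSnapshotDemo {
    public static void main(String[] args) {
        SnapshotCoordinator coordinator = new SnapshotCoordinator();
        System.out.println(coordinator.triggerSynchronousSavepoint(false, "/tmp/sp").join());
        System.out.println(coordinator.triggerSynchronousCheckpoint(false).join());
    }
}

/** Hypothetical flag that distinguishes the two snapshot kinds. */
enum SnapshotKind { CHECKPOINT, SAVEPOINT }

/** Hypothetical, heavily simplified coordinator; not Flink's CheckpointCoordinator. */
final class SnapshotCoordinator {

    CompletableFuture<String> triggerSynchronousSavepoint(
            boolean advanceToEndOfEventTime,
            String targetLocation) {
        return triggerSynchronousSnapshot(
            SnapshotKind.SAVEPOINT, advanceToEndOfEventTime, targetLocation);
    }

    CompletableFuture<String> triggerSynchronousCheckpoint(boolean advanceToEndOfEventTime) {
        return triggerSynchronousSnapshot(
            SnapshotKind.CHECKPOINT, advanceToEndOfEventTime, null);
    }

    /** Single shared code path; the kind flag is the only point of divergence. */
    private CompletableFuture<String> triggerSynchronousSnapshot(
            SnapshotKind kind,
            boolean advanceToEndOfEventTime,
            String targetLocation) {
        String location = (kind == SnapshotKind.SAVEPOINT)
            ? targetLocation
            : "<default checkpoint dir>";
        String suffix = advanceToEndOfEventTime ? " (advanced to end of event time)" : "";
        return CompletableFuture.completedFuture(kind + "@" + location + suffix);
    }
}
```

Keeping both public methods as thin wrappers leaves the callers unchanged while the shared logic lives in one place; the same shape would apply to the scheduler-level `stopWithCheckpoint`.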
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290568555 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -114,6 +116,22 @@ private NetworkEnvironment( this.isShutdown = false; } + public static NetworkEnvironment fromConfiguration( Review comment: It might be better to put this static factory at the end of this class, also for the following `create` method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290568168 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java ## @@ -36,7 +37,7 @@ private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024; /** -* Verifies that {@link TaskManagerServicesConfiguration#fromConfiguration(Configuration, long, InetAddress, boolean)} +* Verifies that {@link TaskManagerServicesConfiguration#fromConfiguration(Configuration, InetAddress)} Review comment: I think this test is only for verifying the generation of `NetworkEnvironmentConfiguration`. The scope of `TaskManagerServicesConfiguration#fromConfiguration` covers many things. Also this class name `TaskManagerServicesConfigurationTest` might also be renamed to `NetworkEnvironmentConfigurationTest` or something else. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-12662) show jobs failover in history server as well
[ https://issues.apache.org/jira/browse/FLINK-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-12662: Assignee: vinoyang > show jobs failover in history server as well > > > Key: FLINK-12662 > URL: https://issues.apache.org/jira/browse/FLINK-12662 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Su Ralph >Assignee: vinoyang >Priority: Major > > Currently > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/historyserver.html] > only show the completed jobs (completd, cancel, failed). Not showing any > intermediate failover. > Which make the cluster administrator/developer hard to find first place if > there is two failover happens. Feature ask is to > - make a failover as a record in history server as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12662) show jobs failover in history server as well
[ https://issues.apache.org/jira/browse/FLINK-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856289#comment-16856289 ] vinoyang commented on FLINK-12662: -- Thanks [~till.rohrmann] for approving this issue. and thanks [~ralphsu] for raising this valuable issue. I am going to start it. > show jobs failover in history server as well > > > Key: FLINK-12662 > URL: https://issues.apache.org/jira/browse/FLINK-12662 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Su Ralph >Priority: Major > > Currently > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/historyserver.html] > only show the completed jobs (completd, cancel, failed). Not showing any > intermediate failover. > Which make the cluster administrator/developer hard to find first place if > there is two failover happens. Feature ask is to > - make a failover as a record in history server as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290567675 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java ## @@ -205,17 +205,20 @@ private void checkRootDirsClean(File[] rootDirs) { private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration( Configuration config) throws IOException { return TaskManagerServicesConfiguration.fromConfiguration( - config, MEM_SIZE_PARAM, InetAddress.getLocalHost(), true); + config, InetAddress.getLocalHost()); Review comment: Either put every parameter on a separate line, or drop the line break here, since the call might no longer be too long after removing the parameters above.
[GitHub] [flink] lirui-apache commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is broken
lirui-apache commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is broken URL: https://github.com/apache/flink/pull/8610#issuecomment-498921767 I also prefer to keep HiveShim generic. Actually APIs in HiveShim like `getViews` and `getFunction` take a `IMetaStoreClient` as parameter. So in that sense it's generic in nature. @bowenli86 what do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290567322 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -445,36 +442,14 @@ private static NettyConfig createNettyConfig( final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), - getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration); + ConfigurationParserUtils.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration); Review comment: separate line for all the parameters This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290567072 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ## @@ -78,33 +77,32 @@ private Optional systemResourceMetricsProbingInterval; public TaskManagerServicesConfiguration( - InetAddress taskManagerAddress, - String[] tmpDirPaths, - String[] localRecoveryStateRootDirectories, - boolean localRecoveryEnabled, - NetworkEnvironmentConfiguration networkConfig, - @Nullable QueryableStateConfiguration queryableStateConfig, - int numberOfSlots, - long configuredMemory, - MemoryType memoryType, - boolean preAllocateMemory, - float memoryFraction, - long timerServiceShutdownTimeout, - RetryingRegistrationConfiguration retryingRegistrationConfiguration, - Optional systemResourceMetricsProbingInterval) { + InetAddress taskManagerAddress, + String[] tmpDirPaths, + String[] localRecoveryStateRootDirectories, + boolean localRecoveryEnabled, + @Nullable QueryableStateConfiguration queryableStateConfig, + int numberOfSlots, + long configuredMemory, + MemoryType memoryType, + boolean preAllocateMemory, + float memoryFraction, + int pageSize, long timerServiceShutdownTimeout, Review comment: separate line for parameters This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface URL: https://github.com/apache/flink/pull/8608#discussion_r290566895 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ## @@ -78,33 +77,32 @@ private Optional systemResourceMetricsProbingInterval; public TaskManagerServicesConfiguration( - InetAddress taskManagerAddress, - String[] tmpDirPaths, - String[] localRecoveryStateRootDirectories, - boolean localRecoveryEnabled, - NetworkEnvironmentConfiguration networkConfig, - @Nullable QueryableStateConfiguration queryableStateConfig, - int numberOfSlots, - long configuredMemory, - MemoryType memoryType, - boolean preAllocateMemory, - float memoryFraction, - long timerServiceShutdownTimeout, - RetryingRegistrationConfiguration retryingRegistrationConfiguration, - Optional systemResourceMetricsProbingInterval) { + InetAddress taskManagerAddress, Review comment: Keep the previous indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function URL: https://github.com/apache/flink/pull/8280#issuecomment-498920811

One approach that comes to mind: call the scattered `clean()` methods with `TOP_LEVEL` to keep the old behavior, and set `checkSerializable` to false. All user functions are passed to clean through `environment#clean()`, so we can configure the clean level and `checkSerializable` there, at the entrance.

```
@Internal
public <F> F clean(F f) {
	if (getConfig().isClosureCleanerEnabled()) {
		ClosureCleaner.clean(f, getConfig().getClosureCleanLevel(), true);
	}
	ClosureCleaner.ensureSerializable(f);
	return f;
}
```
[GitHub] [flink] Aitozi edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
Aitozi edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function URL: https://github.com/apache/flink/pull/8280#issuecomment-498756627 I know what you mean, and I think it's hard to configure the scattered `clean()` calls used in functions like `connector` and `Pattern`. Some functions cannot obtain the `ExecutionConfig`. Do you have any thoughts, @aljoscha?
[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner
JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner URL: https://github.com/apache/flink/pull/8578#discussion_r290563675 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala ## @@ -119,7 +119,9 @@ object FlinkBatchRuleSets { new CoerceInputsRule(classOf[LogicalMinus], false), ConvertToNotInOrInRule.INSTANCE, // optimize limit 0 -FlinkLimit0RemoveRule.INSTANCE +FlinkLimit0RemoveRule.INSTANCE, +// unnest rule +LogicalUnnestRule.INSTANCE Review comment: It seems that `LogicalUnnestRule` is also part of de-correlate. Why not put it in the `DECORRELATE` phase? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner
JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner URL: https://github.com/apache/flink/pull/8578#discussion_r290565276 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -467,6 +467,9 @@ object FlinkTypeFactory { val mapRelDataType = relDataType.asInstanceOf[MapRelDataType] mapRelDataType.mapType +// CURSOR for UDTF case, whose type info will never be used, just a placeholder +case CURSOR => NothingType.INSTANCE Review comment: Just use a `GenericType(NothingTypeInfo)` is ok? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner
JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner URL: https://github.com/apache/flink/pull/8578#discussion_r290564818 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/UnnestITCase.scala ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset +import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestData, TestingAppendSink, TestingRetractSink, TestingRetractTableSink} +import org.apache.flink.types.Row + +import org.junit.Assert.assertEquals +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +@RunWith(classOf[Parameterized]) +class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { Review comment: Can you add some cases to PlanCase and ITCase like: `select , A.field0, A.field1 from T, unnest(c) as A` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi removed a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
Aitozi removed a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function URL: https://github.com/apache/flink/pull/8280#issuecomment-498759646 I thought it over again: it can be removed directly. After this, the clean method should not be used directly; it should be used through a single entrance, `environment#clean()`.
[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner
JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#discussion_r290564411

## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRuleTest.scala

@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+import java.sql.Timestamp
+
+/**
+ * Test for [[LogicalUnnestRule]].
+ */
+class LogicalUnnestRuleTest extends TableTestBase {

Review comment:
Is `UnnestTestBase` not enough?
[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner
JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#discussion_r290563400

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkDecorrelateProgram.scala

@@ -77,6 +81,15 @@ class FlinkDecorrelateProgram[OC <: FlinkOptimizeContext] extends FlinkOptimizeP
       join.getCondition.accept(visitor)
       super.visit(join)
     }
+
+    override def visit(other: RelNode): RelNode = {
+      other match {
+        // ignore Uncollect's inputs due to the correlate variables are from UNNEST directly,
+        // not from cases which RelDecorrelator handles
+        case r: Uncollect => r

Review comment:
> not from cases which RelDecorrelator handles

I don't get it. Isn't the real reason that `LogicalUnnestRule` runs in the `DEFAULT_REWRITE` phase, which comes after the `DECORRELATE` phase, so we do not need to verify the Unnest correlation here? Could you update the comment?
[jira] [Commented] (FLINK-12662) show jobs failover in history server as well
[ https://issues.apache.org/jira/browse/FLINK-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856278#comment-16856278 ]

Su Ralph commented on FLINK-12662:

Thanks [~till.rohrmann]. That's good as well, and probably better.

> show jobs failover in history server as well
>
> Key: FLINK-12662
> URL: https://issues.apache.org/jira/browse/FLINK-12662
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / REST
> Reporter: Su Ralph
> Priority: Major
>
> Currently [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/historyserver.html] only shows completed jobs (completed, cancelled, failed). It does not show any intermediate failover.
> This makes it hard for the cluster administrator/developer to find where the first failure occurred if two failovers happen. The feature ask is to
> - record each failover in the history server as well.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9465) Separate timeout for savepoint and checkpoint
[ https://issues.apache.org/jira/browse/FLINK-9465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856276#comment-16856276 ]

vinoyang commented on FLINK-9465:

[~till.rohrmann] thanks for approving this issue, I am going to start on it.

> Separate timeout for savepoint and checkpoint
>
> Key: FLINK-9465
> URL: https://issues.apache.org/jira/browse/FLINK-9465
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.5.0
> Reporter: Truong Duc Kien
> Assignee: vinoyang
> Priority: Major
>
> A savepoint can take much longer to perform than a checkpoint, especially with incremental checkpoints enabled. This leads to a couple of problems:
> * For our job, we currently have to set the checkpoint timeout much larger than necessary, otherwise we would be unable to perform a savepoint.
> * During rush hour, our cluster encounters a high rate of checkpoint timeouts due to backpressure; however, we're unable to migrate to a larger configuration, because savepoints also time out.
> In my opinion, the timeout for savepoints should be configurable separately, both in the config file and as a parameter to the savepoint command.
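The request in FLINK-9465 (a savepoint timeout that is configurable separately, in the config file and per command) could be resolved roughly as sketched below. This is only an illustration under stated assumptions: `SnapshotTimeouts`, `SnapshotType` and `timeoutFor` are hypothetical names, not Flink APIs, and the fallback order (command parameter, then savepoint-specific setting, then checkpoint timeout) is an assumption rather than something the issue specifies.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical sketch; none of these names exist in Flink. */
final class SnapshotTimeouts {
    enum SnapshotType { CHECKPOINT, SAVEPOINT }

    private final Duration checkpointTimeout;
    // May be null, meaning "no separate savepoint timeout configured".
    private final Duration savepointTimeout;

    SnapshotTimeouts(Duration checkpointTimeout, Duration savepointTimeout) {
        this.checkpointTimeout = Objects.requireNonNull(checkpointTimeout, "checkpointTimeout");
        this.savepointTimeout = savepointTimeout;
    }

    /**
     * Assumed precedence for savepoints: the per-command override wins,
     * then the configured savepoint timeout, then the checkpoint timeout.
     * Checkpoints always use the checkpoint timeout.
     */
    Duration timeoutFor(SnapshotType type, Duration commandOverride) {
        if (type == SnapshotType.SAVEPOINT) {
            if (commandOverride != null) {
                return commandOverride;
            }
            if (savepointTimeout != null) {
                return savepointTimeout;
            }
        }
        return checkpointTimeout;
    }
}
```

With such a split, the checkpoint timeout can stay tight for regular checkpoints while savepoints get a longer budget.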
[GitHub] [flink] sunjincheng121 commented on issue #8550: [FLINK-12401][table] Support incremental emit under AccRetract mode for non-window streaming FlatAggregate on Table API
sunjincheng121 commented on issue #8550: [FLINK-12401][table] Support incremental emit under AccRetract mode for non-window streaming FlatAggregate on Table API
URL: https://github.com/apache/flink/pull/8550#issuecomment-498913495

Since a stream operator can run in one of two modes, `ACC` and `ACCRetract`, users can achieve better performance by implementing special interfaces for streaming. The table below is a quick summary.

 | emitValue | emitUpdateWithRetract | emitUpdateWithoutRetract
-- | -- | -- | --
ACC | Y | N | Y
ACCRetract | Y | Y | N

- emitValue - for batch and streaming.
- emitUpdateWithRetract - only for streaming in ACCRetract mode.
- emitUpdateWithoutRetract - only for streaming in ACC mode (needs a key definition on TableAggregateFunction, [under discussion](https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit#heading=h.evvcpnbn30wn)).

So in this PR it would be better to rename the method from `emitRetractValueIncrementally` to `emitUpdateWithRetract`. What do you think?
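The Y/N table in the comment above can be written down as a small lookup, which makes the intended pairing of modes and emit methods easy to check. This is a sketch only: the enum and class names are hypothetical stand-ins, and the mapping simply mirrors the table as posted, not any confirmed Flink behaviour.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical encoding of the table in the comment; not a Flink API. */
final class EmitMethodSupport {
    enum Mode { ACC, ACC_RETRACT }

    enum EmitMethod { EMIT_VALUE, EMIT_UPDATE_WITH_RETRACT, EMIT_UPDATE_WITHOUT_RETRACT }

    /** Returns the emit methods marked "Y" for the given mode in the table. */
    static Set<EmitMethod> supported(Mode mode) {
        switch (mode) {
            case ACC:
                return EnumSet.of(EmitMethod.EMIT_VALUE, EmitMethod.EMIT_UPDATE_WITHOUT_RETRACT);
            case ACC_RETRACT:
                return EnumSet.of(EmitMethod.EMIT_VALUE, EmitMethod.EMIT_UPDATE_WITH_RETRACT);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

`emitValue` appears in both rows, which matches the statement that it works for batch and streaming alike.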
[GitHub] [flink] tzulitai commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
tzulitai commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state URL: https://github.com/apache/flink/pull/8618#issuecomment-498912234 @flinkbot approve description @flinkbot approve consensus
[GitHub] [flink] tzulitai commented on issue #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on issue #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#issuecomment-498911292 @flinkbot approve description @flinkbot approve consensus
[jira] [Commented] (FLINK-12709) Implement RestartBackoffTimeStrategyFactoryLoader
[ https://issues.apache.org/jira/browse/FLINK-12709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856258#comment-16856258 ]

Zhu Zhu commented on FLINK-12709:

Thanks [~rmetzger]. Sorry for forgetting to set the related components. I will be more careful in the future.

> Implement RestartBackoffTimeStrategyFactoryLoader
>
> Key: FLINK-12709
> URL: https://issues.apache.org/jira/browse/FLINK-12709
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Fix For: 1.9.0
>
> We need to implement a RestartBackoffTimeStrategyFactoryLoader to instantiate RestartBackoffTimeStrategyFactory.
> In order to be backwards compatible, the loader is responsible for converting *RestartStrategy* configurations ([https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html]) and *RestartStrategyConfiguration* to the latest *RestartBackoffTimeStrategy* configurations.
> The converted configurations will be used to create *RestartBackoffTimeStrategy.Factory* via *RestartBackoffTimeStrategy#createFactory(Configuration)*.
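The loader described in FLINK-12709 (convert a legacy restart configuration, then hand back a factory) might look roughly like the sketch below. Everything here is hypothetical: the interfaces, the `FixedDelay` class and the keys `strategy`, `attempts` and `delay-ms` are simplified placeholders invented for illustration and do not correspond to Flink's real classes or configuration keys.

```java
import java.util.Map;

/** Hypothetical sketch of a backwards-compatible factory loader. */
final class RestartBackoffLoaderSketch {
    interface BackoffStrategy {
        boolean canRestart(int failuresSoFar);
        long backoffMillis();
    }

    interface BackoffStrategyFactory {
        BackoffStrategy create();
    }

    /** Allows up to maxAttempts restarts with a constant delay between them. */
    static final class FixedDelay implements BackoffStrategy {
        private final int maxAttempts;
        private final long delayMs;

        FixedDelay(int maxAttempts, long delayMs) {
            this.maxAttempts = maxAttempts;
            this.delayMs = delayMs;
        }

        @Override
        public boolean canRestart(int failuresSoFar) {
            return failuresSoFar <= maxAttempts;
        }

        @Override
        public long backoffMillis() {
            return delayMs;
        }
    }

    /** Converts the (placeholder) legacy settings into a factory. */
    static BackoffStrategyFactory load(Map<String, String> legacy) {
        String kind = legacy.getOrDefault("strategy", "none");
        switch (kind) {
            case "fixed-delay": {
                final int attempts = Integer.parseInt(legacy.getOrDefault("attempts", "1"));
                final long delayMs = Long.parseLong(legacy.getOrDefault("delay-ms", "0"));
                return () -> new FixedDelay(attempts, delayMs);
            }
            case "none":
                // Zero allowed attempts means the job is never restarted.
                return () -> new FixedDelay(0, 0L);
            default:
                throw new IllegalArgumentException("Unknown restart strategy: " + kind);
        }
    }
}
```

Returning a factory instead of a strategy instance keeps the conversion step separate from strategy creation, which is the split the issue describes.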
[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#discussion_r290559979 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ## @@ -515,6 +513,12 @@ protected void stop(String[] args) throws Exception { return; } + if (stopOptions.hasCheckpointFlag()) { + LOG.info("Running 'stop-with-savepoint' command."); Review comment: here it should be "stop-with-checkpoint"
[jira] [Commented] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856241#comment-16856241 ]

BoWang commented on FLINK-12608:

OK, thanks [~rmetzger].

> Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: BoWang
> Assignee: BoWang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As discussed in [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], we need to add getVertexOrThrow and getResultPartitionOrThrow to SchedulingTopology.
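The "OrThrow" accessors requested in FLINK-12608 follow a common pattern: keep an `Optional`-returning lookup and add a variant that fails fast when the element is missing. A minimal sketch, assuming plain `String` ids and values as stand-ins for `ExecutionVertexID` and the scheduling vertex type; `TopologySketch` is hypothetical and is not the real `SchedulingTopology` interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a topology with Optional and OrThrow lookups. */
final class TopologySketch {
    private final Map<String, String> vertices = new HashMap<>();

    void addVertex(String id, String vertex) {
        vertices.put(id, vertex);
    }

    /** Lookup that lets the caller decide how to handle a missing vertex. */
    Optional<String> getVertex(String id) {
        return Optional.ofNullable(vertices.get(id));
    }

    /** Lookup for callers that treat a missing vertex as a programming error. */
    String getVertexOrThrow(String id) {
        return getVertex(id).orElseThrow(
            () -> new IllegalArgumentException("Cannot find vertex " + id));
    }
}
```

The same shape would apply to `getResultPartitionOrThrow`; callers that cannot continue without the element then avoid repeating the `Optional` unwrapping at every call site.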
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

BoWang updated FLINK-12608:

Affects Version/s: 1.9.0

> Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: BoWang
> Assignee: BoWang
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As discussed in [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], we need to add getVertexOrThrow and getResultPartitionOrThrow to SchedulingTopology.
[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
[ https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

BoWang updated FLINK-12608:

Fix Version/s: 1.9.0

> Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: BoWang
> Assignee: BoWang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As discussed in [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], we need to add getVertexOrThrow and getResultPartitionOrThrow to SchedulingTopology.
[jira] [Created] (FLINK-12733) Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint
Congxian Qiu(klion26) created FLINK-12733:

Summary: Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint
Key: FLINK-12733
URL: https://issues.apache.org/jira/browse/FLINK-12733
Project: Flink
Issue Type: Sub-task
Reporter: Congxian Qiu(klion26)
Assignee: Congxian Qiu(klion26)
[GitHub] [flink] xuefuz commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog
xuefuz commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog
URL: https://github.com/apache/flink/pull/8616#issuecomment-498877524

Thanks for the contribution. I have a couple of high-level comments:

1. The PR seems to suggest that the hive-site.xml path is always required, which shouldn't be the case. We should just instantiate a HiveConf instance and get the config dir from the environment.
2. There seem to be a good number of constructors. We can probably reduce them by providing only Catalog(String catName, String dbName, String/URL hiveSiteLocation) and taking care of nulls. We can add more on demand later on.
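The second suggestion (one constructor that tolerates nulls instead of many overloads) can be sketched as follows. `CatalogConfigSketch` is a hypothetical holder class, not the real `HiveCatalog`; treating a null `hiveSiteLocation` as "resolve the configuration from the environment" and falling back to a database named `default` are assumptions made for the example.

```java
/** Hypothetical sketch of a single null-tolerant constructor. */
final class CatalogConfigSketch {
    final String catalogName;
    final String defaultDatabase;
    // Null means: no explicit hive-site.xml, rely on the environment instead.
    final String hiveSiteLocation;

    CatalogConfigSketch(String catName, String dbName, String hiveSiteLocation) {
        if (catName == null || catName.trim().isEmpty()) {
            throw new IllegalArgumentException("catalog name must not be empty");
        }
        this.catalogName = catName;
        // Assumed fallback database name for the sketch.
        this.defaultDatabase = (dbName == null) ? "default" : dbName;
        this.hiveSiteLocation = hiveSiteLocation;
    }

    /** True when the caller supplied no hive-site.xml location. */
    boolean usesEnvironmentConfig() {
        return hiveSiteLocation == null;
    }
}
```

Callers that only know the catalog name would pass null for the other two arguments, so no additional overloads are needed until a concrete use case shows up.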
[jira] [Comment Edited] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set
[ https://issues.apache.org/jira/browse/FLINK-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856145#comment-16856145 ]

Cesar Alvernaz edited comment on FLINK-12383 at 6/4/19 9:56 PM:

I could reproduce this issue following the instructions [here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].

Since the `log.file` property is read from a system property, this warning can be fixed by setting the environment to add job manager-specific JVM options:

{{ env: - name: FLINK_ENV_JAVA_OPTS value: -Dlog.file=/tmp/log }}

was (Author: calvernaz):
I could reproduce this issue following the instructions [here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].

Since the `log.file` property is read from a system property, this warning can be fixed by setting the environment to add job manager-specific JVM options:

```
env:
- name: FLINK_ENV_JAVA_OPTS
  value: -Dlog.file=/tmp/log
```

> "Log file environment variable 'log.file' is not set" despite web.log.path being set
>
> Key: FLINK-12383
> URL: https://issues.apache.org/jira/browse/FLINK-12383
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Web Frontend
> Affects Versions: 1.8.0
> Reporter: Henrik
> Priority: Minor
>
> You get these warnings when starting a session cluster, despite having configured everything log-related as specified by the configuration reference on the [web site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]:
> {code:java}
> [job] 2019-05-01 13:25:35,418 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
> [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component log file: /var/lib/log/flink/jobmanager.log
> [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component stdout file: /var/lib/log/flink/jobmanager.out
> {code}
[jira] [Comment Edited] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set
[ https://issues.apache.org/jira/browse/FLINK-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856145#comment-16856145 ]

Cesar Alvernaz edited comment on FLINK-12383 at 6/4/19 9:56 PM:

{quote}I could reproduce this issue following the instructions [here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].

Since the `log.file` property is read from a system property, this warning can be fixed by setting the environment to add job manager-specific JVM options:

{{env: - name: FLINK_ENV_JAVA_OPTS value: -Dlog.file=/tmp/log}}

was (Author: calvernaz):
I could reproduce this issue following the instructions [here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].

Since the `log.file` property is read from a system property, this warning can be fixed by setting the environment to add job manager-specific JVM options:

{{ env: - name: FLINK_ENV_JAVA_OPTS value: -Dlog.file=/tmp/log }}

> "Log file environment variable 'log.file' is not set" despite web.log.path being set
>
> Key: FLINK-12383
> URL: https://issues.apache.org/jira/browse/FLINK-12383
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Web Frontend
> Affects Versions: 1.8.0
> Reporter: Henrik
> Priority: Minor
>
> You get these warnings when starting a session cluster, despite having configured everything log-related as specified by the configuration reference on the [web site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]:
> {code:java}
> [job] 2019-05-01 13:25:35,418 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
> [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component log file: /var/lib/log/flink/jobmanager.log
> [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component stdout file: /var/lib/log/flink/jobmanager.out
> {code}
[jira] [Commented] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set
[ https://issues.apache.org/jira/browse/FLINK-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856145#comment-16856145 ]

Cesar Alvernaz commented on FLINK-12383:

I could reproduce this issue following the instructions [here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].

Since the `log.file` property is read from a system property, this warning can be fixed by setting the environment to add job manager-specific JVM options:

```
env:
- name: FLINK_ENV_JAVA_OPTS
  value: -Dlog.file=/tmp/log
```

> "Log file environment variable 'log.file' is not set" despite web.log.path being set
>
> Key: FLINK-12383
> URL: https://issues.apache.org/jira/browse/FLINK-12383
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Web Frontend
> Affects Versions: 1.8.0
> Reporter: Henrik
> Priority: Minor
>
> You get these warnings when starting a session cluster, despite having configured everything log-related as specified by the configuration reference on the [web site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]:
> {code:java}
> [job] 2019-05-01 13:25:35,418 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
> [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component log file: /var/lib/log/flink/jobmanager.log
> [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component stdout file: /var/lib/log/flink/jobmanager.out
> {code}
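The workaround in this comment relies on standard JVM behaviour: a `-Dlog.file=/tmp/log` option on the command line becomes a system property that code can read with `System.getProperty`. The sketch below only demonstrates that mechanism; `LogFileProperty` is a hypothetical helper and is not how Flink's `WebMonitorUtils` is implemented.

```java
/** Hypothetical helper showing how a -Dlog.file=... option is observed. */
final class LogFileProperty {
    /** Reports whether the JVM was given a log.file system property. */
    static String describeLogFile() {
        // -Dlog.file=/tmp/log on the JVM command line populates this property.
        String logFile = System.getProperty("log.file");
        return logFile == null ? "log.file is not set" : "log.file=" + logFile;
    }
}
```

Running a JVM with `-Dlog.file=/tmp/log` makes `describeLogFile()` return `log.file=/tmp/log`; without the option, the property is absent, which is the situation the warning in the issue reports.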
[GitHub] [flink] bowenli86 commented on issue #8589: [FLINK-12677][hive][sql-client] Add descriptor, validator, and factory for HiveCatalog
bowenli86 commented on issue #8589: [FLINK-12677][hive][sql-client] Add descriptor, validator, and factory for HiveCatalog URL: https://github.com/apache/flink/pull/8589#issuecomment-498849277 Rebased this PR onto https://github.com/apache/flink/pull/8616
[GitHub] [flink] bowenli86 commented on a change in pull request #8589: [FLINK-12677][hive][sql-client] Add descriptor, validator, and factory for HiveCatalog
bowenli86 commented on a change in pull request #8589: [FLINK-12677][hive][sql-client] Add descriptor, validator, and factory for HiveCatalog URL: https://github.com/apache/flink/pull/8589#discussion_r289981359 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.descriptors; + +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.descriptors.CatalogDescriptor; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import java.util.Map; + +import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_PROPERTOES_HIVE_METASTORE_URIS; +import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE; + +/** + * Catalog descriptor for {@link HiveCatalog}. 
+ */ +public class HiveCatalogDescriptor extends CatalogDescriptor { + + private String hiveMetastoreUris; + + // TODO : set default database + public HiveCatalogDescriptor(String hiveMetastoreUris) { + super(CATALOG_TYPE_VALUE_HIVE, 1); + + Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreUris)); + this.hiveMetastoreUris = hiveMetastoreUris; Review comment: I'm not sure about using environment vars because it means the yaml config file is not the single source of truth anymore.
[GitHub] [flink] bowenli86 commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog
bowenli86 commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog URL: https://github.com/apache/flink/pull/8616#issuecomment-498847563 cc @xuefuz @lirui-apache @zjuwangg
[GitHub] [flink] xuefuz commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is broken
xuefuz commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is broken
URL: https://github.com/apache/flink/pull/8610#issuecomment-498782835

> I wonder whether, since `moveToTrash()` is mainly a util, we should move it to a separate util shim class like `HiveUtilShim`? The current `HiveShim` class, as its javadoc says, is mainly for Hive Metastore. We probably should rename it to `HiveClientShim`?
>
> LGTM otherwise.

HiveShim is quite generic in naming. It's a shim, so in theory anything can be put there. Maybe we just need to update the javadoc to reflect the new addition. Having another shim for utils might be overkill for now. However, we can keep an eye on this and see how to organize it better when this layer gets too heavy.
[GitHub] [flink] sjwiesman commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
sjwiesman commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state URL: https://github.com/apache/flink/pull/8618#issuecomment-498778322 @flinkbot attention @tzulitai
[GitHub] [flink] flinkbot commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
flinkbot commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#issuecomment-498777998

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12732) Add savepoint reader for consuming partitioned operator state
[ https://issues.apache.org/jira/browse/FLINK-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12732:

Labels: pull-request-available (was: )

> Add savepoint reader for consuming partitioned operator state
>
> Key: FLINK-12732
> URL: https://issues.apache.org/jira/browse/FLINK-12732
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Runtime / State Backends
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
[GitHub] [flink] bowenli86 removed a comment on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog
bowenli86 removed a comment on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog URL: https://github.com/apache/flink/pull/8616#issuecomment-498773580 cc @xuefuz @lirui-apache @zjuwangg
[GitHub] [flink] sjwiesman opened a new pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
sjwiesman opened a new pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618

## What is the purpose of the change

This is the second PR for FLIP-43, adding the functionality to read partitioned operator state from a state snapshot. It is based on #8615 and only the final 4 commits should be reviewed.

## Brief change log

62dd050 - Some basic refactoring; this pulls the `NeverCompleteFuture` into a reusable utility
38eaa66 - Implements a shim `Environment` on top of the available user `RuntimeContext`
9689001 - Adds the new functionality to read keyed state out of a snapshot
be0552c - Adds new documentation

## Verifying this change

*(Please pick either of the following options)*

This change added tests and can be verified as follows: UT and IT tests

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
[GitHub] [flink] asfgit closed pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
asfgit closed pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables URL: https://github.com/apache/flink/pull/8522
[jira] [Closed] (FLINK-12572) Implement HiveInputFormat to read Hive tables
[ https://issues.apache.org/jira/browse/FLINK-12572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li closed FLINK-12572.

Resolution: Fixed

merged in 1.9.0: 38557bf8a6f8bebef8733f3f4f3b3950e9678fca

> Implement HiveInputFormat to read Hive tables
>
> Key: FLINK-12572
> URL: https://issues.apache.org/jira/browse/FLINK-12572
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive, Table SQL / Ecosystem
> Affects Versions: 1.9.0
> Reporter: zjuwangg
> Assignee: zjuwangg
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h