[GitHub] [flink] lirui-apache commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog

2019-06-04 Thread GitBox
lirui-apache commented on issue #8616: [FLINK-12718][hive] allow users to 
specify hive-site.xml location to configure hive metastore client in HiveCatalog
URL: https://github.com/apache/flink/pull/8616#issuecomment-498946674
 
 
   LGTM, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290584736
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: 
Option[Boolean]): RelDataType = {
 
 Review comment:
   The original intention here is to convert all time indicators to regular
`TimestampType` in batch mode.
   Do you mean this conversion should happen outside?
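   For reference, a condensed sketch of what the PR currently does here (the helper
name `toBatchFieldTypes` is made up for illustration):

```scala
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, TimestampType}

// In batch mode (isStreaming = Some(false)), rowtime/proctime indicators are
// rewritten to a plain TIMESTAMP(3); every other field type passes through.
def toBatchFieldTypes(schema: TableSchema, isStreaming: Option[Boolean]): Seq[LogicalType] =
  schema.getFieldDataTypes.map(_.getLogicalType).map {
    case t: TimestampType if isStreaming.isDefined && !isStreaming.get =>
      if (t.getKind == TimestampKind.REGULAR) t else new TimestampType(3)
    case t => t
  }
```

   In streaming mode the indicators are kept, so the planner can still distinguish
rowtime/proctime attributes.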




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290584058
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: 
Option[Boolean]): RelDataType = {
 buildRelDataType(
   tableSchema.getFieldNames.toSeq,
-  tableSchema.getFieldTypes map {
-case _: TimeIndicatorTypeInfo if isStreaming.isDefined && 
!isStreaming.get =>
-  InternalTypes.TIMESTAMP
-case tpe: TypeInformation[_] => createInternalTypeFromTypeInfo(tpe)
+  tableSchema.getFieldDataTypes.map(_.getLogicalType).map {
+case t: TimestampType if isStreaming.isDefined && !isStreaming.get =>
+  if (t.getKind == TimestampKind.REGULAR) t else new TimestampType(3)
+case t => t
   })
   }
 
   def buildRelDataType(
 
 Review comment:
   I'll change the name to `buildLogicalRowType`.




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290582852
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
+  case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH)
+  case DECIMAL => new DecimalType(relDataType.getPrecision, 
relDataType.getScale)
+
+  // time indicators
+  case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
+val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
+if (indicator.isEventTime) {
+  new TimestampType(true, TimestampKind.ROWTIME, 3)
+} else {
+  new TimestampType(true, TimestampKind.PROCTIME, 3)
+}
 
-case _@t =>
-  throw new TableException(s"Type is not supported: $t")
+  // temporal types
+  case DATE => new DateType()
+  case TIME => new TimeType()
+  case TIMESTAMP => new TimestampType(3)
+  case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
+DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
+  case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
+DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType
+
+  case NULL =>
+throw new TableException(
+  "Type NULL is not supported. Null values must have a supported 
type.")
+
+  // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+  // are represented as Enum
+  case SYMBOL => 
toPlannerLogicalType(PlannerTypeConversions.toDataType(classOf[Enum[_]]))
+
+  // extract encapsulated Type
+  case ANY if 

[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290579108
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java
 ##
 @@ -47,7 +44,7 @@
private UnresolvedReferenceExpression sum = new 
UnresolvedReferenceExpression("sum");
private UnresolvedReferenceExpression count = new 
UnresolvedReferenceExpression("count");
 
-   public abstract TypeInformation getSumType();
+   public abstract DataType getSumType();
 
 Review comment:
   Look at @twalthr's response:
   
   > The user should not be confronted to two different type stacks. That's why
   > the description in DataType calls it "hints", a user should always use
   > DataType. If he adds a hint where it is not appropriate, we just don't need to
   > use those hints.
   
   It is in @wuchong's first thread.
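   For illustration only (not from the PR), how I read the `DataType` "hints": a
`DataType` wraps a `LogicalType` and may carry an optional conversion-class hint
that the planner is free to ignore:

```scala
import org.apache.flink.table.api.DataTypes

// A DataType is a LogicalType plus optional hints about the conversion class.
val plain   = DataTypes.DECIMAL(38, 18)                              // logical type only
val hinted  = DataTypes.BIGINT().bridgedTo(classOf[java.lang.Long])  // with a physical hint
val logical = hinted.getLogicalType                                  // what the planner plans with
```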




[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r290578406
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ##
 @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.graph.{StreamGraph, StreamGraphGenerator}
 import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.PlannerConfigImpl
 
 Review comment:
   If TableEnv is unified, we actually can't see `PlannerConfigImpl`




[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290577508
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -41,87 +40,112 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
-  * Flink specific type factory that represents the interface between Flink's 
[[InternalType]]
+  * Flink specific type factory that represents the interface between Flink's 
[[LogicalType]]
   * and Calcite's [[RelDataType]].
   */
 class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends 
JavaTypeFactoryImpl(typeSystem) {
 
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  private val seenTypes = mutable.HashMap[(InternalType, Boolean), 
RelDataType]()
+  private val seenTypes = mutable.HashMap[LogicalType, RelDataType]()
 
-  def createTypeFromInternalType(
-  tp: InternalType,
-  isNullable: Boolean): RelDataType = {
+  def createTypeFromLogicalType(t: LogicalType, isNullable: Boolean): 
RelDataType = {
 
 Review comment:
   OK, I'll replace it with `createTypeFromLogicalType(t.copy(nullable))`




[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290576941
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 ##
 @@ -150,7 +150,7 @@ public void 
testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws E
 
final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new 
TestingTaskManagerRuntimeInfo();
 
-   final ShuffleEnvironment networkEnv = new 
NetworkEnvironmentBuilder().build();
+   final ShuffleEnvironment networkEnv = new 
NettyShuffleEnvironmentBuilder().build();
 
 Review comment:
   networkEnv -> shuffleEnvironment?




[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290576205
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of the shuffle service that runs locally on
the task executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Shutdown the 

[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290565250
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -41,87 +40,112 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
-  * Flink specific type factory that represents the interface between Flink's 
[[InternalType]]
+  * Flink specific type factory that represents the interface between Flink's 
[[LogicalType]]
   * and Calcite's [[RelDataType]].
   */
 class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends 
JavaTypeFactoryImpl(typeSystem) {
 
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  private val seenTypes = mutable.HashMap[(InternalType, Boolean), 
RelDataType]()
+  private val seenTypes = mutable.HashMap[LogicalType, RelDataType]()
 
-  def createTypeFromInternalType(
-  tp: InternalType,
-  isNullable: Boolean): RelDataType = {
+  def createTypeFromLogicalType(t: LogicalType, isNullable: Boolean): 
RelDataType = {
 
 Review comment:
   Since `LogicalType` already contains nullability information, I suggest we
drop this interface and force all callers to pass nullability through
`LogicalType`.
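   A minimal sketch of what that would look like (assuming the rest of the factory
stays as in this PR; the body is left out):

```scala
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.flink.table.types.logical.LogicalType

// Nullability travels inside the LogicalType, so no extra flag is needed.
def createTypeFromLogicalType(t: LogicalType): RelDataType = {
  // ... map t (including t.isNullable) to the corresponding Calcite type ...
  ???
}

// A caller that previously passed (t, isNullable) would instead write:
//   createTypeFromLogicalType(t.copy(isNullable))
```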




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290575834
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java
 ##
 @@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
+import org.apache.flink.table.typeutils.BinaryArraySerializer;
+import org.apache.flink.table.typeutils.BinaryGenericSerializer;
+import org.apache.flink.table.typeutils.BinaryMapSerializer;
+import org.apache.flink.table.typeutils.BinaryStringSerializer;
+import org.apache.flink.table.typeutils.DecimalSerializer;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT;
+
+/**
+ * Utilities for {@link LogicalType} and {@link DataType}.
+ */
+public class PlannerTypeUtils {
 
 Review comment:
   This is also a mix of multiple conversion methods with an unclear purpose. We
can at least move the conversion between `LogicalType` and `Class` into a
dedicated class and explain when this kind of conversion happens.
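   A hypothetical sketch of that split (class and method names are made up), so the
`LogicalType`-to-`Class` mapping lives in one place and its use cases can be
documented there:

```scala
import org.apache.flink.table.dataformat.BinaryString
import org.apache.flink.table.types.logical.{BigIntType, IntType, LogicalType, VarCharType}

object LogicalTypeToClassConverter {

  /** Internal conversion class for a logical type, e.g. when generating code. */
  def toInternalClass(t: LogicalType): Class[_] = t match {
    case _: IntType     => classOf[Int]
    case _: BigIntType  => classOf[Long]
    case _: VarCharType => classOf[BinaryString]
    // ... remaining types ...
    case other => throw new UnsupportedOperationException(s"Unsupported type: $other")
  }
}
```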




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290575554
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeConversions.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.table.typeutils.BigDecimalTypeInfo;
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
+import org.apache.flink.table.typeutils.DecimalTypeInfo;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Converters for {@link LogicalType} and {@link DataType}.
+ */
+public class PlannerTypeConversions {
 
 Review comment:
   This class mixes conversions between `LogicalType`, `DataType`, and
`TypeInformation`. It's better to split them into dedicated classes and, for
each kind of conversion, explain in which scenarios it happens, for example
when we need to convert `LogicalType` to `TypeInformation`.
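   For instance, a hypothetical sketch (names and the stated scenario are my
assumption, not from the PR) of one such dedicated converter:

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.types.logical.{BigIntType, IntType, LogicalType, VarCharType}

/**
 * LogicalType -> TypeInformation, needed only at the boundary to DataStream
 * operators and their serializers.
 */
object LogicalTypeToTypeInfo {
  def convert(t: LogicalType): TypeInformation[_] = t match {
    case _: IntType     => Types.INT
    case _: BigIntType  => Types.LONG
    case _: VarCharType => Types.STRING
    // ... remaining types ...
    case other => throw new UnsupportedOperationException(s"Unsupported type: $other")
  }
}
```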




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290569286
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java
 ##
 @@ -47,7 +44,7 @@
private UnresolvedReferenceExpression sum = new 
UnresolvedReferenceExpression("sum");
private UnresolvedReferenceExpression count = new 
UnresolvedReferenceExpression("count");
 
-   public abstract TypeInformation getSumType();
+   public abstract DataType getSumType();
 
 Review comment:
   Why not use `LogicalType` here? Do we want to allow
`DeclarativeAggregateFunction` to choose the physical storage for the acc buffer?




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290567350
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
 ##
 @@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.schema
 
-import org.apache.flink.table.`type`.ArrayType
+import org.apache.flink.table.types.logical.ArrayType
 
 Review comment:
   Do we still need to keep our custom `ArrayRelDataType`? How about directly
using Calcite's `ArraySqlType`? The same question applies to `MapRelDataType`,
`MultisetRelDataType`, and `RowRelDataType`.
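   For reference, a small sketch of what using Calcite's built-in types directly
could look like (assuming the standard `RelDataTypeFactory` API):

```scala
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}

// createArrayType/createMultisetType/createMapType return Calcite's own
// ArraySqlType/MultisetSqlType/MapSqlType; -1 means unbounded cardinality.
def arrayOf(factory: RelDataTypeFactory, element: RelDataType): RelDataType =
  factory.createArrayType(element, -1L)

def multisetOf(factory: RelDataTypeFactory, element: RelDataType): RelDataType =
  factory.createMultisetType(element, -1L)

def mapOf(factory: RelDataTypeFactory, key: RelDataType, value: RelDataType): RelDataType =
  factory.createMapType(key, value)
```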




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568068
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -164,19 +188,19 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 * using FlinkTypeFactory
 *
 * @param fieldNames field names
-* @param fieldTypes field types, every element is Flink's 
[[InternalType]]
+* @param fieldTypes field types, every element is Flink's 
[[LogicalType]]
 * @param fieldNullables field nullable properties
 * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
 */
   def buildLogicalRowType(
   fieldNames: Seq[String],
-  fieldTypes: Seq[InternalType],
+  fieldTypes: Seq[LogicalType],
   fieldNullables: Seq[Boolean]): RelDataType = {
 
 Review comment:
   Duplicate nullable information; `LogicalType` already contains such
information. And I think `buildLogicalRowType` is also necessary.




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568801
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
+  case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH)
 
 Review comment:
   Why was the precision dropped in favor of the max length?




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568259
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: 
Option[Boolean]): RelDataType = {
 buildRelDataType(
   tableSchema.getFieldNames.toSeq,
-  tableSchema.getFieldTypes map {
-case _: TimeIndicatorTypeInfo if isStreaming.isDefined && 
!isStreaming.get =>
-  InternalTypes.TIMESTAMP
-case tpe: TypeInformation[_] => createInternalTypeFromTypeInfo(tpe)
+  tableSchema.getFieldDataTypes.map(_.getLogicalType).map {
+case t: TimestampType if isStreaming.isDefined && !isStreaming.get =>
+  if (t.getKind == TimestampKind.REGULAR) t else new TimestampType(3)
+case t => t
   })
   }
 
   def buildRelDataType(
 
 Review comment:
   The purpose of this interface is unclear.




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568669
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
 
 Review comment:
   I remember you wanted to disable all CHAR support.




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568945
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
+  case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH)
+  case DECIMAL => new DecimalType(relDataType.getPrecision, 
relDataType.getScale)
+
+  // time indicators
+  case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
+val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
+if (indicator.isEventTime) {
+  new TimestampType(true, TimestampKind.ROWTIME, 3)
+} else {
+  new TimestampType(true, TimestampKind.PROCTIME, 3)
+}
 
-case _@t =>
-  throw new TableException(s"Type is not supported: $t")
+  // temporal types
+  case DATE => new DateType()
+  case TIME => new TimeType()
+  case TIMESTAMP => new TimestampType(3)
+  case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
+DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
+  case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
+DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType
+
+  case NULL =>
+throw new TableException(
+  "Type NULL is not supported. Null values must have a supported 
type.")
+
+  // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+  // are represented as Enum
+  case SYMBOL => 
toPlannerLogicalType(PlannerTypeConversions.toDataType(classOf[Enum[_]]))
+
+  // extract encapsulated Type
+  case ANY if 

[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290575983
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of the shuffle service that runs locally on
the task executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Shutdown the 

[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290567916
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -147,12 +171,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory
 *
 * @param fieldNames field names
-* @param fieldTypes field types, every element is Flink's [[InternalType]]
+* @param fieldTypes field types, every element is Flink's [[LogicalType]]
 * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
 */
   def buildLogicalRowType(
 
 Review comment:
   Why not just use `createTypeFromLogicalType` and pass in a `RowType`?
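   Something like this sketch (assuming the two-argument `createTypeFromLogicalType`
from this PR; the helper name is made up):

```scala
import scala.collection.JavaConverters._

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.types.logical.{LogicalType, RowType}

// Assemble a RowType and reuse the generic conversion instead of a dedicated
// buildLogicalRowType(fieldNames, fieldTypes) method.
def toRowRelDataType(
    typeFactory: FlinkTypeFactory,
    fieldNames: Seq[String],
    fieldTypes: Seq[LogicalType]): RelDataType = {
  val fields = fieldNames.zip(fieldTypes).map { case (n, t) => new RowType.RowField(n, t) }
  typeFactory.createTypeFromLogicalType(new RowType(fields.asJava), isNullable = true)
}
```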




[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568761
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -406,78 +416,76 @@ object FlinkTypeFactory {
 case _ => false
   }
 
-  def toInternalType(relDataType: RelDataType): InternalType = 
relDataType.getSqlTypeName match {
-case BOOLEAN => InternalTypes.BOOLEAN
-case TINYINT => InternalTypes.BYTE
-case SMALLINT => InternalTypes.SHORT
-case INTEGER => InternalTypes.INT
-case BIGINT => InternalTypes.LONG
-case FLOAT => InternalTypes.FLOAT
-case DOUBLE => InternalTypes.DOUBLE
-case VARCHAR | CHAR => InternalTypes.STRING
-case VARBINARY | BINARY => InternalTypes.BINARY
-case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-// time indicators
-case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-  val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-  if (indicator.isEventTime) {
-InternalTypes.ROWTIME_INDICATOR
-  } else {
-InternalTypes.PROCTIME_INDICATOR
-  }
-
-// temporal types
-case DATE => InternalTypes.DATE
-case TIME => InternalTypes.TIME
-case TIMESTAMP => InternalTypes.TIMESTAMP
-case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MONTHS
-case typeName if DAY_INTERVAL_TYPES.contains(typeName) => 
InternalTypes.INTERVAL_MILLIS
-
-case NULL =>
-  throw new TableException(
-"Type NULL is not supported. Null values must have a supported type.")
-
-// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-// are represented as Enum
-case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
-// extract encapsulated Type
-case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-  val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-  genericRelDataType.genericType
-
-case ROW if relDataType.isInstanceOf[RowRelDataType] =>
-  val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
-  compositeRelDataType.rowType
-
-case ROW if relDataType.isInstanceOf[RelRecordType] =>
-  val relRecordType = relDataType.asInstanceOf[RelRecordType]
-  new RowSchema(relRecordType).internalType
-
-case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
-  val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
-  multisetRelDataType.multisetType
-
-case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-  val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-  arrayRelDataType.arrayType
-
-case MAP if relDataType.isInstanceOf[MapRelDataType] =>
-  val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
-  mapRelDataType.mapType
+  def toLogicalType(relDataType: RelDataType): LogicalType = {
+val logicalType = relDataType.getSqlTypeName match {
+  case BOOLEAN => new BooleanType()
+  case TINYINT => new TinyIntType()
+  case SMALLINT => new SmallIntType()
+  case INTEGER => new IntType()
+  case BIGINT => new BigIntType()
+  case FLOAT => new FloatType()
+  case DOUBLE => new DoubleType()
+  case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
 
 Review comment:
   For VARCHAR, I think we should respect the precision.
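
For illustration, a minimal sketch of a precision-preserving mapping: keep the declared length instead of always widening to `VarCharType.MAX_LENGTH`. Splitting the `CHAR` case out into `CharType` is an extra assumption of this sketch, not necessarily what the PR ends up doing.

```scala
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlTypeName.{CHAR, VARCHAR}
import org.apache.flink.table.types.logical.{CharType, LogicalType, VarCharType}

// Carry the declared character length over from the Calcite type.
def toCharacterType(relDataType: RelDataType): LogicalType =
  relDataType.getSqlTypeName match {
    case CHAR    => new CharType(relDataType.getPrecision)
    case VARCHAR => new VarCharType(relDataType.getPrecision)
    case other   => throw new IllegalArgumentException(s"Not a character type: $other")
  }
```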


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-04 Thread GitBox
KurtYoung commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290568189
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -189,35 +213,21 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def buildLogicalRowType(tableSchema: TableSchema, isStreaming: 
Option[Boolean]): RelDataType = {
 
 Review comment:
   Not a good idea to build the type from `TableSchema`, and to couple it with `isStreaming`.
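
For illustration, one way to separate the concerns, assuming the batch-specific handling is only the erasure of time indicators: flatten rowtime/proctime timestamps to plain timestamps before the types ever reach the type factory. The helper name is hypothetical and not part of the PR.

```scala
import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, TimestampType}

// Erase the rowtime/proctime flavor outside of the type factory (batch planning
// only), so the factory method sees plain field names and LogicalTypes and
// needs neither a TableSchema nor an isStreaming flag.
def eraseTimeIndicators(fieldTypes: Seq[LogicalType]): Seq[LogicalType] =
  fieldTypes.map {
    case t: TimestampType if t.getKind != TimestampKind.REGULAR =>
      new TimestampType(t.getPrecision) // plain TIMESTAMP
    case t => t
  }
```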


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290575541
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Shutdown the 

[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290575250
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Shutdown the 

[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290575205
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Shutdown the 

[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290575052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Shutdown the 

[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290574817
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Shutdown the 

[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290574645
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
+ *
+ * When the task is deployed and the input gates are created, it can happen 
that not all consumed partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@code 
updatePartitionInfo} to update them
+ * externally, outside of the task thread, when the producer becomes known.
+ */
+public interface ShuffleEnvironment {
+
+   /**
+* Starts the internal related services upon {@link TaskExecutor}'s 
startup.
 
 Review comment:
   Starts -> Start, to keep the same form as the following methods.


This is an automated message from the Apache Git Service.
To respond to the 

[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290574532
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
+ *
+ * Input gate management.
+ *
+ * The interface implements a factory for the task input gates: {@code 
createInputGates}.
+ * The created gates are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the gates lifecycle from that moment.
 
 Review comment:
   gates -> gates'


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290574315
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
+ *
+ * Partitions are released in the following cases:
+ * 
+ * {@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production has failed.
+ * {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ * if the production is done. The actual release can take some time
+ * if 'the end of consumption' confirmation is being awaited implicitly
+ * or the partition is later released by {@code 
releasePartitions(Collection)}.
+ * {@code releasePartitions(Collection)} is called 
outside of the task thread,
+ * e.g. to manage the local resource lifecycle of external partitions 
which outlive the task production.
+ * 
+ * The partitions, which currently still occupy local resources, can be 
queried with {@code updatePartitionInfo}.
 
 Review comment:
   queried -> updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290573784
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
+ *
+ * The interface contains methods to manage the lifecycle of the local 
shuffle environment:
+ * 
+ * {@code start} is called when the {@link TaskExecutor} is being 
started.
+ * {@code shutdown} is called when the {@link TaskExecutor} is being 
stopped.
+ * 
+ *
+ * Shuffle Input/Output management.
+ *
+ * Result partition management.
+ *
+ * The interface implements a factory of result partition writers for the 
task output: {@code createResultPartitionWriters}.
+ * The created writers are grouped per task and handed over to the task thread 
upon its startup.
+ * The task is responsible for the writers lifecycle from that moment.
 
 Review comment:
   writers -> writers'
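
As a toy illustration of the release protocol described in the javadoc above (fail then close on error, finish then close on success), here is a simplified sketch; the trait and helper names are invented for this sketch and do not reproduce Flink's ResultPartitionWriter API.

```scala
// Simplified stand-in for a partition writer; not Flink's ResultPartitionWriter.
trait ToyPartitionWriter {
  def finish(): Unit
  def fail(cause: Throwable): Unit
  def close(): Unit
}

// Either finish or fail, then always close to give local resources back.
def releaseAfterProduction(writer: ToyPartitionWriter, failure: Option[Throwable]): Unit =
  try {
    failure match {
      case Some(cause) => writer.fail(cause) // production failed
      case None        => writer.finish()    // production finished normally
    }
  } finally {
    writer.close()
  }

object ReleaseDemo extends App {
  releaseAfterProduction(new ToyPartitionWriter {
    def finish(): Unit = println("finish")
    def fail(cause: Throwable): Unit = println(s"fail: $cause")
    def close(): Unit = println("close")
  }, failure = None) // prints "finish" then "close"
}
```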


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290573391
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
+ *
+ * Service lifecycle management.
 
 Review comment:
   Remove `Service` here; keep only `Lifecycle management`.
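
As a toy illustration of the lifecycle contract this javadoc section describes (start on task executor startup, returning a data port or -1 for local-only exchange; shutdown when the task executor stops), here is a simplified mirror; it is not Flink's ShuffleEnvironment interface.

```scala
// Simplified mirror of the lifecycle contract; not Flink's ShuffleEnvironment.
trait ToyShuffleLifecycle {
  def start(): Int      // data port, or -1 if only local connections are possible
  def shutdown(): Unit
}

final class LocalOnlyShuffleService extends ToyShuffleLifecycle {
  override def start(): Int = -1       // nothing to bind for purely local exchange
  override def shutdown(): Unit = ()   // release local resources here
}

object LifecycleDemo extends App {
  val shuffle = new LocalOnlyShuffleService
  println(s"shuffle data port: ${shuffle.start()}") // called on task executor startup
  shuffle.shutdown()                                // called on task executor shutdown
}
```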


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290573231
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
+ * and buffers from created here {@link InputGate}s to read it.
 
 Review comment:
   Actually, the buffers are not requested from the `InputGate` by the task; that is done by the network thread.
   Maybe `and the task can read buffers from created here {@link InputGate}s`.
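
As a toy illustration of that division of labor (the network thread fills the gate, the task thread only reads), here is a simplified sketch; the types and names are invented for this sketch and do not mirror Flink's classes.

```scala
import java.util.concurrent.LinkedBlockingQueue

final case class ToyBuffer(bytes: Array[Byte])

// The network thread deposits filled buffers; the task thread only polls them.
final class ToyInputGate {
  private val queue = new LinkedBlockingQueue[ToyBuffer]()
  def enqueue(buffer: ToyBuffer): Unit = queue.put(buffer)  // network thread side
  def pollNext(): Option[ToyBuffer] = Option(queue.poll())  // task thread side
}

object GateDemo extends App {
  val gate = new ToyInputGate
  gate.enqueue(ToyBuffer("shuffle data".getBytes))          // network thread side
  println(gate.pollNext().map(b => new String(b.bytes)))    // prints Some(shuffle data)
}
```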


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290572726
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
+ * The task can request next available memory buffers from created here {@link 
ResultPartitionWriter}s to write shuffle data
 
 Review comment:
   remove next?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290572426
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
+ *
+ * Input/Output interface of local shuffle service is based on memory 
{@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
 
 Review comment:
   shuffle service -> shuffle environment


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290572212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ShuffleEnvironment.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service locally on task 
executor.
 
 Review comment:
   shuffle service -> shuffle environment


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290572080
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 ##
 @@ -224,7 +224,7 @@ private static Task createTask(
TestStreamTask.class.getName(),
taskConfig);
 
-   NetworkEnvironment network = new 
NetworkEnvironmentBuilder().build();
+   ShuffleEnvironment network = new 
NetworkEnvironmentBuilder().build();
 
 Review comment:
   rename `network` as `shuffleEnvironment`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290572037
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
 ##
 @@ -295,7 +295,7 @@ private Task createTask(Class 
invokableClass) throw
0,
mock(MemoryManager.class),
mock(IOManager.class),
-   networkEnvironment,
+   shuffleEnvironment,
 
 Review comment:
   indentation issue here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290571983
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -902,7 +902,7 @@ public static Task createTask(
PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
 
-   NetworkEnvironment network = new 
NetworkEnvironmentBuilder().build();
+   ShuffleEnvironment network = new 
NetworkEnvironmentBuilder().build();
 
 Review comment:
   ditto: rename `network` as `shuffleEnvironment`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9465) Separate timeout for savepoint and checkpoint

2019-06-04 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856306#comment-16856306
 ] 

vinoyang commented on FLINK-9465:
-

[~kien_truong] About the savepoint timeout config option, I think it would be 
better to set it through the API. Currently, the checkpoint timeout is configured via 
{{CheckpointConfig#setCheckpointTimeout}}. I plan to expose a setter for the savepoint 
timeout in {{CheckpointConfig}}, accessible via 
{{StreamExecutionEnvironment#getCheckpointConfig}}. IMO, the configuration 
file in some deployment environments (e.g. Standalone) is global for all jobs. In 
addition, exposing it only through the CLI would not be conducive to 
consistency of behavior. WDYT?
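
For illustration, a minimal sketch of how the proposed setter might be used, assuming a 
hypothetical {{setSavepointTimeout}} method is added to {{CheckpointConfig}} (that method does 
not exist yet; its name is illustrative only):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointTimeout(60_000L);        // existing setter for periodic checkpoints
        config.setSavepointTimeout(30 * 60_000L);    // hypothetical setter proposed above
    }
}
```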

> Separate timeout for savepoint and checkpoint
> -
>
> Key: FLINK-9465
> URL: https://issues.apache.org/jira/browse/FLINK-9465
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Major
>
> Savepoints can take much longer to perform than checkpoints, especially 
> with incremental checkpointing enabled. This leads to a couple of troubles:
>  * For our job, we currently have to set the checkpoint timeout much larger 
> than necessary, otherwise we would be unable to perform savepoints. 
>  * During rush hour, our cluster encounters a high rate of checkpoint 
> timeouts due to backpressure, yet we're unable to migrate to a larger 
> configuration, because savepoints also time out.
> In my opinion, the timeout for savepoints should be configurable separately, 
> both in the config file and as a parameter to the savepoint command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290571947
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 ##
 @@ -150,7 +150,7 @@ public void 
testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws E
 
final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new 
TestingTaskManagerRuntimeInfo();
 
-   final NetworkEnvironment networkEnv = new 
NetworkEnvironmentBuilder().build();
+   final ShuffleEnvironment networkEnv = new 
NetworkEnvironmentBuilder().build();
 
 Review comment:
   rename `networkEnv` to `shuffleEnvironment`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290571622
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -409,6 +382,7 @@ public void shutdown() {
}
}
 
+   @Override
 
 Review comment:
   ATM this method is only used for testing, so it seems unreasonable to 
define an interface method only for testing. If the previous tests could be 
refactored to not rely on this method, it could be removed; I have considered 
removing it before for this reason.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12734) remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment

2019-06-04 Thread godfrey he (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-12734:
---
Description: 
there are two improvements:
1. remove {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. 
{{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and the 
{{VolcanoPlanner}} used to optimize the {{RelNode}} tree be the same 
instance (see {{VolcanoPlanner}}#registerImpl).
So we can use the planner instance from the {{RelNode}}'s cluster directly instead of 
the planner from the {{getVolcanoPlanner}} method in {{FlinkOptimizeContext}}.

2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}
In {{RelNodeBlock}}, only {{TableConfig}} is used.



  was:
there are two improvements:
1. remove {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. 
{{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and the 
{{VolcanoPlanner}} used to optimize the {{RelNode}} tree be the same 
instance (see {{VolcanoPlanner}}#registerImpl).
So we can use the planner instance from the {{RelNode}}'s cluster directly instead of 
{{getVolcanoPlanner}} from {{FlinkOptimizeContext}}.

2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}
In {{RelNodeBlock}}, only {{TableConfig}} is used.




> remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock 
> does not depend on TableEnvironment
> --
>
> Key: FLINK-12734
> URL: https://issues.apache.org/jira/browse/FLINK-12734
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> there are two improvements:
> 1. remove {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. 
> {{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and 
> the {{VolcanoPlanner}} used to optimize the {{RelNode}} tree be the same 
> instance (see {{VolcanoPlanner}}#registerImpl).
> So we can use the planner instance from the {{RelNode}}'s cluster directly instead of 
> the planner from the {{getVolcanoPlanner}} method in {{FlinkOptimizeContext}}.
> 2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}
> In {{RelNodeBlock}}, only {{TableConfig}} is used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290571356
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -322,19 +302,10 @@ private void registerInputMetrics(MetricGroup 
inputGroup, MetricGroup buffersGro
buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new 
InputBufferPoolUsageGauge(inputGates));
}
 
-   /**
-* Update consuming gate with newly available partition.
-*
-* @param consumerID execution id of consumer to identify belonging to 
it gate.
-* @param partitionInfo telling where the partition can be retrieved 
from
-* @return {@code true} if the partition has been updated or {@code 
false} if the partition is not available anymore.
-* @throws IOException IO problem by the update
-* @throws InterruptedException potentially blocking operation was 
interrupted
-* @throws IllegalStateException the input gate with the id from the 
partitionInfo is not found
-*/
+   @Override
public boolean updatePartitionInfo(
-   ExecutionAttemptID consumerID,
-   PartitionInfo partitionInfo) throws IOException, 
InterruptedException {
+   ExecutionAttemptID consumerID,
 
 Review comment:
   ditto: keep previous indentation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8619: [FLINK-12734] [table-planner-blink] remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment

2019-06-04 Thread GitBox
flinkbot commented on issue #8619: [FLINK-12734] [table-planner-blink] remove 
getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not 
depend on TableEnvironment
URL: https://github.com/apache/flink/pull/8619#issuecomment-498926318
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12734) remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment

2019-06-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12734:
---
Labels: pull-request-available  (was: )

> remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock 
> does not depend on TableEnvironment
> --
>
> Key: FLINK-12734
> URL: https://issues.apache.org/jira/browse/FLINK-12734
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
>
> there are two improvements:
> 1. remove {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. 
> {{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and 
> the {{VolcanoPlanner}} used to optimize the {{RelNode}} tree be the same 
> instance (see {{VolcanoPlanner}}#registerImpl).
> So we can use the planner instance from the {{RelNode}}'s cluster directly instead of 
> {{getVolcanoPlanner}} from {{FlinkOptimizeContext}}.
> 2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}
> In {{RelNodeBlock}}, only {{TableConfig}} is used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe opened a new pull request #8619: [FLINK-12734] [table-planner-blink] remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvir

2019-06-04 Thread GitBox
godfreyhe opened a new pull request #8619: [FLINK-12734] [table-planner-blink] 
remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does 
not depend on TableEnvironment
URL: https://github.com/apache/flink/pull/8619
 
 
   
   ## What is the purpose of the change
   
   *remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock 
does not depend on TableEnvironment*
   
   
   ## Brief change log
   
 - *remove getVolcanoPlanner method from FlinkOptimizeContext.
   VolcanoPlanner requires that the planner a RelNode tree belongs to and the 
VolcanoPlanner used to optimize the RelNode tree be the same instance (see 
VolcanoPlanner#registerImpl).
   So we can use the planner instance from the RelNode's cluster directly instead of 
getVolcanoPlanner from FlinkOptimizeContext.*
 - *RelNodeBlock does not depend on TableEnvironment*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290570744
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -129,11 +128,11 @@ public static NetworkEnvironment fromConfiguration(
maxJvmHeapMemory,
localTaskManagerCommunication,
taskManagerAddress);
-   return NetworkEnvironment.create(networkConfig, 
taskEventPublisher, metricGroup, ioManager);
+   return create(networkConfig, taskEventPublisher, metricGroup, 
ioManager);
}
 
public static NetworkEnvironment create(
-   NetworkEnvironmentConfiguration config,
+   NetworkEnvironmentConfiguration config,
 
 Review comment:
   See the changes in the commit `[FLINK-11392][network] Introduce 
ShuffleEnvironment interface`; this indentation should not be changed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290570600
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -129,11 +128,11 @@ public static NetworkEnvironment fromConfiguration(
maxJvmHeapMemory,
localTaskManagerCommunication,
taskManagerAddress);
-   return NetworkEnvironment.create(networkConfig, 
taskEventPublisher, metricGroup, ioManager);
 
 Review comment:
   This should be done in the previous hotfix commit that first introduces 
`fromConfiguration`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12733) Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856296#comment-16856296
 ] 

vinoyang commented on FLINK-12733:
--

[~klion26] I think it would be better to discuss and agree on the solution in 
FLINK-6755 before implementing it. cc [~aljoscha] [~till.rohrmann]

> Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12733
> URL: https://issues.apache.org/jira/browse/FLINK-12733
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12734) remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment

2019-06-04 Thread godfrey he (JIRA)
godfrey he created FLINK-12734:
--

 Summary: remove getVolcanoPlanner method from FlinkOptimizeContext 
and RelNodeBlock does not depend on TableEnvironment
 Key: FLINK-12734
 URL: https://issues.apache.org/jira/browse/FLINK-12734
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


there are two improvements:
1. remove {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. 
{{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and the 
{{VolcanoPlanner}} used to optimize the {{RelNode}} tree be the same 
instance (see {{VolcanoPlanner}}#registerImpl).
So we can use the planner instance from the {{RelNode}}'s cluster directly instead of 
{{getVolcanoPlanner}} from {{FlinkOptimizeContext}}.

2. {{RelNodeBlock}} does not depend on {{TableEnvironment}}
In {{RelNodeBlock}}, only {{TableConfig}} is used.
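
For illustration, a minimal sketch of the first improvement using Calcite's public API: the 
planner can be obtained from the {{RelNode}}'s cluster directly. The cast assumes the cluster 
was created with a {{VolcanoPlanner}}, as in the blink planner; the helper class below is 
illustrative only, not part of the patch.

```java
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;

public final class PlannerAccess {

    private PlannerAccess() {
    }

    /** Returns the VolcanoPlanner that the given RelNode tree belongs to. */
    public static VolcanoPlanner plannerOf(RelNode node) {
        RelOptPlanner planner = node.getCluster().getPlanner();
        if (!(planner instanceof VolcanoPlanner)) {
            throw new IllegalStateException("The RelNode's cluster was not created with a VolcanoPlanner");
        }
        return (VolcanoPlanner) planner;
    }
}
```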





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread GitBox
carp84 commented on a change in pull request #8617: 
[FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#discussion_r290561763
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##
 @@ -388,6 +388,21 @@ public JobSubmissionResult run(FlinkPlan compiledPlan,
 
public abstract CompletableFuture disposeSavepoint(String 
savepointPath) throws FlinkException;
 
+   /**
+* Stops a program on Flink cluster whose job-manager is configured in 
this client's configuration.
+* Stopping works only for streaming programs. Be aware, that the 
program might continue to run for
+* a while after sending the stop command, because after sources 
stopped to emit data ll operators
 
 Review comment:
   ll operators
   -> all operators


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread GitBox
carp84 commented on a change in pull request #8617: 
[FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#discussion_r290567794
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
 ##
 @@ -624,4 +624,58 @@ public void declineCheckpoint(final DeclineCheckpoint 
decline) {
return savepointFuture.thenCompose((path) ->
terminationFuture.thenApply((jobStatus -> path)));
}
+
+   @Override
+   public CompletableFuture stopWithCheckpoint(boolean 
advanceToEndOfEventTime) {
 
 Review comment:
   Please check my comments on `CheckpointCoordinator` about avoiding 
duplicated code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread GitBox
carp84 commented on a change in pull request #8617: 
[FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#discussion_r290563567
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -188,6 +191,8 @@
 
PYMODULE_OPTION.setRequired(false);
PYMODULE_OPTION.setArgName("pyModule");
+
+   STOP_WITH_CHECKPOINT.setRequired(false);
 
 Review comment:
   It seems better if we could also support a `CANCEL_WITH_CHECKPOINT` option, 
following the pattern of `CANCEL_WITH_SAVEPOINT_OPTION`.
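
   For illustration, a rough sketch of such an option, modeled on the existing savepoint 
option and built with the same commons-cli API; the short/long option names below are made up 
for this sketch and are not part of Flink:

```java
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class StopOptionsSketch {

    // Defined analogously to CANCEL_WITH_SAVEPOINT_OPTION; names are illustrative only.
    static final Option CANCEL_WITH_CHECKPOINT_OPTION =
            new Option("c", "withCheckpoint", false,
                    "Trigger a checkpoint before cancelling the job.");

    public static Options buildOptions() {
        CANCEL_WITH_CHECKPOINT_OPTION.setRequired(false);
        Options options = new Options();
        options.addOption(CANCEL_WITH_CHECKPOINT_OPTION);
        return options;
    }
}
```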


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread GitBox
carp84 commented on a change in pull request #8617: 
[FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#discussion_r290567687
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -424,6 +424,27 @@ public boolean isShutdown() {
}
}
 
+   public CompletableFuture 
triggerSynchronousCheckpoint(
 
 Review comment:
   It seems to me that we could reuse the existing code of 
`triggerSynchronousSavepoint`, adding a flag to indicate whether it's a 
checkpoint or a savepoint. The same applies to 
`LegacyScheduler#stopWithCheckpoint`.
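
   For illustration, a self-contained sketch of that refactoring pattern; the class and 
method bodies below are illustrative only and do not reflect the real `CheckpointCoordinator` 
API:

```java
import java.util.concurrent.CompletableFuture;

// Both public entry points delegate to one shared method that branches on a flag,
// so the checkpoint and savepoint variants do not duplicate the triggering logic.
public class SynchronousTriggerSketch {

    public CompletableFuture<String> triggerSynchronousSavepoint(
            boolean advanceToEndOfEventTime, String targetLocation) {
        return triggerSynchronousSnapshot(advanceToEndOfEventTime, targetLocation, true);
    }

    public CompletableFuture<String> triggerSynchronousCheckpoint(boolean advanceToEndOfEventTime) {
        return triggerSynchronousSnapshot(advanceToEndOfEventTime, null, false);
    }

    private CompletableFuture<String> triggerSynchronousSnapshot(
            boolean advanceToEndOfEventTime, String targetLocation, boolean isSavepoint) {
        // Shared logic lives here; only the snapshot properties differ based on the flag.
        String description = isSavepoint ? "savepoint at " + targetLocation : "checkpoint";
        return CompletableFuture.completedFuture(
                description + (advanceToEndOfEventTime ? " (sources drained)" : ""));
    }
}
```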


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290568555
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -114,6 +116,22 @@ private NetworkEnvironment(
this.isShutdown = false;
}
 
+   public static NetworkEnvironment fromConfiguration(
 
 Review comment:
   It might be better to put this static factory at the end of this class, also 
for the following `create` method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290568168
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
 ##
 @@ -36,7 +37,7 @@
private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;
 
/**
-* Verifies that {@link 
TaskManagerServicesConfiguration#fromConfiguration(Configuration, long, 
InetAddress, boolean)}
+* Verifies that {@link 
TaskManagerServicesConfiguration#fromConfiguration(Configuration, InetAddress)}
 
 Review comment:
   I think this test only verifies the generation of 
`NetworkEnvironmentConfiguration`, whereas the scope of 
`TaskManagerServicesConfiguration#fromConfiguration` covers many more things. The 
class `TaskManagerServicesConfigurationTest` might also be renamed to 
`NetworkEnvironmentConfigurationTest` or something similar.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12662) show jobs failover in history server as well

2019-06-04 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-12662:


Assignee: vinoyang

> show jobs failover in history server as well
> 
>
> Key: FLINK-12662
> URL: https://issues.apache.org/jira/browse/FLINK-12662
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Su Ralph
>Assignee: vinoyang
>Priority: Major
>
> Currently 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/historyserver.html]
>  only shows the completed jobs (completed, cancelled, failed), without showing any 
> intermediate failovers. 
> This makes it hard for the cluster administrator/developer to find the original failure 
> when more than one failover happens. The feature ask is to 
> - record each failover in the history server as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12662) show jobs failover in history server as well

2019-06-04 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856289#comment-16856289
 ] 

vinoyang commented on FLINK-12662:
--

Thanks [~till.rohrmann] for approving this issue. and thanks [~ralphsu] for 
raising this valuable issue. I am going to start it.

> show jobs failover in history server as well
> 
>
> Key: FLINK-12662
> URL: https://issues.apache.org/jira/browse/FLINK-12662
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Su Ralph
>Priority: Major
>
> Currently 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/historyserver.html]
>  only shows the completed jobs (completed, cancelled, failed), without showing any 
> intermediate failovers. 
> This makes it hard for the cluster administrator/developer to find the original failure 
> when more than one failover happens. The feature ask is to 
> - record each failover in the history server as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290567675
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ##
 @@ -205,17 +205,20 @@ private void checkRootDirsClean(File[] rootDirs) {
private TaskManagerServicesConfiguration 
createTaskManagerServiceConfiguration(
Configuration config) throws IOException {
return TaskManagerServicesConfiguration.fromConfiguration(
-   config, MEM_SIZE_PARAM, InetAddress.getLocalHost(), 
true);
+   config, InetAddress.getLocalHost());
 
 Review comment:
   Either put every parameter on a separate line, or do not break the line here at all, 
since it might not be too long after removing the above parameters.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is broken

2019-06-04 Thread GitBox
lirui-apache commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is 
broken
URL: https://github.com/apache/flink/pull/8610#issuecomment-498921767
 
 
   I also prefer to keep HiveShim generic. Actually, APIs in HiveShim like 
`getViews` and `getFunction` take an `IMetaStoreClient` as a parameter, so in that 
sense it's generic by nature.
   @bowenli86 what do you think?
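
   For illustration, a trimmed-down interface in that spirit, where the metastore client is 
passed in by the caller; the method signature below is approximate and not Flink's exact 
HiveShim API:

```java
import java.util.List;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;

// A shim stays Hive-version-agnostic by taking the metastore client as a parameter
// instead of creating it itself. The signature is illustrative, not Flink's exact API.
public interface MetastoreShim {

    /** Lists the views in the given database using the provided client. */
    List<String> getViews(IMetaStoreClient client, String databaseName) throws Exception;
}
```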


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290567322
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -445,36 +442,14 @@ private static NettyConfig createNettyConfig(
final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
 
nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
-   getPageSize(configuration), 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   
ConfigurationParserUtils.getPageSize(configuration), 
ConfigurationParserUtils.getSlot(configuration), configuration);
 
 Review comment:
   separate line for all the parameters


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290567072
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ##
 @@ -78,33 +77,32 @@
private Optional systemResourceMetricsProbingInterval;
 
public TaskManagerServicesConfiguration(
-   InetAddress taskManagerAddress,
-   String[] tmpDirPaths,
-   String[] localRecoveryStateRootDirectories,
-   boolean localRecoveryEnabled,
-   NetworkEnvironmentConfiguration networkConfig,
-   @Nullable QueryableStateConfiguration 
queryableStateConfig,
-   int numberOfSlots,
-   long configuredMemory,
-   MemoryType memoryType,
-   boolean preAllocateMemory,
-   float memoryFraction,
-   long timerServiceShutdownTimeout,
-   RetryingRegistrationConfiguration 
retryingRegistrationConfiguration,
-   Optional systemResourceMetricsProbingInterval) {
+   InetAddress taskManagerAddress,
+   String[] tmpDirPaths,
+   String[] localRecoveryStateRootDirectories,
+   boolean localRecoveryEnabled,
+   @Nullable QueryableStateConfiguration queryableStateConfig,
+   int numberOfSlots,
+   long configuredMemory,
+   MemoryType memoryType,
+   boolean preAllocateMemory,
+   float memoryFraction,
+   int pageSize, long timerServiceShutdownTimeout,
 
 Review comment:
   separate line for parameters


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] Introduce ShuffleEnvironment interface

2019-06-04 Thread GitBox
zhijiangW commented on a change in pull request #8608: [FLINK-11392][network] 
Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r290566895
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ##
 @@ -78,33 +77,32 @@
private Optional systemResourceMetricsProbingInterval;
 
public TaskManagerServicesConfiguration(
-   InetAddress taskManagerAddress,
-   String[] tmpDirPaths,
-   String[] localRecoveryStateRootDirectories,
-   boolean localRecoveryEnabled,
-   NetworkEnvironmentConfiguration networkConfig,
-   @Nullable QueryableStateConfiguration 
queryableStateConfig,
-   int numberOfSlots,
-   long configuredMemory,
-   MemoryType memoryType,
-   boolean preAllocateMemory,
-   float memoryFraction,
-   long timerServiceShutdownTimeout,
-   RetryingRegistrationConfiguration 
retryingRegistrationConfiguration,
-   Optional systemResourceMetricsProbingInterval) {
+   InetAddress taskManagerAddress,
 
 Review comment:
   Keep the previous indentation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-04 Thread GitBox
Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle 
the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-498920811
 
 
   One way that comes to my mind: call the scattered `clean()` methods with the `TOP_LEVEL` 
level to keep the old behavior, and set `checkSerializable` to false there. All user 
functions are eventually passed to clean by `environment#clean()`, so we can configure the 
clean level and check serializability once at that entrance.
   
   ```java
   @Internal
   public <F> F clean(F f) {
       // Clean with the level configured on the ExecutionConfig, then always check serializability.
       if (getConfig().isClosureCleanerEnabled()) {
           ClosureCleaner.clean(f, getConfig().getClosureCleanLevel(), true);
       }
       ClosureCleaner.ensureSerializable(f);
       return f;
   }
   ```
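
   For context, a minimal usage sketch of picking the level at that single entrance, assuming 
the `ClosureCleanerLevel` enum and setter proposed in this PR (the names follow the proposal 
and may differ from the final API):

```java
import org.apache.flink.api.common.ExecutionConfig;

public class CleanLevelSketch {
    public static void main(String[] args) {
        // TOP_LEVEL keeps the old behavior of cleaning only the outer function;
        // the enum/setter names follow the proposal above.
        ExecutionConfig config = new ExecutionConfig();
        config.setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);
    }
}
```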


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-04 Thread GitBox
Aitozi edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to 
handle the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-498756627
 
 
   I know what you mean, and I think it's hard to configure the scattered `clean()` calls 
used in places like the connectors and `Pattern`. Some of those cannot obtain 
the `ExecutionConfig`. Do you have some thoughts, @aljoscha?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8578: [FLINK-12685] 
[table-planner-blink] Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#discussion_r290563675
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
 ##
 @@ -119,7 +119,9 @@ object FlinkBatchRuleSets {
 new CoerceInputsRule(classOf[LogicalMinus], false),
 ConvertToNotInOrInRule.INSTANCE,
 // optimize limit 0
-FlinkLimit0RemoveRule.INSTANCE
+FlinkLimit0RemoveRule.INSTANCE,
+// unnest rule
+LogicalUnnestRule.INSTANCE
 
 Review comment:
   It seems that `LogicalUnnestRule` is also part of de-correlate. Why not put 
it in the `DECORRELATE` phase?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8578: [FLINK-12685] 
[table-planner-blink] Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#discussion_r290565276
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ##
 @@ -467,6 +467,9 @@ object FlinkTypeFactory {
   val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
   mapRelDataType.mapType
 
+// CURSOR for UDTF case, whose type info will never be used, just a 
placeholder
+case CURSOR => NothingType.INSTANCE
 
 Review comment:
   Would just using a `GenericType(NothingTypeInfo)` be OK?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8578: [FLINK-12685] 
[table-planner-blink] Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#discussion_r290564818
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/UnnestITCase.scala
 ##
 @@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import 
org.apache.flink.table.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, 
TestData, TestingAppendSink, TestingRetractSink, TestingRetractTableSink}
+import org.apache.flink.types.Row
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class UnnestITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
 
 Review comment:
   Can you add some cases to PlanCase and ITCase like:
   `select , A.field0, A.field1 from T, unnest(c) as A`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi removed a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-04 Thread GitBox
Aitozi removed a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to 
handle the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-498759646
 
 
   I thought it over again; it can be removed directly. And after this, the clean 
method should not be used directly; it should only be used through the single entrance 
`environment#clean()`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8578: [FLINK-12685] 
[table-planner-blink] Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#discussion_r290564411
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRuleTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, 
FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+import java.sql.Timestamp
+
+/**
+  * Test for [[LogicalUnnestRule]].
+  */
+class LogicalUnnestRuleTest extends TableTestBase {
 
 Review comment:
   Isn't `UnnestTestBase` enough?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner

2019-06-04 Thread GitBox
JingsongLi commented on a change in pull request #8578: [FLINK-12685] 
[table-planner-blink] Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#discussion_r290563400
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkDecorrelateProgram.scala
 ##
 @@ -77,6 +81,15 @@ class FlinkDecorrelateProgram[OC <: FlinkOptimizeContext] 
extends FlinkOptimizeP
 join.getCondition.accept(visitor)
 super.visit(join)
   }
+
+  override def visit(other: RelNode): RelNode = {
+other match {
+  // ignore Uncollect's inputs due to the correlate variables are from 
UNNEST directly,
+  // not from cases which RelDecorrelator handles
+  case r: Uncollect => r
 
 Review comment:
   > not from cases which RelDecorrelator handles
   
   I don't get it.
   The real reason should be that `LogicalUnnestRule` is in the 
`DEFAULT_REWRITE` phase, and the `DEFAULT_REWRITE` phase is behind the 
`DECORRELATE` phase, so now we need not verify the Unnest correlation?
   Update comments?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12662) show jobs failover in history server as well

2019-06-04 Thread Su Ralph (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856278#comment-16856278
 ] 

Su Ralph commented on FLINK-12662:
--

Thanks [~till.rohrmann]. That's good as well, and probably better.

> show jobs failover in history server as well
> 
>
> Key: FLINK-12662
> URL: https://issues.apache.org/jira/browse/FLINK-12662
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Su Ralph
>Priority: Major
>
> Currently 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/historyserver.html]
>  only shows completed jobs (completed, cancelled, failed), not any 
> intermediate failovers. 
> This makes it hard for a cluster administrator/developer to find the root 
> cause when multiple failovers happen. The feature ask is to 
> - record each failover in the history server as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9465) Separate timeout for savepoint and checkpoint

2019-06-04 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856276#comment-16856276
 ] 

vinoyang commented on FLINK-9465:
-

[~till.rohrmann] thanks for approving this issue; I am going to start working on it.

> Separate timeout for savepoint and checkpoint
> -
>
> Key: FLINK-9465
> URL: https://issues.apache.org/jira/browse/FLINK-9465
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Major
>
> Savepoints can take much longer to perform than checkpoints, especially 
> with incremental checkpointing enabled. This leads to a couple of troubles:
>  * For our job, we currently have to set the checkpoint timeout much larger 
> than necessary, otherwise we would be unable to perform savepoints. 
>  * During rush hour, our cluster encounters a high rate of checkpoint 
> timeouts due to backpressure, yet we're unable to migrate to a larger 
> configuration because savepoints also time out.
> In my opinion, the timeout for savepoints should be configurable separately, 
> both in the config file and as a parameter to the savepoint command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8550: [FLINK-12401][table] Support incremental emit under AccRetract mode for non-window streaming FlatAggregate on Table API

2019-06-04 Thread GitBox
sunjincheng121 commented on issue #8550: [FLINK-12401][table] Support 
incremental emit under AccRetract mode for non-window streaming FlatAggregate 
on Table API
URL: https://github.com/apache/flink/pull/8550#issuecomment-498913495
 
 
   Since the streaming operator has two execution modes, `ACC` and 
`ACCRetract`, users can achieve better performance by implementing the 
specialized emit interfaces for streaming. The table below is a quick summary.
   
   Mode | emitValue | emitUpdateWithRetract | emitUpdateWithoutRetract
   -- | -- | -- | --
   ACC | Y | N | Y
   ACCRetract | Y | Y | N
   
   - emitValue - for batch and streaming.
   - emitUpdateWithRetract - only for streaming in ACCRetract mode.
   - emitUpdateWithoutRetract - only for streaming in ACC mode (needs a key 
definition on the TableAggregateFunction, [under 
discussion](https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit#heading=h.evvcpnbn30wn)).
   
   So, in this PR, it would be better to change the method name from 
`emitRetractValueIncrementally` to `emitUpdateWithRetract`. What do you think?
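   
   For readers following along, here is a minimal, hypothetical sketch of what an 
`emitUpdateWithRetract` implementation could look like in `ACCRetract` mode. The 
`Top2` class, its accumulator fields, and the `RetractableCollector` shape are 
illustrative assumptions, not code from this PR:
   
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;

// Hypothetical Top2 table aggregate that emits (value, rank) pairs incrementally.
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Accum> {

    public static class Top2Accum {
        public Integer first;     // current largest value
        public Integer second;    // current second largest value
        public Integer oldFirst;  // last emitted largest value
        public Integer oldSecond; // last emitted second largest value
    }

    @Override
    public Top2Accum createAccumulator() {
        return new Top2Accum();
    }

    public void accumulate(Top2Accum acc, Integer v) {
        if (acc.first == null || v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (acc.second == null || v > acc.second) {
            acc.second = v;
        }
    }

    // Emit only the rows that changed since the last emit; retract the old version first.
    public void emitUpdateWithRetract(Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (acc.first != null && !acc.first.equals(acc.oldFirst)) {
            if (acc.oldFirst != null) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }
        if (acc.second != null && !acc.second.equals(acc.oldSecond)) {
            if (acc.oldSecond != null) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}
```
   
   The point is that only rows that changed since the last emit are sent, with the 
previous version retracted first, which is what makes the incremental emit cheaper 
than re-emitting everything via `emitValue`.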
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-04 Thread GitBox
tzulitai commented on issue #8618:  [FLINK-12732][state-processor-api] Add 
savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#issuecomment-498912234
 
 
   @flinkbot approve description
   @flinkbot approve consensus


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state

2019-06-04 Thread GitBox
tzulitai commented on issue #8615:  [FLINK-12729][state-processor-api] Add 
state reader for consuming non-partitioned operator state
URL: https://github.com/apache/flink/pull/8615#issuecomment-498911292
 
 
   @flinkbot approve description
   @flinkbot approve consensus


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12709) Implement RestartBackoffTimeStrategyFactoryLoader

2019-06-04 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856258#comment-16856258
 ] 

Zhu Zhu commented on FLINK-12709:
-

Thanks [~rmetzger].

Sorry for forgetting to set the related components. I will be more careful in the future.

> Implement RestartBackoffTimeStrategyFactoryLoader
> -
>
> Key: FLINK-12709
> URL: https://issues.apache.org/jira/browse/FLINK-12709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.9.0
>
>
> We need to implement a RestartBackoffTimeStrategyFactoryLoader to instantiate 
> RestartBackoffTimeStrategyFactory.
> In order to be backwards compatible, the loader is responsible for converting 
> *RestartStrategy* 
> configurations ([https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html]) 
> and *RestartStrategyConfiguration* to the latest *RestartBackoffTimeStrategy* 
> configurations.
> The converted configurations will be used to create 
> *RestartBackoffTimeStrategy.Factory* via 
> *RestartBackoffTimeStrategy#createFactory(Configuration)*.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] carp84 commented on a change in pull request #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread GitBox
carp84 commented on a change in pull request #8617: 
[FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#discussion_r290559979
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
 ##
 @@ -515,6 +513,12 @@ protected void stop(String[] args) throws Exception {
return;
}
 
+   if (stopOptions.hasCheckpointFlag()) {
+   LOG.info("Running 'stop-with-savepoint' command.");
 
 Review comment:
   here it should be "stop-with-checkpoint"
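   
   A minimal sketch of the corrected line, reusing the `stopOptions` accessor and 
`LOG` field already shown in this diff (a fragment only, not the full method):
   
```java
if (stopOptions.hasCheckpointFlag()) {
    // log message now matches the actual action
    LOG.info("Running 'stop-with-checkpoint' command.");
}
```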


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-06-04 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856241#comment-16856241
 ] 

BoWang commented on FLINK-12608:


OK, thanks [~rmetzger].

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in 
> [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], 
> we need to add getVertexOrThrow/getResultPartitionOrThrow to SchedulingTopology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-06-04 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Affects Version/s: 1.9.0

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in 
> [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], 
> we need to add getVertexOrThrow/getResultPartitionOrThrow to SchedulingTopology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12608) Add getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID) to SchedulingTopology

2019-06-04 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-12608:
---
Fix Version/s: 1.9.0

> Add 
> getVertex/ResultPartitionOrThrow(ExecutionVertexID/IntermediateResultPartitionID)
>  to SchedulingTopology
> ---
>
> Key: FLINK-12608
> URL: https://issues.apache.org/jira/browse/FLINK-12608
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in 
> [PR#8309|https://github.com/apache/flink/pull/8309#discussion_r287190944], 
> we need to add getVertexOrThrow/getResultPartitionOrThrow to SchedulingTopology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12733) Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread Congxian Qiu(klion26) (JIRA)
Congxian Qiu(klion26) created FLINK-12733:
-

 Summary: Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint
 Key: FLINK-12733
 URL: https://issues.apache.org/jira/browse/FLINK-12733
 Project: Flink
  Issue Type: Sub-task
Reporter: Congxian Qiu(klion26)
Assignee: Congxian Qiu(klion26)






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog

2019-06-04 Thread GitBox
xuefuz commented on issue #8616: [FLINK-12718][hive] allow users to specify 
hive-site.xml location to configure hive metastore client in HiveCatalog
URL: https://github.com/apache/flink/pull/8616#issuecomment-498877524
 
 
   Thanks for the contribution. I have a couple of high-level comments:
   
   1. The PR seems to suggest that a hive-site.xml path is always required, which 
shouldn't be the case. We should just instantiate a HiveConf instance and pick up 
the config dir from the environment.
   
   2. There seem to be a good number of constructors. We can probably reduce 
them by only providing Catalog(String catName, String dbName, String/URL 
hiveSiteLocation) and taking care of nulls. We can add more on demand later on.
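   
   To illustrate the second point, a rough sketch of a single constructor that 
tolerates a missing hive-site.xml location; the class and field names here are 
hypothetical and only `HiveConf` is a real dependency:
   
```java
import org.apache.hadoop.hive.conf.HiveConf;

import java.net.URL;

// Hypothetical single-entry-point constructor for a Hive-backed catalog.
public class HiveCatalogSketch {

    private final String catalogName;
    private final String defaultDatabase;
    private final HiveConf hiveConf;

    public HiveCatalogSketch(String catalogName, String defaultDatabase, URL hiveSiteLocation) {
        this.catalogName = catalogName;
        this.defaultDatabase = defaultDatabase == null ? "default" : defaultDatabase;
        if (hiveSiteLocation != null) {
            // an explicit hive-site.xml wins if the caller provides one
            HiveConf.setHiveSiteLocation(hiveSiteLocation);
        }
        // otherwise HiveConf falls back to HIVE_CONF_DIR / the classpath
        this.hiveConf = new HiveConf();
    }
}
```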


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set

2019-06-04 Thread Cesar Alvernaz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856145#comment-16856145
 ] 

Cesar Alvernaz edited comment on FLINK-12383 at 6/4/19 9:56 PM:


I could reproduce this issue following the instructions from 
[here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].
  Since the `log.file` property is configured from a system property, this 
warning can be fixed by setting the environment to add job manager-specific JVM 
options:

{{
env:

 - name: FLINK_ENV_JAVA_OPTS
   value: -Dlog.file=/tmp/log
}} 


was (Author: calvernaz):
I could reproduce this issue following the instruction from 
[here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].
  Since the `log.file` property is configured from a system property this 
warning can be fixed by setting the environment to add job manager-specific JVM 
options

 

```

env:

 - name: FLINK_ENV_JAVA_OPTS
   value: -Dlog.file=/tmp/log

``` 

> "Log file environment variable 'log.file' is not set" despite web.log.path 
> being set
> 
>
> Key: FLINK-12383
> URL: https://issues.apache.org/jira/browse/FLINK-12383
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.8.0
>Reporter: Henrik
>Priority: Minor
>
> You get these warnings when starting a session cluster, despite having 
> configured all things logs as specified by the configuration reference on the 
> [web 
> site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]:
> {code:java}
> [job] 2019-05-01 13:25:35,418 WARN  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Log file 
> environment variable 'log.file' is not set.
> [job] 2019-05-01 13:25:35,419 INFO  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
> location of main cluster component log file: /var/lib/log/flink/jobmanager.log
> [job] 2019-05-01 13:25:35,419 INFO  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
> location of main cluster component stdout file: 
> /var/lib/log/flink/jobmanager.out
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set

2019-06-04 Thread Cesar Alvernaz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856145#comment-16856145
 ] 

Cesar Alvernaz edited comment on FLINK-12383 at 6/4/19 9:56 PM:


I could reproduce this issue following the instructions from 
[here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].
  Since the `log.file` property is configured from a system property, this 
warning can be fixed by setting the environment to add job manager-specific JVM 
options:

{{env:

 - name: FLINK_ENV_JAVA_OPTS
   value: -Dlog.file=/tmp/log}}


was (Author: calvernaz):
I could reproduce this issue following the instruction from 
[here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].
  Since the `log.file` property is configured from a system property this 
warning can be fixed by setting the environment to add job manager-specific JVM 
options

{{
env:

 - name: FLINK_ENV_JAVA_OPTS
   value: -Dlog.file=/tmp/log
}} 

> "Log file environment variable 'log.file' is not set" despite web.log.path 
> being set
> 
>
> Key: FLINK-12383
> URL: https://issues.apache.org/jira/browse/FLINK-12383
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.8.0
>Reporter: Henrik
>Priority: Minor
>
> You get these warnings when starting a session cluster, despite having 
> configured all things logs as specified by the configuration reference on the 
> [web 
> site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]:
> {code:java}
> [job] 2019-05-01 13:25:35,418 WARN  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Log file 
> environment variable 'log.file' is not set.
> [job] 2019-05-01 13:25:35,419 INFO  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
> location of main cluster component log file: /var/lib/log/flink/jobmanager.log
> [job] 2019-05-01 13:25:35,419 INFO  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
> location of main cluster component stdout file: 
> /var/lib/log/flink/jobmanager.out
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set

2019-06-04 Thread Cesar Alvernaz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856145#comment-16856145
 ] 

Cesar Alvernaz commented on FLINK-12383:


I could reproduce this issue following the instructions from 
[here|https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html].
  Since the `log.file` property is configured from a system property, this 
warning can be fixed by setting the environment to add job manager-specific JVM 
options:

 

```

env:

 - name: FLINK_ENV_JAVA_OPTS
   value: -Dlog.file=/tmp/log

``` 

> "Log file environment variable 'log.file' is not set" despite web.log.path 
> being set
> 
>
> Key: FLINK-12383
> URL: https://issues.apache.org/jira/browse/FLINK-12383
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.8.0
>Reporter: Henrik
>Priority: Minor
>
> You get these warnings when starting a session cluster, despite having 
> configured all things logs as specified by the configuration reference on the 
> [web 
> site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]:
> {code:java}
> [job] 2019-05-01 13:25:35,418 WARN  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Log file 
> environment variable 'log.file' is not set.
> [job] 2019-05-01 13:25:35,419 INFO  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
> location of main cluster component log file: /var/lib/log/flink/jobmanager.log
> [job] 2019-05-01 13:25:35,419 INFO  
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
> location of main cluster component stdout file: 
> /var/lib/log/flink/jobmanager.out
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on issue #8589: [FLINK-12677][hive][sql-client] Add descriptor, validator, and factory for HiveCatalog

2019-06-04 Thread GitBox
bowenli86 commented on issue #8589: [FLINK-12677][hive][sql-client] Add 
descriptor, validator, and factory for HiveCatalog
URL: https://github.com/apache/flink/pull/8589#issuecomment-498849277
 
 
   Rebased this PR to https://github.com/apache/flink/pull/8616


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8589: [FLINK-12677][hive][sql-client] Add descriptor, validator, and factory for HiveCatalog

2019-06-04 Thread GitBox
bowenli86 commented on a change in pull request #8589: 
[FLINK-12677][hive][sql-client] Add descriptor, validator, and factory for 
HiveCatalog
URL: https://github.com/apache/flink/pull/8589#discussion_r289981359
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.descriptors;
+
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.descriptors.CatalogDescriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Map;
+
+import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_PROPERTOES_HIVE_METASTORE_URIS;
+import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE;
+
+/**
+ * Catalog descriptor for {@link HiveCatalog}.
+ */
+public class HiveCatalogDescriptor extends CatalogDescriptor {
+
+   private String hiveMetastoreUris;
+
+   // TODO : set default database
+   public HiveCatalogDescriptor(String hiveMetastoreUris) {
+   super(CATALOG_TYPE_VALUE_HIVE, 1);
+
+   
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreUris));
+   this.hiveMetastoreUris = hiveMetastoreUris;
 
 Review comment:
   I'm not sure about using environment vars because it means the yaml config 
file is not the single source of truth anymore.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog

2019-06-04 Thread GitBox
bowenli86 commented on issue #8616: [FLINK-12718][hive] allow users to specify 
hive-site.xml location to configure hive metastore client in HiveCatalog
URL: https://github.com/apache/flink/pull/8616#issuecomment-498847563
 
 
   cc @xuefuz @lirui-apache @zjuwangg


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is broken

2019-06-04 Thread GitBox
xuefuz commented on issue #8610: [FLINK-12715][hive] Hive-1.2.1 build is broken
URL: https://github.com/apache/flink/pull/8610#issuecomment-498782835
 
 
   > I wonder, since `moveToTrash()` is mainly a util, whether we should move it to 
a separate util shim class like `HiveUtilShim`? The current `HiveShim` class, 
as its javadoc says, is mainly for Hive Metastore. We probably should rename it 
to `HiveClientShim`?
   > 
   > LGTM otherwise.
   
   HiveShim is quite generic in naming. It's a shim, so in theory anything can 
be put there. Maybe we just need to update the javadoc to reflect the new 
addition.
   
   Having another shim for utils might be overkill for now. However, we can 
keep an eye on this and see how to organize it better when this layer gets too 
heavy.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-04 Thread GitBox
sjwiesman commented on issue #8618:  [FLINK-12732][state-processor-api] Add 
savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#issuecomment-498778322
 
 
   @flinkbot attention @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-04 Thread GitBox
flinkbot commented on issue #8618:  [FLINK-12732][state-processor-api] Add 
savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#issuecomment-498777998
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12732) Add savepoint reader for consuming partitioned operator state

2019-06-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12732:
---
Labels: pull-request-available  (was: )

> Add savepoint reader for consuming partitioned operator state
> -
>
> Key: FLINK-12732
> URL: https://issues.apache.org/jira/browse/FLINK-12732
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Runtime / State Backends
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 removed a comment on issue #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog

2019-06-04 Thread GitBox
bowenli86 removed a comment on issue #8616: [FLINK-12718][hive] allow users to 
specify hive-site.xml location to configure hive metastore client in HiveCatalog
URL: https://github.com/apache/flink/pull/8616#issuecomment-498773580
 
 
   cc @xuefuz @lirui-apache @zjuwangg


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman opened a new pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-04 Thread GitBox
sjwiesman opened a new pull request #8618:  [FLINK-12732][state-processor-api] 
Add savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618
 
 
   ## What is the purpose of the change
   
   This is the second PR for FLIP-43 adding the functionality to read 
partitioned operator state from a state snapshot. It is based on #8615 and only 
the final 4 commits should be reviewed.
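   
   For context, consuming partitioned operator state with this API might look 
roughly like the sketch below. The class and method names (`Savepoint.load`, 
`ExistingSavepoint#readListState`) follow the FLIP-43 discussion, and the savepoint 
path / operator uid are placeholders, so the exact API in this PR may differ:
   
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class ReadOperatorStateExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load an existing savepoint; the path and state backend are placeholders.
        ExistingSavepoint savepoint =
            Savepoint.load(env, "hdfs:///path/to/savepoint", new MemoryStateBackend());

        // Read a partitioned (list) operator state registered under the given uid and name.
        DataSet<Integer> listState =
            savepoint.readListState("my-operator-uid", "my-state-name", Types.INT);

        listState.print();
    }
}
```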
   
   ## Brief change log
   
   62dd050 - Some basic refactoring that pulls the `NeverCompleteFuture` into 
a reusable utility
   
   38eaa66 - Implements a shim `Environment` on top of the available user 
`RuntimeContext`
   
   9689001 - Adds the new functionality to read keyed state out of a snapshot
   
   be0552c - Adds new documentation
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   UT and IT tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-06-04 Thread GitBox
asfgit closed pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat 
to read Hive tables
URL: https://github.com/apache/flink/pull/8522
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12572) Implement HiveInputFormat to read Hive tables

2019-06-04 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12572.

Resolution: Fixed

merged in 1.9.0: 38557bf8a6f8bebef8733f3f4f3b3950e9678fca

> Implement HiveInputFormat to read Hive tables
> -
>
> Key: FLINK-12572
> URL: https://issues.apache.org/jira/browse/FLINK-12572
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / Ecosystem
>Affects Versions: 1.9.0
>Reporter: zjuwangg
>Assignee: zjuwangg
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

