[GitHub] [flink] zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add 
Hive-1.2.1 build to Travis
URL: https://github.com/apache/flink/pull/8911#discussion_r298457662
 
 

 ##
 File path: tools/travis/stage.sh
 ##
 @@ -212,6 +219,9 @@ function get_test_modules_for_stage() {
 (${STAGE_TESTS})
 echo "-pl $modules_tests"
 ;;
+(${STAGE_CONNECTOR_HIVE_1})
+echo "-pl $MODULES_CONNECTOR_HIVE -Phive-1.2.1"
 
 Review comment:
   On a side note, if you included `clean` here you would only recompile the 
hive module, but it's quite a hack.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506606238
 
 
   To integrate it into the hadoop profile, modify the profile of these 
[builds](https://github.com/apache/flink/blob/master/.travis.yml#L119) to 
include the hive version.




[jira] [Commented] (FLINK-10701) Move modern kafka connector module into connector profile

2019-06-27 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874685#comment-16874685
 ] 

Chesnay Schepler commented on FLINK-10701:
--

The long-term goal is still to have Kafka contained in a single profile, which 
still isn't the case.

> Move modern kafka connector module into connector profile 
> --
>
> Key: FLINK-10701
> URL: https://issues.apache.org/jira/browse/FLINK-10701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The modern connector is run in the {{misc}} profile since it wasn't properly 
> added to the {{connector}} profile in stage.sh. Click 
> [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to 
> see more details.
> *This issue is blocked by FLINK-10603.*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13011) Release the PyFlink into PyPI

2019-06-27 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874684#comment-16874684
 ] 

Dian Fu commented on FLINK-13011:
-

[~sunjincheng121] [~Zentol] I have contacted the owner of 
[https://pypi.org/project/pyflink/] and have obtained ownership of this project. 

> Release the PyFlink into PyPI
> -
>
> Key: FLINK-13011
> URL: https://issues.apache.org/jira/browse/FLINK-13011
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Priority: Major
>
> FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
> have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
> PyFlink distribution package built by FLINK-12962 to PyPI. 
> https://pypi.org/
> https://packaging.python.org/tutorials/packaging-projects/
>  





[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298456137
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping in state the collection of probe and build records to process
+ * on next watermark. The idea is that between watermarks we are collecting those elements
+ * and once we are sure that there will be no updates we emit the correct result and clean up the
+ * state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+ * as older than the current watermark. Build side is also cleaned up in a similar fashion,
+ * however we always keep at least one record - the latest one - even if it's past the last
+ * watermark.
+ *
+ * One more trick is how emitting results and cleaning up is triggered. It is achieved
+ * by registering timers for the keys. We could register a timer for every probe and build
+ * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+ * cleaning up the state). However this would cause a huge number of registered timers. For example
+ * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+ * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+ * we always keep only one single registered timer for any given key, registered for the minimal
+ * value. Upon triggering it, we process all records with event times older than or equal to
+ * currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
 
 Review comment:
   I will support state ttl in `TemporalRowTimeJoinOperator` too.
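
For readers following the Javadoc in the hunk above, the "one timer per key" trick can be sketched as a small simulation. This is only an illustration under stated assumptions: it uses no Flink classes, every name in it (`SingleTimerPerKeySketch`, `onProbeRecord`, `onWatermark`) is hypothetical, and it collapses equal event times for brevity. It is not the code from this pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Simulation only: no Flink classes are used and all names are hypothetical. */
final class SingleTimerPerKeySketch {
    // Buffered probe-side event times per key, kept sorted (duplicates collapse).
    private final Map<String, TreeSet<Long>> buffered = new HashMap<>();
    // At most one registered timer per key, set to the minimal buffered event time.
    private final Map<String, Long> registeredTimer = new HashMap<>();
    // Counts timer firings, to show how many callbacks a watermark causes.
    int firings = 0;

    void onProbeRecord(String key, long eventTime) {
        buffered.computeIfAbsent(key, k -> new TreeSet<>()).add(eventTime);
        // Keep a single timer per key, always at the smallest event time seen.
        registeredTimer.merge(key, eventTime, Math::min);
    }

    /** Advances the watermark; returns the event times processed per fired key. */
    Map<String, List<Long>> onWatermark(long watermark) {
        Map<String, List<Long>> emitted = new HashMap<>();
        for (String key : new ArrayList<>(registeredTimer.keySet())) {
            if (registeredTimer.get(key) > watermark) {
                continue; // the key's only timer is not due yet
            }
            firings++;
            TreeSet<Long> times = buffered.get(key);
            // One firing handles every record with event time <= watermark.
            List<Long> due = new ArrayList<>(times.headSet(watermark, true));
            times.removeAll(due);
            emitted.put(key, due);
            // Re-register for the new minimum, or drop the timer if nothing is left.
            if (times.isEmpty()) {
                registeredTimer.remove(key);
            } else {
                registeredTimer.put(key, times.first());
            }
        }
        return emitted;
    }
}
```

With the event times {1, 2, 5, 8, 9} from the Javadoc and a watermark of 10, the sketch fires once for the key and processes all five records in that single callback.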




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298454608
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java
 ##
 @@ -117,7 +117,9 @@ public void open() throws Exception {
@Override
public void close() throws Exception {
super.close();
-   joinCondition.backingJoinCondition.close();
+   if (joinCondition != null) {
 
 Review comment:
   I think we should protect it in each close() method. The compiled instance 
may not have been instantiated when close() is invoked (for example, if an 
exception is thrown before instantiation). 
   
   This change is not related to this pull request. 
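
A minimal sketch of the guard described in this comment, assuming a simplified operator whose condition object is created in open(). The types `GuardedCloseSketch`, `Condition` and `Operator` are hypothetical stand-ins, not the Flink classes touched by the diff.

```java
/** Hypothetical stand-ins; these are not the Flink classes from the diff. */
final class GuardedCloseSketch {
    interface Condition {
        void close();
    }

    static final class Operator implements AutoCloseable {
        private Condition condition; // stays null if open() fails early
        boolean conditionClosed = false;

        void open(boolean failBeforeInstantiation) {
            if (failBeforeInstantiation) {
                throw new IllegalStateException("failure before the condition is instantiated");
            }
            condition = () -> { conditionClosed = true; };
        }

        @Override
        public void close() {
            // Guard: close() may run during cleanup even though open() never completed.
            if (condition != null) {
                condition.close();
            }
        }
    }
}
```

Without the null check, a failure in open() would be followed by a NullPointerException from close(), which can hide the original error.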




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298454129
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  //
+  //  Temporal TableFunction Join Utilities
+  //
+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute.
+* The condition is used to mark that this is a temporal table function join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+
+  def containsTemporalJoinCondition(condition: RexNode): Boolean = {
 
 Review comment:
   Hmm, I think a separate method to check the condition is cleaner here. 




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298453939
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType}
+import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.generated.GeneratedJoinCondition
+import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator}
+import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.util.Preconditions.checkState
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+class StreamExecTemporalTableJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftRel: RelNode,
+rightRel: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = {
+val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder)
+
+var rowtimeJoin: Boolean = false
+val visitor = new RexVisitorImpl[Unit](true) {
+  override def visitCall(call: RexCall): Unit = {
+if (call.getOperator == TEMPORAL_JOIN_CONDITION) {
+  rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call)
+} else {
+  call.getOperands.foreach(node => node.accept(this))
+}
+  }
+}
+nonEquiJoinRex.accept(visitor)
+rowtimeJoin
+  }
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+new StreamExecTemporalTableJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  conditionExpr,
+  joinType)
+  }
+
+  //~ ExecNode methods ---
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
+getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override def replaceInputNode(
+ordinalInParent: Int,
+newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = {
+replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+
+  override protected def translateToPlanInternal(
 
 Review comment:
   I 

[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298453583
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
 ##
 @@ -48,8 +48,18 @@ class TableImpl(val tableEnv: TableEnvironment, 
operationTree: QueryOperation) e
   override def select(fields: Expression*): Table = ???
 
   override def createTemporalTableFunction(
 
 Review comment:
   The other way (using an Expression parameter) will be supported when we 
integrate table-common's `TableImpl`; we will add some tests after that.




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298453442
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -488,14 +488,14 @@ abstract class StreamTableEnvironment(
   }
 }
 
-fields.zipWithIndex.foreach {
-  case ("rowtime", idx) =>
-extractRowtime(idx, "rowtime", None)
-
-  case ("proctime", idx) =>
-extractProctime(idx, "proctime")
-
-  case (name, _) => fieldNames = name :: fieldNames
+fields.zipWithIndex.foreach { case (name, idx) =>
 
 Review comment:
   Yes. We hacked this so that the test util supports "o_rowtime" as a 
rowtime attribute. Currently, we can't invoke the rowtime/proctime functions in 
blink-planner. 




[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874667#comment-16874667
 ] 

Yun Gao edited comment on FLINK-12852 at 6/28/19 4:45 AM:
--

These are all the possible methods for solving this problem that come to mind: 
 # Reserve buffers for exclusive buffers: It is hard to know how many buffers 
need to be reserved for the exclusive buffers, especially since it is hard to 
know how many tasks will be scheduled to this TM.
 # Make the required buffers and the max buffers the same for all the local 
buffer pools: It may leave previously running jobs unable to run because the 
total number of buffers is less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: It does not solve the 
deadlock problem, since the downstream may still be unable to acquire any 
buffers to make progress.
 # Add a timeout for _requestMemorySegments_.

Therefore, I think we currently need to use the last method. I'd like to 
 # Add an option for the timeout of requestMemorySegments for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.
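
The proposed behaviour can be sketched as follows. This is a simplified illustration, not the NetworkBufferPool implementation: the class `TimedSegmentRequestSketch`, its methods, and the use of `byte[]` as a segment are assumptions made for the example. Only the bounded `ArrayBlockingQueue.poll` call (visible in the stack trace) and the "Insufficient buffer" message come from the discussion.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical pool; byte[] stands in for a memory segment. */
final class TimedSegmentRequestSketch {
    private final ArrayBlockingQueue<byte[]> available;

    TimedSegmentRequestSketch(int totalSegments, int segmentSize) {
        available = new ArrayBlockingQueue<>(totalSegments);
        for (int i = 0; i < totalSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    /** Requests {@code count} segments, failing once the overall timeout has elapsed. */
    List<byte[]> requestSegments(int count, long timeoutMillis)
            throws IOException, InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<byte[]> acquired = new ArrayList<>(count);
        while (acquired.size() < count) {
            long remaining = deadline - System.nanoTime();
            byte[] segment = remaining > 0
                    ? available.poll(remaining, TimeUnit.NANOSECONDS)
                    : available.poll();
            if (segment == null) {
                // Give back what was taken so other requesters can make progress.
                available.addAll(acquired);
                throw new IOException("Insufficient buffer");
            }
            acquired.add(segment);
        }
        return acquired;
    }

    int availableSegments() {
        return available.size();
    }
}
```

Returning the partially acquired segments before throwing is a design choice of this sketch; the comment above does not say how partial acquisitions should be handled.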


was (Author: gaoyunhaii):
These are all the possible methods for solving this problem that come to mind: 
 # Reserve buffers for exclusive buffers: It is hard to know how many buffers 
need to be reserved for the exclusive buffers, especially since it is hard to 
know how many tasks will be scheduled to this TM.
 # Make the required buffers and the max buffers the same: It may leave 
previously running jobs unable to run because the total number of buffers is 
less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: It does not solve the 
deadlock problem, since the downstream may still be unable to acquire any 
buffers to make progress.
 # Add a timeout for _requestMemorySegments_.

Therefore, I think we currently need to use the last method. I'd like to 
 # Add an option for the timeout of requestMemorySegments for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pools are 
> not the same, so there may be over-allocation; when assignExclusiveSegments 
> is called, there is no memory available.
>  
> The details of the scenario are as follows: The parallelism of the upstream 
> vertex and the downstream vertex is 1000 and 500 respectively. There are 200 TMs 
> and each 

[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874667#comment-16874667
 ] 

Yun Gao commented on FLINK-12852:
-

These are all the possible methods for solving this problem that come to mind: 
 # Reserve buffers for exclusive buffers: It is hard to know how many buffers 
need to be reserved for the exclusive buffers, especially since it is hard to 
know how many tasks will be scheduled to this TM.
 # Make the required buffers and the max buffers the same: It may leave 
previously running jobs unable to run because the total number of buffers is 
less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: It does not solve the 
deadlock problem, since the downstream may still be unable to acquire any 
buffers to make progress.
 # Add a timeout for _requestMemorySegments_.

Therefore, __ I think __ we currently need to use the last method. I'd like to 
 # Add an option for the timeout of requestMemorySegments for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout to NetworkBufferPool.
 # requestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pools are 
> not the same, so there may be over-allocation; when assignExclusiveSegments 
> is called, there is no memory available.
>  
> The details of the scenario are as follows: The parallelism of the upstream 
> vertex and the downstream vertex is 1000 and 500 respectively. There are 200 TMs 
> and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}; they produce 
> data quickly and each occupies about 990 buffers. Then the downstream task 
> starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job is eventually blocked, no buffers can 
> be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the 
> network memory in the ResourceProfile.





[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874667#comment-16874667
 ] 

Yun Gao edited comment on FLINK-12852 at 6/28/19 4:38 AM:
--

These are all the possible ways to solve this problem that come to my mind: 
 # Reserve buffers for exclusive buffers: It is hard to know how many buffers 
to reserve for the exclusive buffers, especially since it is hard to know how many 
tasks will be scheduled to this TM.
 # Make the required buffers and the max buffers the same: It may cause 
previously running jobs to be unable to run because the total number of buffers is 
less than the sum of the updated required buffers.
 # Postpone the acquisition of exclusive buffers: It does not solve the 
deadlock problem, since the downstream task still may not acquire any buffers to 
make progress.
 # Add a timeout for the _requestMemorySegments_.

Therefore, I think currently we need to use the last method. I'd like to 
 # Add an option for the timeout of requestMemorySegment for each channel. The 
default timeout is 30s. This option will be marked as undocumented since it may 
be removed in a future implementation.
 # Pass the timeout to NetworkBufferPool.
 # RequestMemorySegments will throw IOException("Insufficient buffer") if not 
all segments are acquired before the timeout.
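
A minimal sketch of the timeout idea, under stated assumptions: `TimedSegmentPool`, `requestSegments`, `recycle` and `numAvailable` are hypothetical names for illustration and are not Flink's NetworkBufferPool API. The sketch polls a bounded queue until a deadline, fails with IOException("Insufficient buffer") when not all segments arrive in time, and returns the partial allocation so that other tasks are not starved.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a buffer pool; this is not Flink's NetworkBufferPool. */
final class TimedSegmentPool {
    private final ArrayBlockingQueue<byte[]> available;

    TimedSegmentPool(int numSegments, int segmentSize) {
        available = new ArrayBlockingQueue<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    /** Returns exactly {@code count} segments or fails once the deadline has passed. */
    List<byte[]> requestSegments(int count, long timeoutMillis)
            throws IOException, InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        final List<byte[]> segments = new ArrayList<>(count);
        try {
            while (segments.size() < count) {
                long remaining = deadline - System.nanoTime();
                // poll() waits at most the remaining time and returns null on timeout.
                byte[] segment = available.poll(Math.max(0L, remaining), TimeUnit.NANOSECONDS);
                if (segment == null) {
                    throw new IOException("Insufficient buffer");
                }
                segments.add(segment);
            }
            return segments;
        } catch (IOException | InterruptedException e) {
            // Hand back the partial allocation so that other tasks can make progress.
            available.addAll(segments);
            throw e;
        }
    }

    /** Gives segments back to the pool. */
    void recycle(List<byte[]> segments) {
        available.addAll(segments);
    }

    int numAvailable() {
        return available.size();
    }
}
```

Returning the partially acquired segments on failure is a design choice of this sketch; without it, a timed-out request would still hold buffers and the pool could stay exhausted.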



> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pool are 
> not the same, so there may be over-allocation; when assignExclusiveSegments is 
> called, there is no available memory.
>  
> The details of the scenario are as follows: The parallelism of the upstream 
> vertex and the downstream vertex is 1000 and 500 respectively. There are 200 TMs 
> and each TM has 10696 buffers( in 

[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506595119
 
 
   > sorry, should have asked @zentol to help review.
   > 
   > @lirui-apache seems like I might misunderstood how the stage is run? 
Shouldn't the profile only recompile and retest flink-connector-hive module?
   
   The approach in this PR will recompile flink-connector-hive and all the 
modules it depends on. To only recompile flink-connector-hive, we can remove the 
`-am` option. My understanding is that if we remove `-am`, those modules will 
be unavailable (or downloaded from somewhere that doesn't reflect changes in a PR) 
because the local maven repo is cleared for each Travis job. @zentol please correct 
me if I misunderstand.
   
   Another alternative is to completely skip the recompile and only run tests 
against Hive-1.2.1, so that we can get rid of most of the overhead while we're 
still able to catch the issues with Hive-1.2.1. The drawback of this approach is 
that errors that happen during tests are probably harder to understand than 
errors that happen during compiling. For example, we'll get 
`NoClassDefFoundError` instead of `cannot find symbol`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator

2019-06-27 Thread GitBox
JingsongLi commented on issue #8821:  [FLINK-12922][Table SQL / Planner] Remove 
method parameter from OperatorCodeGenerator
URL: https://github.com/apache/flink/pull/8821#issuecomment-506592750
 
 
   LGTM +1, @liyafan82 can you squash these commits into one?




[GitHub] [flink] wuchong commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298446897
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -73,7 +73,7 @@ public void before() {
 
@Test
public void testCreateTable() {
-   check("CREATE TABLE tbl1 (\n" +
+   check("CREATE TABLE db.tbl1 (\n" +
 
 Review comment:
   A mistake ...




[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots thro

2019-06-27 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-13009:
--
Affects Version/s: 1.8.0

> YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
>  throws NPE on Travis
> -
>
> Key: FLINK-13009
> URL: https://issues.apache.org/jira/browse/FLINK-13009
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> The test 
> {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
>  throws NPE on Travis.
> The NPE is thrown from RMAppAttemptMetrics.java#128; the following is the code 
> from hadoop-2.8.3[1]
> {code:java}
> // Only add in the running containers if this is the active attempt.
> 128   RMAppAttempt currentAttempt = rmContext.getRMApps()
> 129   .get(attemptId.getApplicationId()).getCurrentAppAttempt();
> {code}
>  
> log [https://api.travis-ci.org/v3/job/550689578/log.txt]
> [1] 
> [https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128]





[jira] [Updated] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-27 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12852:

Description: 
When running tests with an upstream vertex and downstream vertex, deadlock 
occurs when submitting the job:
{code:java}
"Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
nid=0x38845 waiting on condition [0x7f2cbe9fe000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00073ed6b6f0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
- locked <0x00073fbc81f0> (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
at 
org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
at java.lang.Thread.run(Thread.java:834)
{code}
This happens because the required and max sizes of the local buffer pool are 
not the same, so there may be over-allocation; when assignExclusiveSegments is 
called, there is no available memory.

 

The details of the scenario are as follows: The parallelism of the upstream 
vertex and the downstream vertex is 1000 and 500 respectively. There are 200 TMs, 
and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 
upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}; they produce data 
quickly and each occupies about 990 buffers. Then the downstream task starts and 
tries to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It 
requires 2981 buffers but only 1786 are left. Since not all downstream tasks can 
start, the job is eventually blocked, no buffer can be released, and a 
deadlock occurs.
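
The buffer arithmetic in this scenario can be checked with a few lines of Java. This is only an illustrative sketch: `BufferBudget` and its methods are hypothetical names, and the inputs (10696 buffers per TM, 9 upstream tasks holding about 990 buffers each, 2981 buffers required by the downstream task) are taken from the description as given, not derived.

```java
/** Hypothetical helper that replays the numbers from the description. */
final class BufferBudget {

    /** Buffers still free on the TaskManager after the upstream tasks took their share. */
    public static int remaining(int totalBuffers, int upstreamTasks, int buffersPerUpstreamTask) {
        return totalBuffers - upstreamTasks * buffersPerUpstreamTask;
    }

    /** Buffers the downstream task is still missing; 0 means the request can be served. */
    public static int shortfall(int requiredExclusive, int remaining) {
        return Math.max(0, requiredExclusive - remaining);
    }

    public static void main(String[] args) {
        int left = remaining(10696, 9, 990);  // 10696 - 8910 = 1786
        int missing = shortfall(2981, left);  // 2981 - 1786 = 1195
        System.out.println("left=" + left + ", missing=" + missing);
    }
}
```

With these inputs the downstream task is 1195 buffers short, and since the upstream tasks do not release buffers until their data is consumed, the request cannot complete.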

 

I think that although increasing the network memory solves the problem, the deadlock 
may not be acceptable. Fine-grained resource management (FLINK-12761) can 
solve this problem, but AFAIK in 1.9 it will not include the network memory 
in the ResourceProfile.


[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298435255
 
 

 ##
 File path: docs/dev/table/common.zh.md
 ##
 @@ -89,6 +89,35 @@ tapiResult.insertInto("outputTable")
 // execute
 env.execute()
 
+{% endhighlight %}
+
+
+
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...)   # or
+table_env.register_table_source("table2", ...) # or
 
 Review comment:
   delete the "# or" ?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298435312
 
 

 ##
 File path: docs/dev/table/common.zh.md
 ##
 @@ -392,6 +486,26 @@ val revenue = orders
 
 **Note:** The Scala Table API uses Scala Symbols, which start with a single 
tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala 
implicits. Make sure to import `org.apache.flink.api.scala._` and 
`org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
 
+
+
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+.filter("cCountry === 'FRANCE'")
 
 Review comment:
   Should a `\` be added after the filter and groupby methods?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298440322
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -316,7 +316,7 @@ value NOT IN (sub-query)
 
 
 
-
+
 
 Review comment:
   In line 432, "STRING.similar(STRING)" was already wrong before this PR. Can you 
fix it along the way, both in this file and in the corresponding zh.md file? Thank you.
   STRING.similar(STRING) -> STRING1.similar(STRING2)




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298430168
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -89,6 +89,35 @@ tapiResult.insertInto("outputTable")
 // execute
 env.execute()
 
+{% endhighlight %}
+
+
+
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...)   # or
+table_env.register_table_source("table2", ...) # or
 
 Review comment:
   delete the "# or" ?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298438156
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -170,6 +181,50 @@ tableEnvironment
 {% endhighlight %}
 
 
+
+{% highlight python %}
+table_environment \
+.connect(  # declare the external system to connect to
+ Kafka()
 
 Review comment:
   The code style violates E126 (continuation line over-indented for 
hanging indent), and there are similar problems in other code examples. Is there a 
better code style?




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298441273
 
 

 ##
 File path: docs/dev/table/streaming/query_configuration.md
 ##
 @@ -135,6 +160,16 @@ val qConfig: StreamQueryConfig = ???
 // set idle state retention time: min = 12 hours, max = 24 hours
 qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
+{% endhighlight %}
+
+
+{% highlight python %}
+
+q_config = ...
 
 Review comment:
   Add a comment stating that the type of the variable q_config is 
"StreamQueryConfig".




[GitHub] [flink] HuangXingBo commented on a change in pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
HuangXingBo commented on a change in pull request #8916: 
[FLINK-12897][python][docs] Improve the Python Table API docs by adding more 
examples
URL: https://github.com/apache/flink/pull/8916#discussion_r298435022
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -392,6 +486,26 @@ val revenue = orders
 
 **Note:** The Scala Table API uses Scala Symbols, which start with a single 
tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala 
implicits. Make sure to import `org.apache.flink.api.scala._` and 
`org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
 
+
+
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+.filter("cCountry === 'FRANCE'")
 
 Review comment:
   Should a `\` be added after the filter and groupby methods?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298442961
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping on the state collection of probe and build 
records to process
+ * on next watermark. The idea is that between watermarks we are collecting 
those elements
+ * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+ * state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+ * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+ * however we always keep at least one record - the latest one - even if it's 
past the last
+ * watermark.
+ *
+ * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+ * by registering timers for the keys. We could register a timer for every 
probe and build
+ * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+ * cleaning up the state). However this would cause huge number of registered 
timers. For example
+ * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+ * had received Watermark(10), it would trigger 5 separate timers for the same 
key. To avoid that
+ * we always keep only one single registered timer for any given key, 
registered for the minimal
+ * value. Upon triggering it, we process all records with event times older 
then or equal to
+ * currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
 
 Review comment:
   The logic of `TemporalRowTimeJoinOperator` is a bit complicated; consider adding a 
`TemporalRowTimeJoinOperatorTest`?
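
   As a possible starting point for such a test, the "one timer per key" behaviour described in the Javadoc can be modelled without any Flink classes. The sketch below is an assumption-laden illustration, not the operator's code: `SingleTimerPerKey`, `onElement`, `onWatermark` and `firings` are hypothetical names, and the rule of replacing the timer when a smaller event time arrives is my reading of "registered for the minimal value".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical per-key bookkeeping; it only mimics the timer logic, not Flink's timer service. */
final class SingleTimerPerKey {
    private final PriorityQueue<Long> pendingEventTimes = new PriorityQueue<>();
    private Long registeredTimer; // null means no timer is registered for this key
    private int firings;          // how often the timer fired, kept for illustration

    /** Buffers a record and keeps the single timer at the minimal pending event time. */
    void onElement(long eventTime) {
        pendingEventTimes.add(eventTime);
        if (registeredTimer == null || eventTime < registeredTimer) {
            registeredTimer = eventTime; // replaces any previously registered timer
        }
    }

    /** Fires the single timer at most once and drains every record at or below the watermark. */
    List<Long> onWatermark(long watermark) {
        List<Long> processed = new ArrayList<>();
        if (registeredTimer == null || registeredTimer > watermark) {
            return processed;
        }
        firings++;
        while (!pendingEventTimes.isEmpty() && pendingEventTimes.peek() <= watermark) {
            processed.add(pendingEventTimes.poll());
        }
        registeredTimer = pendingEventTimes.peek(); // null when nothing is left
        return processed;
    }

    int firings() {
        return firings;
    }
}
```

   For the Javadoc's example, feeding event times {1, 2, 5, 8, 9} and then calling `onWatermark(10)` processes all five records with a single firing instead of five separate timers.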




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298440940
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java
 ##
 @@ -117,7 +117,9 @@ public void open() throws Exception {
@Override
public void close() throws Exception {
super.close();
-   joinCondition.backingJoinCondition.close();
+   if (joinCondition != null) {
 
 Review comment:
   Is there any case in which `joinCondition` is not initialized?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298440480
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/TemporalUdtfJoinTest.scala
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.stream.sql.join
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase}
+import org.hamcrest.Matchers.containsString
+import org.junit.Test
+
+class TemporalUdtfJoinTest extends TableTestBase {
 
 Review comment:
   About "udtf": nothing here is user-defined, so maybe use "table function" instead?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298442473
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping on the state collection of probe and build records to process
+ * on next watermark. The idea is that between watermarks we are collecting those elements
+ * and once we are sure that there will be no updates we emit the correct result and clean up the
+ * state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+ * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+ * however we always keep at least one record - the latest one - even if it's past the last
+ * watermark.
+ *
+ * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+ * by registering timers for the keys. We could register a timer for every probe and build
+ * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+ * cleaning up the state). However this would cause huge number of registered timers. For example
+ * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+ * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+ * we always keep only one single registered timer for any given key, registered for the minimal
+ * value. Upon triggering it, we process all records with event times older then or equal to
+ * currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
 
 Review comment:
   It seems that `TemporalRowTimeJoinOperator` does not handle 
`RetentionTime`; maybe add a TODO or implement a retention mechanism.
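
   For readers following the thread, the "single registered timer per key" idea from the Javadoc quoted above can be sketched without any Flink dependency. This is only an illustration under stated assumptions: the class `SingleTimerPerKeySketch`, its maps, and its method names are hypothetical stand-ins for keyed state and the timer service, not the code in this pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical, Flink-free sketch: one timer per key, registered for the minimal event time. */
class SingleTimerPerKeySketch {

    /** Buffered event times per key (stands in for keyed state). */
    private final Map<String, PriorityQueue<Long>> buffered = new HashMap<>();

    /** At most one registered timer per key (stands in for the timer service). */
    private final Map<String, Long> registeredTimer = new HashMap<>();

    /** Number of timer firings, kept only to show the effect of the technique. */
    private int timerFirings = 0;

    void addRecord(String key, long eventTime) {
        buffered.computeIfAbsent(key, k -> new PriorityQueue<>()).add(eventTime);
        // Keep a single timer per key, always at the smallest buffered event time.
        registeredTimer.merge(key, eventTime, Long::min);
    }

    /** Fires the key's timer if the watermark has reached it; returns the processed event times. */
    List<Long> advanceWatermark(String key, long watermark) {
        List<Long> processed = new ArrayList<>();
        Long timer = registeredTimer.get(key);
        if (timer == null || timer > watermark) {
            return processed; // nothing registered, or the timer is not due yet
        }
        timerFirings++;
        registeredTimer.remove(key);

        // One firing handles every record with event time <= watermark.
        PriorityQueue<Long> times = buffered.get(key);
        while (!times.isEmpty() && times.peek() <= watermark) {
            processed.add(times.poll());
        }
        // Re-register a single timer for the smallest remaining event time, if any.
        if (!times.isEmpty()) {
            registeredTimer.put(key, times.peek());
        }
        return processed;
    }

    int timerFirings() {
        return timerFirings;
    }

    public static void main(String[] args) {
        SingleTimerPerKeySketch sketch = new SingleTimerPerKeySketch();
        for (long t : new long[] {1, 2, 5, 8, 9}) {
            sketch.addRecord("key", t);
        }
        System.out.println(sketch.advanceWatermark("key", 10)); // prints [1, 2, 5, 8, 9]
        System.out.println(sketch.timerFirings());              // prints 1
    }
}
```

   With the event times {1, 2, 5, 8, 9} from the Javadoc and a watermark of 10, the sketch fires one timer instead of five. A retention mechanism would be a separate concern layered on top of this.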




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298441480
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType}
+import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.generated.GeneratedJoinCondition
+import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator}
+import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.util.Preconditions.checkState
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+class StreamExecTemporalTableJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftRel: RelNode,
+rightRel: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = {
+val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder)
+
+var rowtimeJoin: Boolean = false
+val visitor = new RexVisitorImpl[Unit](true) {
+  override def visitCall(call: RexCall): Unit = {
+if (call.getOperator == TEMPORAL_JOIN_CONDITION) {
+  rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call)
+} else {
+  call.getOperands.foreach(node => node.accept(this))
+}
+  }
+}
+nonEquiJoinRex.accept(visitor)
+rowtimeJoin
+  }
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+new StreamExecTemporalTableJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  conditionExpr,
+  joinType)
+  }
+
+  //~ ExecNode methods ---
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
+getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override def replaceInputNode(
+ordinalInParent: Int,
+newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = {
+replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+
+  override protected def translateToPlanInternal(
+tableEnv: 

[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298442640
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/TemporalRowTimeJoinOperator.java
 ##
 @@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This operator works by keeping on the state collection of probe and build records to process
+ * on next watermark. The idea is that between watermarks we are collecting those elements
+ * and once we are sure that there will be no updates we emit the correct result and clean up the
+ * state.
+ *
+ * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+ * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+ * however we always keep at least one record - the latest one - even if it's past the last
+ * watermark.
+ *
+ * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+ * by registering timers for the keys. We could register a timer for every probe and build
+ * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+ * cleaning up the state). However this would cause huge number of registered timers. For example
+ * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+ * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+ * we always keep only one single registered timer for any given key, registered for the minimal
+ * value. Upon triggering it, we process all records with event times older then or equal to
+ * currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
+   extends AbstractStreamOperator
+   implements TwoInputStreamOperator, Triggerable {
+
+   private static final long serialVersionUID = 6642514795175288193L;
+
+   private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
+   private static final String LEFT_STATE_NAME = "left";
+   private static final String RIGHT_STATE_NAME = "right";
+   private static final String REGISTERED_TIMER_STATE_NAME = "timer";
+   private static final String TIMERS_STATE_NAME = "timers";
+
+   private final BaseRowTypeInfo leftType;
+   private final BaseRowTypeInfo rightType;
+   private final 

[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298441284
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
 ##
 @@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.temporal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean
+ * up their state based on a TTL. This TTL should be specified in the provided
+ * {@code minRetentionTime} and {@code maxRetentionTime}.
+ *
+ * For each known key, this operator registers a timer (in processing time) to
+ * fire after the TTL expires. When the timer fires, the subclass can decide which
+ * state to cleanup and what further action to take.
+ *
+ * This class takes care of maintaining at most one timer per key.
+ *
+ * IMPORTANT NOTE TO USERS: When extending this class, do not use processing time
+ * timers in your business logic. The reason is that:
+ *
+ * 1) if your timers collide with clean up timers and you delete them, then state
+ * clean-up will not be performed, and
+ *
+ * 2) (this one is the reason why this class does not allow to override the onProcessingTime())
 
 Review comment:
   Why not make `onProcessingTime` final?
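
   To make the suggestion concrete, here is a minimal sketch of what a final timer callback plus a subclass hook could look like. It is an assumption-laden illustration, not the class from this pull request: `RetentionBaseSketch`, `CountingJoinSketch`, and the `cleanupState` hook signature are hypothetical names chosen for the example.

```java
/** Hypothetical base class: the timer callback is final, subclasses only get a cleanup hook. */
abstract class RetentionBaseSketch {

    private int firedTimers = 0;

    /**
     * Declared final so that subclasses cannot override the processing-time callback
     * and accidentally bypass the clean-up bookkeeping done here.
     */
    public final void onProcessingTime(long timestamp) {
        firedTimers++;
        cleanupState(timestamp);
    }

    /** The only extension point: decide which state to drop when the clean-up timer fires. */
    protected abstract void cleanupState(long time);

    public int firedTimers() {
        return firedTimers;
    }
}

/** Hypothetical subclass that only records when it was asked to clean up. */
class CountingJoinSketch extends RetentionBaseSketch {

    long lastCleanup = -1L;

    @Override
    protected void cleanupState(long time) {
        lastCleanup = time;
    }

    public static void main(String[] args) {
        CountingJoinSketch join = new CountingJoinSketch();
        join.onProcessingTime(42L);
        System.out.println(join.lastCleanup);   // prints 42
        System.out.println(join.firedTimers()); // prints 1
    }
}
```

   With this shape, a subclass that tried to declare its own `onProcessingTime(long)` would fail to compile, which enforces the restriction described in the Javadoc instead of only documenting it.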




[GitHub] [flink] bowenli86 commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
bowenli86 commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506587670
 
 
   Sorry, I should have asked @zentol to help review.
   
   @lirui-apache it seems I might have misunderstood how the stage is run. 
Shouldn't the profile only recompile and retest the flink-connector-hive module?




[jira] [Comment Edited] (FLINK-10701) Move modern kafka connector module into connector profile

2019-06-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874616#comment-16874616
 ] 

vinoyang edited comment on FLINK-10701 at 6/28/19 3:18 AM:
---

Hi [~Zentol], I have seen a separate test profile named "kafka/gelly" in 
Travis. So it seems this issue is no longer needed, right?


was (Author: yanghua):
Hi [~Zentol] I have seen a separated test profile in Travis. It seems this 
issue is meaningless, right?

> Move modern kafka connector module into connector profile 
> --
>
> Key: FLINK-10701
> URL: https://issues.apache.org/jira/browse/FLINK-10701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The modern connector is run in the {{misc}} profile since it wasn't properly 
> added to the {{connector profile in stage.sh click 
> [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to 
> see more details.}}
> *This issue is blocked by FLINK-10603.*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10701) Move modern kafka connector module into connector profile

2019-06-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874616#comment-16874616
 ] 

vinoyang commented on FLINK-10701:
--

Hi [~Zentol], I have seen a separate test profile in Travis. So it seems this 
issue is no longer needed, right?

> Move modern kafka connector module into connector profile 
> --
>
> Key: FLINK-10701
> URL: https://issues.apache.org/jira/browse/FLINK-10701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The modern connector is run in the {{misc}} profile since it wasn't properly 
> added to the {{connector profile in stage.sh click 
> [here|https://github.com/apache/flink/pull/6890#issuecomment-431917344] to 
> see more details.}}
> *This issue is blocked by FLINK-10603.*





[GitHub] [flink] JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] 
Support Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512
 
 
   Can you describe the rule process of LookupJoin and TemporalTableJoin 
separately?
   I also strongly suggest renaming these classes, because the current names 
really confuse me...




[GitHub] [flink] JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] 
Support Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512
 
 
   Can you describe the rule process of LookupJoin and TemporalTableJoin 
separately?
   I also strongly suggest renaming these classes, because the current names 
are really confusing...




[GitHub] [flink] JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi edited a comment on issue #8901: [FLINK-13003][table-planner-blink] 
Support Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512
 
 
   Can you describe the rule process of LookupJoin and TemporalTableJoin 
separately?
   I also strongly suggest renaming these classes, because the current names 
are really confusing...




[GitHub] [flink] JingsongLi commented on issue #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on issue #8901: [FLINK-13003][table-planner-blink] Support 
Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901#issuecomment-506583512
 
 
   Can you describe the rule process of LookupJoin and TemporalTableJoin 
separately?




[jira] [Commented] (FLINK-13025) Elasticsearch 7.x support

2019-06-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874614#comment-16874614
 ] 

vinoyang commented on FLINK-13025:
--

Thank you for opening this issue, [~Keegan-CloudImperium]. I was going to open 
it myself and try to implement it.
Hi [~aljoscha], have we ever thought about providing a universal Elasticsearch 
connector, like the universal Kafka connector? Is this idea practical?

> Elasticsearch 7.x support
> -
>
> Key: FLINK-13025
> URL: https://issues.apache.org/jira/browse/FLINK-13025
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.8.0
>Reporter: Keegan Standifer
>Priority: Major
>
> Elasticsearch 7.0.0 was released in April of 2019: 
> [https://www.elastic.co/blog/elasticsearch-7-0-0-released]
> The latest elasticsearch connector is 
> [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]





[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298430341
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalUdtfJoinITCase.scala
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class TemporalTableFunctionJoinITCase(state: StateBackendMode)
 
 Review comment:
   Why not just `TemporalJoinITCase`, to align with flink-planner?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298435721
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical._
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin
+import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+
+import java.util
+
+class StreamExecTemporalUdtfJoinRule
 
 Review comment:
   Why not use `StreamExecTemporalTableJoinRule`?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298429826
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -73,7 +73,7 @@ public void before() {
 
@Test
public void testCreateTable() {
-   check("CREATE TABLE tbl1 (\n" +
+   check("CREATE TABLE db.tbl1 (\n" +
 
 Review comment:
   Why?




[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298431205
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -488,14 +488,14 @@ abstract class StreamTableEnvironment(
   }
 }
 
-fields.zipWithIndex.foreach {
-  case ("rowtime", idx) =>
-extractRowtime(idx, "rowtime", None)
-
-  case ("proctime", idx) =>
-extractProctime(idx, "proctime")
-
-  case (name, _) => fieldNames = name :: fieldNames
+fields.zipWithIndex.foreach { case (name, idx) =>
 
 Review comment:
   Why was this changed? flink-planner does not seem to have this logic. 
Doesn't flink-planner need to invoke the rowtime/proctime function to define a time attribute?


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298436775
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // 

+  //  Temporal TableFunction Join Utilities
+  // 

+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute.
+* The condition is used to mark this is a temporal tablefunction join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+
+  def containsTemporalJoinCondition(condition: RexNode): Boolean = {
 
 Review comment:
   NIT: use anonymous class?
   ```
   def containsTemporalJoinCondition(condition: RexNode): Boolean = {
     var hasTemporalJoinCondition: Boolean = false
     condition.accept(new RexVisitorImpl[Void](true) {
       override def visitCall(call: RexCall): Void = {
         if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
           super.visitCall(call)
         } else {
           hasTemporalJoinCondition = true
           null
         }
       }
     })
     hasTemporalJoinCondition
   }
   ```
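
The anonymous-visitor idea in the suggestion above can be tried out without Calcite on the classpath. The following is only a minimal Java sketch under stated assumptions: `ContainsMarkerSketch`, `Call` and `Visitor` are hypothetical stand-ins, not Calcite's `RexCall` or `RexVisitorImpl`, and only the marker name is taken from the diff.

```java
import java.util.Arrays;
import java.util.List;

class ContainsMarkerSketch {

    // Operator name copied from the diff; everything else here is hypothetical.
    static final String MARKER = "__TEMPORAL_JOIN_CONDITION";

    /** Hypothetical stand-in for a call node: an operator name plus operands. */
    static final class Call {
        final String operator;
        final List<Call> operands;

        Call(String operator, Call... operands) {
            this.operator = operator;
            this.operands = Arrays.asList(operands);
        }

        void accept(Visitor visitor) {
            visitor.visitCall(this);
        }
    }

    /** Hypothetical base visitor whose default behaviour is to visit every operand. */
    static class Visitor {
        void visitCall(Call call) {
            for (Call operand : call.operands) {
                operand.accept(this);
            }
        }
    }

    static boolean containsMarker(Call root) {
        // Java anonymous classes can only capture effectively final locals,
        // so the flag lives in a one-element array.
        final boolean[] found = {false};
        root.accept(new Visitor() {
            @Override
            void visitCall(Call call) {
                if (MARKER.equals(call.operator)) {
                    found[0] = true;          // stop descending once the marker is seen
                } else {
                    super.visitCall(call);    // otherwise keep walking the operands
                }
            }
        });
        return found[0];
    }

    public static void main(String[] args) {
        Call plain = new Call("AND", new Call("="), new Call(">"));
        Call marked = new Call("AND", new Call("="), new Call(MARKER, new Call("ref")));
        System.out.println(containsMarker(plain));   // prints false
        System.out.println(containsMarker(marked));  // prints true
    }
}
```

The flag is set from inside the visitor instead of being returned, which mirrors the `var hasTemporalJoinCondition` in the Scala suggestion.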


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298430439
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalUdtfJoinITCase.scala
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class TemporalTableFunctionJoinITCase(state: StateBackendMode)
+  extends StreamingWithStateTestBase(state) {
+
+  /**
+* Because of nature of the processing time, we can not (or at least it is not that easy)
+* validate the result here. Instead of that, here we are just testing whether there are no
+* exceptions in a full blown ITCase. Actual correctness is tested in unit tests.
+*/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+val sqlQuery =
+  """
+|SELECT
+|  o.amount * r.rate AS amount
+|FROM
+|  Orders AS o,
+|  LATERAL TABLE (Rates(o.proctime)) AS r
+|WHERE r.currency = o.currency
+|""".stripMargin
+
+val ordersData = new mutable.MutableList[(Long, String)]
+ordersData.+=((2L, "Euro"))
+ordersData.+=((1L, "US Dollar"))
+ordersData.+=((50L, "Yen"))
+ordersData.+=((3L, "Euro"))
+ordersData.+=((5L, "US Dollar"))
+
+val ratesHistoryData = new mutable.MutableList[(String, Long)]
+ratesHistoryData.+=(("US Dollar", 102L))
+ratesHistoryData.+=(("Euro", 114L))
+ratesHistoryData.+=(("Yen", 1L))
+ratesHistoryData.+=(("Euro", 116L))
+ratesHistoryData.+=(("Euro", 119L))
+
+val orders = env
+  .fromCollection(ordersData)
+  .toTable(tEnv, 'amount, 'currency, 'proctime)
+val ratesHistory = env
+  .fromCollection(ratesHistoryData)
+  .toTable(tEnv, 'currency, 'rate, 'proctime)
+
+tEnv.registerTable("Orders", orders)
+tEnv.registerTable("RatesHistory", ratesHistory)
+tEnv.registerFunction(
+  "Rates",
+  ratesHistory.createTemporalTableFunction("proctime", "currency"))
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new TestingAppendSink)
+env.execute()
+  }
+
+  @Test
+  def testEventTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val sqlQuery =
+  """
+|SELECT
+|  o.amount * r.rate AS amount
+|FROM
+|  Orders AS o,
+|  LATERAL TABLE (Rates(o.rowtime)) AS r
+|WHERE r.currency = o.currency
+|""".stripMargin
+
 
 Review comment:
   remove empty line


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298431452
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
 ##
 @@ -48,8 +48,18 @@ class TableImpl(val tableEnv: TableEnvironment, operationTree: QueryOperation) e
   override def select(fields: Expression*): Table = ???
 
   override def createTemporalTableFunction(
 
 Review comment:
   It seems that the only way to use a temporal join is through the Table class.


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298432231
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalTableJoin.scala
 ##
 @@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation}
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType}
+import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.generated.GeneratedJoinCondition
+import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.util.TemporalJoinUtil.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.util.{InputRefVisitor, KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.join.temporal.{TemporalProcessTimeJoinOperator, TemporalRowTimeJoinOperator}
+import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.util.Preconditions.checkState
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+class StreamExecTemporalTableJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftRel: RelNode,
+rightRel: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = {
+val nonEquiJoinRex = getJoinInfo.getRemaining(cluster.getRexBuilder)
+
+var rowtimeJoin: Boolean = false
+val visitor = new RexVisitorImpl[Unit](true) {
+  override def visitCall(call: RexCall): Unit = {
+if (call.getOperator == TEMPORAL_JOIN_CONDITION) {
+  rowtimeJoin = TemporalJoinUtil.isRowtimeCall(call)
+} else {
+  call.getOperands.foreach(node => node.accept(this))
+}
+  }
+}
+nonEquiJoinRex.accept(visitor)
+rowtimeJoin
+  }
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+new StreamExecTemporalTableJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  conditionExpr,
+  joinType)
+  }
+
+  //~ ExecNode methods 
---
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
+getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override def replaceInputNode(
+ordinalInParent: Int,
+newInputNode: ExecNode[StreamTableEnvironment, _]): Unit = {
+replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+
+  override protected def translateToPlanInternal(
 
 Review comment:
   

[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298435221
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // 

+  //  Temporal TableFunction Join Utilities
+  // 

+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute.
+* The condition is used to mark this is a temporal tablefunction join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
 
 Review comment:
   empty line


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298436393
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/TemporalJoinUtil.scala
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Utilities for temporal table join
+  */
+object TemporalJoinUtil {
+
+  // 

+  //  Temporal TableFunction Join Utilities
+  // 

+
+  /**
+* [[TEMPORAL_JOIN_CONDITION]] is a specific condition which correctly defines
+* references to rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute.
+* The condition is used to mark this is a temporal tablefunction join.
+* Later rightTimeAttribute, rightPrimaryKeyExpression and leftTimeAttribute will be
+* extracted from the condition.
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+
+  def isRowtimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+rexBuilder: RexBuilder,
+leftTimeAttribute: RexNode,
+rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
 
 Review comment:
   empty line


[GitHub] [flink] JingsongLi commented on a change in pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-27 Thread GitBox
JingsongLi commented on a change in pull request #8901: 
[FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink 
planner
URL: https://github.com/apache/flink/pull/8901#discussion_r298439071
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecTemporalUdtfJoinRule.scala
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical._
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalTableJoin
+import org.apache.flink.table.plan.util.TemporalJoinUtil.containsTemporalJoinCondition
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, WindowJoinUtil}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.JoinRelType
+
+import java.util
+
+class StreamExecTemporalUdtfJoinRule
+  extends RelOptRule(
+operand(
+  classOf[FlinkLogicalJoin],
 
 Review comment:
   I am really confused about `LogicalCorrelateToTemporalTableJoinRule`. Does a 
temporal table join really remain a `FlinkLogicalJoin` until 
`StreamExecTemporalUdtfJoinRule`?


[jira] [Comment Edited] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874613#comment-16874613
 ] 

Jark Wu edited comment on FLINK-12557 at 6/28/19 3:02 AM:
--

I thought about this again. The keys may still not be very concise when the connector 
name is long, for example "elasticsearch" or "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...',
format='json',
json.property-version = '1',
json.version='1',
json.derive-schema='true',
{code}

How about making the connection properties top-level and keeping the other 
properties (e.g. format) structured? For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...',
format='json',
format.property-version = '1',
format.version='1',
format.derive-schema='true'
{code}

What do you think [~danny0405] [~twalthr]?
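
To make the second layout concrete, here is a minimal Java sketch, assuming the properties arrive as a flat string map. The class and method names (`PropertyLayoutSketch`, `withPrefix`, `proposedLayout`) are hypothetical and are not part of Flink's `DescriptorProperties` API; the keys and values are copied from the example above, with the connection details left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PropertyLayoutSketch {

    /** Returns the entries whose key starts with prefix + ".", with that prefix stripped. */
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        String dotted = prefix + ".";
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(dotted)) {
                result.put(entry.getKey().substring(dotted.length()), entry.getValue());
            }
        }
        return result;
    }

    /** Flat properties in the proposed layout: connector keys top-level, format keys prefixed. */
    static Map<String, String> proposedLayout() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("type", "postgresql-binlog");
        props.put("property-version", "1");
        props.put("version", "0.10");
        props.put("format", "json");
        props.put("format.property-version", "1");
        props.put("format.version", "1");
        props.put("format.derive-schema", "true");
        return props;
    }

    public static void main(String[] args) {
        // The format settings sit under a fixed "format." prefix, so reading them
        // does not depend on the connector name or the format name.
        System.out.println(withPrefix(proposedLayout(), "format"));
    }
}
```

With the first layout the same lookup would need the prefix `json`, that is, the value of the `format` key, before the format settings could be read.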



was (Author: jark):
I think about it again. Maybe it is still not very concise when the connector 
name is long, for example: "elasticsearch", "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...',
format='json',
json.property-version = '1',
json.version='1',
json.derive-schema='true',
{code}

How about making connection properties to be top-level and making other 
properties (e.g. format) to be structured. For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...',
format='json',
format.property-version = '1',
format.version='1',
format.derive-schema='true'
{code}




> Unify create table DDL with clause and connector descriptor keys
> 
>
> Key: FLINK-12557
> URL: https://issues.apache.org/jira/browse/FLINK-12557
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
> Fix For: 1.9.0
>
>
> The *with* option in table DDL defines the properties needed for specific 
> connector to create TableSource/Sink. The properties structure for SqlClient 
> config YAML is defined in [Improvements to the Unified SQL Connector 
> API|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  in this design, the properties can be categorized into 4 parts:
>  
>  # Top level properties: name, type(source, sink), update-mode ...
>  # Connector specific properties: connector.type, connector.path ...
>  # Format properties: format.type, format.fields.1.name ...
>  # Table schema properties: (can be omitted for DDL)
>  
> This properties structure is reasonable for YAML, but it is not concise 
> enough for developers. There is also a tool class named 
> [DescriptorProperties|https://github.com/apache/flink/blob/b3604f7bee7456b8533e9ea222a833a2624e36c2/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java#L67]
>  to reconstruct the data structure(like TableSchema) from the flat k-v 
> strings.
>  
> To reduce complexity and keep the key-value pairs consistent between the DDL WITH 
> properties and the TableFactory properties, I propose simplifying the DDL WITH 
> property keys as follows (corresponding to the 4 categories above):
>  
>  # Top level properties: keep same as that in the YAML e.g. connector, 
> update-mode
>  # Connector specific properties: start with prefix named the connector type 
> e.g. for kafka connector, the properties are defined as kafka.k1 = v1, 
> kafka.k2 = v2
>  # Format properties: format.type simplified to format and the others with 
> prefix of the format name e.g. format = 'json', json.line-delimiter = "\n"
>  # Table schema properties: omitted.
> Here is a demo of the create table DDL:
> {code:java}
> CREATE TABLE Kafka10SourceTable (
>   intField INTEGER,
>   stringField VARCHAR(128) COMMENT 'User IP address',
>   longField BIGINT,
>   rowTimeField TIMESTAMP,
>   WATERMARK wm01 FOR  'longField' AS BOUNDED WITH DELAY '60' SECOND
> )
> COMMENT 'Kafka Source Table of topic user_ip_address'
> WITH (
>   connector='kafka',
>   kafka.property-version='1',
>   kafka.version='0.10',
>   kafka.topic='test-kafka-topic',
>   kafka.startup-mode = 'latest-offset',
>   kafka.specific-offset = 'offset',
>   format='json',
>   json.property-version = '1',
>   json.version='1',
>   json.derive-schema='true'
> )
> {code}



--
This message was sent by Atlassian JIRA

[jira] [Comment Edited] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874613#comment-16874613
 ] 

Jark Wu edited comment on FLINK-12557 at 6/28/19 3:00 AM:
--

I thought about this again. The keys may still not be very concise when the connector 
name is long, for example "elasticsearch" or "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...',
format='json',
json.property-version = '1',
json.version='1',
json.derive-schema='true',
{code}

How about making the connection properties top-level and keeping the other 
properties (e.g. format) structured? For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...',
format='json',
format.property-version = '1',
format.version='1',
format.derive-schema='true'
{code}





was (Author: jark):
I think about it again. Maybe it is still not very concise when the connector 
name is long, for example: "elasticsearch", "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...'
format='json'
json.property-version = '1'
json.version='1'
json.derive-schema='true'
{code}

How about making connection properties to be top-level and making other 
properties (e.g. format) to be structured. For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...'
format='json'
format.property-version = '1'
format.version='1'
format.derive-schema='true'
{code}




> Unify create table DDL with clause and connector descriptor keys
> 
>
> Key: FLINK-12557
> URL: https://issues.apache.org/jira/browse/FLINK-12557
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
> Fix For: 1.9.0
>
>
> The *with* option in table DDL defines the properties that a specific 
> connector needs to create a TableSource/Sink. The property structure for the 
> SqlClient config YAML is defined in [Improvements to the Unified SQL Connector 
> API|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf]. 
> In this design, the properties fall into 4 categories:
>  
>  # Top level properties: name, type(source, sink), update-mode ...
>  # Connector specific properties: connector.type, connector.path ...
>  # Format properties: format.type, format.fields.1.name ...
>  # Table schema properties: (can be omitted for DDL)
>  
> This property structure is reasonable for YAML, but it is not concise 
> enough for developers. So there is also a utility class named 
> [DescriptorProperties|https://github.com/apache/flink/blob/b3604f7bee7456b8533e9ea222a833a2624e36c2/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java#L67] 
> that reconstructs data structures (like TableSchema) from the flat k-v 
> strings.
>  
> So, to reduce complexity and keep the keys consistent between the DDL WITH 
> properties and the TableFactory properties, I propose simplifying the DDL WITH 
> property keys as follows (corresponding to the 4 categories above):
>  
>  # Top level properties: kept the same as in the YAML, e.g. connector, 
> update-mode
>  # Connector specific properties: prefixed with the connector type, 
> e.g. for the kafka connector, the properties are defined as kafka.k1 = v1, 
> kafka.k2 = v2
>  # Format properties: format.type is simplified to format, and the others are 
> prefixed with the format name, e.g. format = 'json', json.line-delimiter = "\n"
>  # Table schema properties: omitted.
> Here is a demo of a CREATE TABLE DDL:
> {code:java}
> CREATE TABLE Kafka10SourceTable (
>   intField INTEGER,
>   stringField VARCHAR(128) COMMENT 'User IP address',
>   longField BIGINT,
>   rowTimeField TIMESTAMP,
>   WATERMARK wm01 FOR  'longField' AS BOUNDED WITH DELAY '60' SECOND
> )
> COMMENT 'Kafka Source Table of topic user_ip_address'
> WITH (
>   connector='kafka',
>   kafka.property-version='1',
>   kafka.version='0.10',
>   kafka.topic='test-kafka-topic',
>   kafka.startup-mode = 'latest-offset',
>   kafka.specific-offset = 'offset',
>   format='json',
>   json.property-version = '1',
>   json.version='1',
>   json.derive-schema='true'
> )
> {code}
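
The key scheme proposed in the description can be related back to the descriptor-style keys (`connector.type`, `connector.*`, `format.type`, `format.*`) with a small translation step. The Java sketch below is an assumption about how such a mapping could look; `DdlKeyNormalizer` and `toDescriptorKeys` are hypothetical names and this is not how Flink implements it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: rewrites the simplified DDL keys
// (connector='kafka', kafka.topic=..., format='json', json.version=...)
// into descriptor-style keys (connector.type, connector.topic, format.type, ...).
final class DdlKeyNormalizer {

    static Map<String, String> toDescriptorKeys(Map<String, String> ddl) {
        String connector = ddl.get("connector"); // e.g. "kafka"
        String format = ddl.get("format");       // e.g. "json"
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : ddl.entrySet()) {
            String key = e.getKey();
            String value = e.getValue();
            if (key.equals("connector")) {
                out.put("connector.type", value);
            } else if (key.equals("format")) {
                out.put("format.type", value);
            } else if (connector != null && key.startsWith(connector + ".")) {
                // kafka.topic -> connector.topic
                out.put("connector." + key.substring(connector.length() + 1), value);
            } else if (format != null && key.startsWith(format + ".")) {
                // json.derive-schema -> format.derive-schema
                out.put("format." + key.substring(format.length() + 1), value);
            } else {
                // Top-level keys such as update-mode are passed through unchanged.
                out.put(key, value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> ddl = new LinkedHashMap<>();
        ddl.put("connector", "kafka");
        ddl.put("kafka.topic", "test-kafka-topic");
        ddl.put("format", "json");
        ddl.put("json.derive-schema", "true");
        System.out.println(toDescriptorKeys(ddl));
    }
}
```

The sketch does not handle a connector and a format that share the same name; a real implementation would need a rule for that case.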





[jira] [Commented] (FLINK-12557) Unify create table DDL with clause and connector descriptor keys

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874613#comment-16874613
 ] 

Jark Wu commented on FLINK-12557:
-

I thought about it again. It may still not be very concise when the connector 
name is long, for example: "elasticsearch", "postgresql-binlog".


{code:java}
connector='postgresql-binlog',
postgresql-binlog.property-version='1',
postgresql-binlog.version='0.10',
postgresql-binlog.url='...',
postgresql-binlog.username='...',
postgresql-binlog.password='...',
format='json',
json.property-version = '1',
json.version='1',
json.derive-schema='true'
{code}

How about making the connection properties top-level and keeping the other 
properties (e.g. format) structured? For example: 

{code:java}
type='postgresql-binlog',
property-version='1',
version='0.10',
url='...',
username='...',
password='...',
format='json',
format.property-version = '1',
format.version='1',
format.derive-schema='true'
{code}




> Unify create table DDL with clause and connector descriptor keys
> 
>
> Key: FLINK-12557
> URL: https://issues.apache.org/jira/browse/FLINK-12557
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
> Fix For: 1.9.0
>
>
> The *with* option in table DDL defines the properties that a specific 
> connector needs to create a TableSource/Sink. The property structure for the 
> SqlClient config YAML is defined in [Improvements to the Unified SQL Connector 
> API|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf]. 
> In this design, the properties fall into 4 categories:
>  
>  # Top level properties: name, type(source, sink), update-mode ...
>  # Connector specific properties: connector.type, connector.path ...
>  # Format properties: format.type, format.fields.1.name ...
>  # Table schema properties: (can be omitted for DDL)
>  
> This property structure is reasonable for YAML, but it is not concise 
> enough for developers. So there is also a utility class named 
> [DescriptorProperties|https://github.com/apache/flink/blob/b3604f7bee7456b8533e9ea222a833a2624e36c2/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java#L67] 
> that reconstructs data structures (like TableSchema) from the flat k-v 
> strings.
>  
> So, to reduce complexity and keep the keys consistent between the DDL WITH 
> properties and the TableFactory properties, I propose simplifying the DDL WITH 
> property keys as follows (corresponding to the 4 categories above):
>  
>  # Top level properties: kept the same as in the YAML, e.g. connector, 
> update-mode
>  # Connector specific properties: prefixed with the connector type, 
> e.g. for the kafka connector, the properties are defined as kafka.k1 = v1, 
> kafka.k2 = v2
>  # Format properties: format.type is simplified to format, and the others are 
> prefixed with the format name, e.g. format = 'json', json.line-delimiter = "\n"
>  # Table schema properties: omitted.
> Here is a demo of a CREATE TABLE DDL:
> {code:java}
> CREATE TABLE Kafka10SourceTable (
>   intField INTEGER,
>   stringField VARCHAR(128) COMMENT 'User IP address',
>   longField BIGINT,
>   rowTimeField TIMESTAMP,
>   WATERMARK wm01 FOR  'longField' AS BOUNDED WITH DELAY '60' SECOND
> )
> COMMENT 'Kafka Source Table of topic user_ip_address'
> WITH (
>   connector='kafka',
>   kafka.property-version='1',
>   kafka.version='0.10',
>   kafka.topic='test-kafka-topic',
>   kafka.startup-mode = 'latest-offset',
>   kafka.specific-offset = 'offset',
>   format='json',
>   json.property-version = '1',
>   json.version='1',
>   json.derive-schema='true'
> )
> {code}





[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506580756
 
 
   > As an alternative, could we combine this with the hadoop 2.4 or scala 2.12 
profile?
   
   Would you mind explaining how to combine it with hadoop 2.4 or scala 2.12? Does 
it mean the test will run with different hadoop or scala versions? And will the 
test run for each PR if we do that?




[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506580008
 
 
   Hi @zentol, the `connector_hive_1` stage in the build of my own repo took about 13 
min and failed, which is expected because we currently have an issue 
compiling against Hive-1.2.1. So I suppose that's the overhead of the forced 
recompilation. Once we fix the build, I expect the testing to take another 2~3 
min. Of course the testing time will increase as we add more test cases.




[GitHub] [flink] wuchong commented on issue #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
wuchong commented on issue #8850: [FLINK-12954] Supports create(drop) view 
grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#issuecomment-506579118
 
 
   LGTM
   +1 to merge




[jira] [Commented] (FLINK-13020) UT Failure: ChainLengthDecreaseTest

2019-06-27 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874602#comment-16874602
 ] 

Yun Tang commented on FLINK-13020:
--

I think this is a duplicate of FLINK-12916.

> UT Failure: ChainLengthDecreaseTest
> ---
>
> Key: FLINK-13020
> URL: https://issues.apache.org/jira/browse/FLINK-13020
> Project: Flink
>  Issue Type: Improvement
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> {code:java}
> 05:47:24.893 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 19.836 s <<< FAILURE! - in 
> org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest
> 05:47:24.895 [ERROR] testMigrationAndRestore[Migrate Savepoint: 
> 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest)
>   Time elapsed: 1.501 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
> cancellation from one of its inputs
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
> cancellation from one of its inputs
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
> received cancellation from one of its inputs
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
> received cancellation from one of its inputs
> ...
> 05:48:27.736 [ERROR] Errors: 
> 05:48:27.736 [ERROR]   
> ChainLengthDecreaseTest>AbstractOperatorRestoreTestBase.testMigrationAndRestore:102->AbstractOperatorRestoreTestBase.migrateJob:138
>  » Execution
> 05:48:27.736 [INFO] 
> {code}
> https://travis-ci.org/apache/flink/jobs/551053821





[GitHub] [flink] gaoyunhaii commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-27 Thread GitBox
gaoyunhaii commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r298434963
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##
 @@ -347,20 +365,65 @@ private MultiTaskSlot(
CompletableFuture 
slotContextFuture,
@Nullable SlotRequestId allocatedSlotRequestId) 
{
super(slotRequestId, groupId);
+   Preconditions.checkNotNull(slotContextFuture);
 
this.parent = parent;
-   this.slotContextFuture = 
Preconditions.checkNotNull(slotContextFuture);
this.allocatedSlotRequestId = allocatedSlotRequestId;
 
this.children = new HashMap<>(16);
this.releasingChildren = false;
 
-   slotContextFuture.whenComplete(
-   (SlotContext ignored, Throwable throwable) -> {
-   if (throwable != null) {
-   release(throwable);
+   this.requestedResources = ResourceProfile.EMPTY;
+
+   this.slotContextFuture = 
slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+   if (throwable != null) {
+   // If the underlying resource request 
fail, currently we fails all the requests to
+   // simplify the logic.
+   release(throwable);
+   throw new 
CompletionException(throwable);
+   }
+
+   if (parent == null) {
+   ResourceProfile allocated = 
ResourceProfile.EMPTY;
+   List<TaskSlot> childrenToEvict = new 
ArrayList<>();
+
+   for (TaskSlot slot : children.values()) 
{
+   ResourceProfile 
allocatedIfInclude = allocated.merge(slot.getRequestedResources());
+
+   if 
(slotContext.getResourceProfile().isMatching(allocatedIfInclude)) {
+   allocated = 
allocatedIfInclude;
+   } else {
+   
childrenToEvict.add(slot);
+   }
+   }
+
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Not all requests are 
fulfilled due to over-allocated, number of requests is {}, " +
+   
"number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, 
" +
+   
"evicted requests is {},",
+   children.size(),
+   
childrenToEvict.size(),
+   
slotContext.getResourceProfile(),
+   allocated,
+   
childrenToEvict);
}
-   });
+
+   if (childrenToEvict.size() == 
children.size()) {
+   // This only happens when we 
request to RM using the resource profile of a task
+   // who is belonging to a 
CoLocationGroup. Similar to dealing with the fail of
 
 Review comment:
   This is because the RM always returns a slot whose resource is larger than the 
requested one; without co-location, at least the request that triggered the 
request to the RM should be fulfilled. I have updated the comments to make this 
clearer.
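
   The greedy check in the diff above can be sketched in isolation as follows. This is a simplified Java illustration, not the code from the PR: resources are modelled as a single integer (for example MB of memory) instead of a `ResourceProfile`, and `GreedyEviction` and `requestsToEvict` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of the eviction loop: walk the child requests in
// order, keep each one that still fits into the allocated slot, evict the rest.
final class GreedyEviction {

    // Returns the indices of the requests that do not fit into slotCapacity.
    static List<Integer> requestsToEvict(int slotCapacity, List<Integer> requested) {
        int allocated = 0;
        List<Integer> evicted = new ArrayList<>();
        for (int i = 0; i < requested.size(); i++) {
            int allocatedIfIncluded = allocated + requested.get(i);
            if (allocatedIfIncluded <= slotCapacity) {
                // The slot can still serve this request together with the kept ones.
                allocated = allocatedIfIncluded;
            } else {
                evicted.add(i);
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        // Capacity 10; requests 4 and 5 fit, 3 would exceed the capacity, 1 still fits.
        System.out.println(requestsToEvict(10, Arrays.asList(4, 5, 3, 1)));
    }
}
```

   In this model, the case discussed in the review comment (every child evicted) occurs only when no single request fits into the returned slot.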




[GitHub] [flink] gaoyunhaii commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-27 Thread GitBox
gaoyunhaii commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r298434963
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##
 @@ -347,20 +365,65 @@ private MultiTaskSlot(
CompletableFuture 
slotContextFuture,
@Nullable SlotRequestId allocatedSlotRequestId) 
{
super(slotRequestId, groupId);
+   Preconditions.checkNotNull(slotContextFuture);
 
this.parent = parent;
-   this.slotContextFuture = 
Preconditions.checkNotNull(slotContextFuture);
this.allocatedSlotRequestId = allocatedSlotRequestId;
 
this.children = new HashMap<>(16);
this.releasingChildren = false;
 
-   slotContextFuture.whenComplete(
-   (SlotContext ignored, Throwable throwable) -> {
-   if (throwable != null) {
-   release(throwable);
+   this.requestedResources = ResourceProfile.EMPTY;
+
+   this.slotContextFuture = 
slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+   if (throwable != null) {
+   // If the underlying resource request 
fail, currently we fails all the requests to
+   // simplify the logic.
+   release(throwable);
+   throw new 
CompletionException(throwable);
+   }
+
+   if (parent == null) {
+   ResourceProfile allocated = 
ResourceProfile.EMPTY;
+   List<TaskSlot> childrenToEvict = new 
ArrayList<>();
+
+   for (TaskSlot slot : children.values()) 
{
+   ResourceProfile 
allocatedIfInclude = allocated.merge(slot.getRequestedResources());
+
+   if 
(slotContext.getResourceProfile().isMatching(allocatedIfInclude)) {
+   allocated = 
allocatedIfInclude;
+   } else {
+   
childrenToEvict.add(slot);
+   }
+   }
+
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Not all requests are 
fulfilled due to over-allocated, number of requests is {}, " +
+   
"number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, 
" +
+   
"evicted requests is {},",
+   children.size(),
+   
childrenToEvict.size(),
+   
slotContext.getResourceProfile(),
+   allocated,
+   
childrenToEvict);
}
-   });
+
+   if (childrenToEvict.size() == 
children.size()) {
+   // This only happens when we 
request to RM using the resource profile of a task
+   // who is belonging to a 
CoLocationGroup. Similar to dealing with the fail of
 
 Review comment:
   This is because the RM always returns a slot whose resource is larger than the 
requested one; without co-location, at least the request that triggered the 
request to the RM should be fulfilled. 




[GitHub] [flink] lirui-apache commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on a change in pull request #8911: [FLINK-12995][hive] 
Add Hive-1.2.1 build to Travis
URL: https://github.com/apache/flink/pull/8911#discussion_r298434852
 
 

 ##
 File path: tools/travis/stage.sh
 ##
 @@ -164,6 +168,9 @@ function get_compile_modules_for_stage() {
 (${STAGE_TESTS})
 echo "-pl $MODULES_TESTS -am"
 ;;
+(${STAGE_CONNECTOR_HIVE_1})
+echo "-pl $MODULES_CONNECTOR_HIVE -am -Phive-1.2.1 clean"
 
 Review comment:
   Yes




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-27 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298434626
 
 

 ##
 File path: docs/_includes/generated/blob_server_configuration.html
 ##
 @@ -7,16 +7,16 @@
 
 
 
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 
 blob.client.connect.timeout
 0
 The connection timeout in milliseconds for the blob 
client.
 
+
+blob.client.socket.timeout
 
 Review comment:
   Sorry for the trouble here. I found that other people have hit the 
same issue with unrelated HTML changes. I will double-check which PR 
originally caused this change; you can ignore this change in your PR.




[jira] [Updated] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side

2019-06-27 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12171:

Affects Version/s: 1.9.0

> The network buffer memory size should not be checked against the heap size on 
> the TM side
> -
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.0, 1.9.0
> Environment: Flink-1.7.2; Flink-1.8 does not seem to have modified the 
> logic here.
>  
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, when computing the network buffer memory size on the TM side in 
> _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or 
> _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), 
> the computed network buffer memory size is checked to be less than 
> _maxJvmHeapMemory_. However, on the TM side, _maxJvmHeapMemory_ stores the 
> maximum heap memory (namely -Xmx).
>  
> With the above process, when the TM starts, -Xmx is computed in the RM or in 
> _taskmanager.sh_ as (container memory - network buffer memory - managed 
> memory). The above check therefore implies that the heap memory of the TM must 
> be larger than the network memory, which does not seem necessary.
>  
> This may cause the TM to use more memory than expected. For example, for a job 
> that has a large network throughput, users may configure network memory to 2G. 
> However, if users want to assign 1G to heap memory, the TM will fail to 
> start, and the user has to allocate at least 2G of heap memory (in other words, 
> 4G in total for the TM instead of 3G) to make the TM runnable. This may cause 
> resource inefficiency.
>  
> Therefore, I think the network buffer memory size should be checked 
> against the total memory instead of the heap memory on the TM side:
>  # Check that networkBufFraction < 1.0.
>  # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
>  # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.
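
The three steps proposed above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the Flink implementation: `NetworkMemoryCheck`, `totalMemory` and `isValid` are hypothetical names, sizes are plain byte counts, and managed memory is ignored.

```java
// Hypothetical sketch of the proposed check; not Flink code.
final class NetworkMemoryCheck {

    // Steps 1 and 2: validate the fraction and derive the total memory from the
    // heap size that remains after the network buffers were subtracted.
    static long totalMemory(long jvmHeapNoNet, double networkBufFraction) {
        if (!(networkBufFraction >= 0.0 && networkBufFraction < 1.0)) {
            throw new IllegalArgumentException(
                "networkBufFraction must be in [0.0, 1.0), was " + networkBufFraction);
        }
        return (long) (jvmHeapNoNet / (1.0 - networkBufFraction));
    }

    // Step 3: compare the network buffer memory with the total memory
    // rather than with the heap size.
    static boolean isValid(long networkBufBytes, long jvmHeapNoNet, double networkBufFraction) {
        return networkBufBytes < totalMemory(jvmHeapNoNet, networkBufFraction);
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        // The example from the description: 2G of network memory with 1G of heap.
        System.out.println(isValid(2 * oneGiB, oneGiB, 2.0 / 3.0));
    }
}
```

For the example in the description (2G network memory, 1G heap, fraction 2/3), the derived total is about 3G, so this check accepts the configuration, whereas a comparison against the 1G heap would reject it.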





[GitHub] [flink] zhijiangW commented on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create

2019-06-27 Thread GitBox
zhijiangW commented on issue #8779: [FLINK-12882][network] Remove 
ExecutionAttemptID argument from ResultPartitionFactory#create
URL: https://github.com/apache/flink/pull/8779#issuecomment-506574514
 
 
   Thanks for the review @azagrebin !
   @zentol could you help double check and merge if no other concerns?




[GitHub] [flink] klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-27 Thread GitBox
klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506574054
 
 
   I'm not sure I fully follow what you mean; below I try to 
answer your question. Please correct me if I have misunderstood.
   - If you mean that the sync checkpoint/savepoint times out, then the 
`pendingCheckpoint` will be aborted, the `onCompletionPromise` will complete 
with an exception, and the job exits.
   - Currently, when stopping a job, the task will not exit; if you want the 
task to exit, you could use `cancel` instead of `stop`.




[jira] [Assigned] (FLINK-12994) Improve the buffer processing performance in SpilledBufferOrEventSequence#getNext

2019-06-27 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) reassigned FLINK-12994:
-

Assignee: (was: Congxian Qiu(klion26))

> Improve the buffer processing performance in 
> SpilledBufferOrEventSequence#getNext
> -
>
> Key: FLINK-12994
> URL: https://issues.apache.org/jira/browse/FLINK-12994
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Congxian Qiu(klion26)
>Priority: Minor
>
> This is a follow-up issue of FLINK-12536, please see the benchmark there for 
> more information.





[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298427114
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -583,6 +583,25 @@ public void testInvalidUpsertOverwrite() {
"OVERWRITE expression is only used with INSERT mode");
}
 
+   @Test
+   public void testCreateViewWithProperty() {
 
 Review comment:
   done




[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298427036
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateView.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.List;
+
+/**
+ * CREATE VIEW DDL sql call.
+ */
+public class SqlCreateView extends SqlCreate implements ExtendedSqlNode {
+   public static final SqlSpecialOperator OPERATOR = new 
SqlSpecialOperator("CREATE_VIEW", SqlKind.CREATE_VIEW);
+
+   private final SqlIdentifier viewName;
+   private final SqlNodeList fieldList;
+   private final SqlNode query;
+   private final SqlCharStringLiteral comment;
+
+   public SqlCreateView(
+   SqlParserPos pos,
+   SqlIdentifier viewName,
+   SqlNodeList fieldList,
+   SqlNode query,
+   boolean replace,
+   SqlCharStringLiteral comment) {
+   super(OPERATOR, pos, replace, false);
+   this.viewName = viewName;
+   this.fieldList = fieldList;
+   this.query = query;
+   this.comment = comment;
+   }
+
+   @Override
+   public List<SqlNode> getOperandList() {
+   List<SqlNode> ops = Lists.newArrayList();
+   ops.add(viewName);
+   ops.add(fieldList);
 
 Review comment:
   Yes, we want to support this, so users can rename the fields.




[GitHub] [flink] danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
danny0405 commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298426924
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
 ##
 @@ -334,6 +317,50 @@ void PartitionSpecCommaList(SqlNodeList list) :
 
 }
 
+/**
+* Parses a create view or replace existing view statement.
+*   CREATE [OR REPLACE] VIEW view_name [ (field1, field2 ...) ] AS 
select_statement
+*/
+SqlCreate SqlCreateView(Span s, boolean replace) : {
+boolean replaceView = false;
+SqlIdentifier viewName;
+SqlCharStringLiteral comment = null;
+SqlNode query;
+SqlNodeList fieldList = SqlNodeList.EMPTY;
+}
+{
+[ <OR> <REPLACE> { replaceView = true; } ]
 
 Review comment:
   Yep, thanks.




[GitHub] [flink] stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload

2019-06-27 Thread GitBox
stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include 
the whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
 
 
   
   
   




[GitHub] [flink] stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload

2019-06-27 Thread GitBox
stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the 
whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table

2019-06-27 Thread GitBox
flinkbot edited a comment on issue #8921: [FLINK-13023][hive] Generate 
HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545126
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @bowenli86
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @bowenli86
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @bowenli86
   * ✅ 5. Overall code [quality] is good.
   - Approved by @bowenli86
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table

2019-06-27 Thread GitBox
bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate 
HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506552250
 
 
   @xuefuz thanks for your contribution!
   
   LGTM, will merge once Travis build passes
   
   @flinkbot approve all




[GitHub] [flink] bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-06-27 Thread GitBox
bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] 
integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298404869
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
 ##
 @@ -68,7 +68,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
   RexProgramExtractor.extractConjunctiveConditions(
 program,
 call.builder().getRexBuilder,
-new FunctionCatalog("default_catalog", "default_database"))
+new FunctionCatalog(null))
 
 Review comment:
   @twalthr @dawidwys @JingsongLi are we able to pass in CatalogManager here?




[GitHub] [flink] bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-06-27 Thread GitBox
bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] 
integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298404869
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
 ##
 @@ -68,7 +68,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
   RexProgramExtractor.extractConjunctiveConditions(
 program,
 call.builder().getRexBuilder,
-new FunctionCatalog("default_catalog", "default_database"))
+new FunctionCatalog(null))
 
 Review comment:
   are we able to pass in CatalogManager here?




[GitHub] [flink] bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-06-27 Thread GitBox
bowenli86 commented on a change in pull request #8920: [FLINK-13024][table] 
integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298405079
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 ##
 @@ -116,15 +119,48 @@ public void registerScalarFunction(String name, 
ScalarFunction function) {
}
 
public String[] getUserDefinedFunctions() {
-   return userFunctions.values().stream()
-   .map(FunctionDefinition::toString)
-   .toArray(String[]::new);
+   List<String> result = new ArrayList<>();
+
+   // Get functions in catalog
+   Catalog catalog = 
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+   try {
+   
result.addAll(catalog.listFunctions(catalogManager.getCurrentDatabase()));
+   } catch (DatabaseNotExistException e) {
+   // Ignore since there will always be a current database 
of the current catalog
+   }
+
+   // Get built-in functions
+   result.addAll(
+   BuiltInFunctionDefinitions.getDefinitions()
+   .stream()
+   .map(f -> f.getName())
+   .collect(Collectors.toList()));
+
+   return result.stream()
+   .map(n -> normalizeName(n))
+   .collect(Collectors.toList())
+   .toArray(new String[0]);
}
 
@Override
public Optional<FunctionDefinition> lookupFunction(String name) {
-   final FunctionDefinition userCandidate = 
userFunctions.get(normalizeName(name));
+   String functionName = normalizeName(name);
+
+   Catalog catalog = 
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+
+   FunctionDefinition userCandidate = null;
+   try {
+   CatalogFunction catalogFunction = catalog.getFunction(
+   new 
ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+   // TODO: use FunctionDefintionFactory to initiate a 
FunctionDefinition from CatalogFunction
 
 Review comment:
   we need FunctionDefintionFactory here
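
The `getUserDefinedFunctions()` change in the diff above collects catalog and built-in names into a list and then normalizes them. A minimal sketch of that merge-and-normalize step as a single stream pipeline follows. `FunctionNames`, `merge`, and the upper-casing `normalizeName` are hypothetical stand-ins and not Flink's actual classes, and the catalog lookup is replaced by plain lists.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Stream;

/** Hypothetical helper for illustration only; not part of Flink. */
class FunctionNames {

    /** Assumption: names are normalized by upper-casing them. */
    static String normalizeName(String name) {
        return name.toUpperCase(Locale.ROOT);
    }

    /** Concatenates catalog and built-in function names, then normalizes each one. */
    static String[] merge(List<String> catalogFunctions, List<String> builtInFunctions) {
        return Stream.concat(catalogFunctions.stream(), builtInFunctions.stream())
                .map(FunctionNames::normalizeName)
                .toArray(String[]::new);
    }
}
```

Going straight to `toArray(String[]::new)` avoids the intermediate `collect(Collectors.toList())` that the diff performs before converting to an array.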




[GitHub] [flink] flinkbot commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table

2019-06-27 Thread GitBox
flinkbot commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource 
from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545126
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table

2019-06-27 Thread GitBox
xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource 
from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506545081
 
 
   cc: @bowenli86, @terry, @lirui-apache 




[jira] [Updated] (FLINK-13023) Generate HiveTableSource from from a Hive table

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13023:
---
Labels: pull-request-available  (was: )

> Generate HiveTableSource from from a Hive table
> ---
>
> Key: FLINK-13023
> URL: https://issues.apache.org/jira/browse/FLINK-13023
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
>
> As a followup for FLINK-11480, this adds the conversion from a Hive table to 
> a table source that's used for data connector writing side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz opened a new pull request #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table

2019-06-27 Thread GitBox
xuefuz opened a new pull request #8921: [FLINK-13023][hive] Generate 
HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921
 
 
   
   
   ## What is the purpose of the change
   
   Provide implementation of generating HiveTableSource instance from a Hive 
table, as part of Hive data connector work.
   
   Please note that the PR includes changes in PR #8890 
   
   ## Brief change log
   
   *(for example:)*
 - Added implementation in HiveTableFactory
 - Refactored HiveTableSource and HiveTableInputformat classes
 - Added corresponding tests
   
   ## Verifying this change
   
   This change added unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)





[jira] [Created] (FLINK-13025) Elasticsearch 7.x support

2019-06-27 Thread Keegan Standifer (JIRA)
Keegan Standifer created FLINK-13025:


 Summary: Elasticsearch 7.x support
 Key: FLINK-13025
 URL: https://issues.apache.org/jira/browse/FLINK-13025
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / ElasticSearch
Affects Versions: 1.8.0
Reporter: Keegan Standifer


Elasticsearch 7.0.0 was released in April of 2019: 
[https://www.elastic.co/blog/elasticsearch-7-0-0-released]
The latest elasticsearch connector is 
[flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]





[GitHub] [flink] sjwiesman commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker

2019-06-27 Thread GitBox
sjwiesman commented on issue #8917: [FLINK-13017][docs] do not mount local 
$HOME into docker
URL: https://github.com/apache/flink/pull/8917#issuecomment-506542610
 
 
   Gotcha, then +1




[GitHub] [flink] NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen

2019-06-27 Thread GitBox
NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix 
DescriptiveStatisticsHistogram#getCount() not returning the number of elements 
seen
URL: https://github.com/apache/flink/pull/8886#discussion_r298395591
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
 ##
 @@ -22,25 +22,30 @@
 
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics 
{@link DescriptiveStatistics} as a Flink {@link Histogram}.
  */
 public class DescriptiveStatisticsHistogram implements 
org.apache.flink.metrics.Histogram {
 
private final DescriptiveStatistics descriptiveStatistics;
 
+   private final AtomicLong elementsSeen = new AtomicLong();
 
 Review comment:
   that depends on how users use it - usually, there should be >=1 writer 
thread(s), e.g. from a task and potentially also spawned user-threads, and 1 
reader (the metrics reporter)
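
A minimal sketch of the access pattern described here, assuming several writer threads and one reader. `SeenCounter` and `runWriters` are hypothetical names for illustration; only the `AtomicLong elementsSeen` field mirrors the diff, and the histogram's statistics are left out entirely.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the counting part of a histogram; not Flink code. */
class SeenCounter {

    private final AtomicLong elementsSeen = new AtomicLong();

    /** May be called concurrently by any number of writer threads. */
    void update(long value) {
        // The value itself is ignored in this sketch; only the count is tracked.
        elementsSeen.incrementAndGet();
    }

    /** Called by the single reader, e.g. a metrics reporter. */
    long getCount() {
        return elementsSeen.get();
    }

    /** Starts the given number of writers, waits for them, and returns the final count. */
    static long runWriters(int writers, int updatesPerWriter) {
        SeenCounter counter = new SeenCounter();
        ExecutorService pool = Executors.newFixedThreadPool(writers);
        for (int w = 0; w < writers; w++) {
            pool.execute(() -> {
                for (int i = 0; i < updatesPerWriter; i++) {
                    counter.update(i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return counter.getCount();
    }
}
```

With a plain `long` field and `elementsSeen++`, concurrent increments could be lost and the reader might not see the latest value; `AtomicLong` avoids both without requiring a lock.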




[GitHub] [flink] NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen

2019-06-27 Thread GitBox
NicoK commented on a change in pull request #8886: [FLINK-12987][metrics] fix 
DescriptiveStatisticsHistogram#getCount() not returning the number of elements 
seen
URL: https://github.com/apache/flink/pull/8886#discussion_r298395591
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
 ##
 @@ -22,25 +22,30 @@
 
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics 
{@link DescriptiveStatistics} as a Flink {@link Histogram}.
  */
 public class DescriptiveStatisticsHistogram implements 
org.apache.flink.metrics.Histogram {
 
private final DescriptiveStatistics descriptiveStatistics;
 
+   private final AtomicLong elementsSeen = new AtomicLong();
 
 Review comment:
   that depends on how users use it - usually, there should be >=1 writer 
thread(s) (tasks) and 1 reader (the metrics reporter)




[GitHub] [flink] NicoK commented on issue #8877: [FLINK-12984][metrics] only call Histogram#getStatistics() once where possible

2019-06-27 Thread GitBox
NicoK commented on issue #8877: [FLINK-12984][metrics] only call 
Histogram#getStatistics() once where possible
URL: https://github.com/apache/flink/pull/8877#issuecomment-506535212
 
 
   @StephanEwen yes, indirectly: I needed the (test-scope, test-jar) dependency 
to have a common test class `AbstractHistogramTest` and this needed to be in 
`flink-runtime` because `DescriptiveStatisticsHistogram` lives there.
   
   FYI: There was already a test-scope (normal jar) dependency on 
`flink-runtime`, presumably for similar reasons. I guess, the reason to put it 
into `flink-runtime` was to have some default to be available at any time 
without extra dependencies for the user, and also because our latency markers 
use the `DescriptiveStatisticsHistogram`.




[GitHub] [flink] NicoK edited a comment on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker

2019-06-27 Thread GitBox
NicoK edited a comment on issue #8917: [FLINK-13017][docs] do not mount local 
$HOME into docker
URL: https://github.com/apache/flink/pull/8917#issuecomment-506532565
 
 
   originally, it pointed to `/home/${USER_NAME}` which (on Linux) is basically 
the same - Patrick only made it work on MacOS as well




[GitHub] [flink] NicoK commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker

2019-06-27 Thread GitBox
NicoK commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME 
into docker
URL: https://github.com/apache/flink/pull/8917#issuecomment-506532565
 
 
   originally, it pointed to `/home/$USER` which (on Linux) is basically the 
same - Patrick only made it work on MacOS as well




[GitHub] [flink] asfgit closed pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-27 Thread GitBox
asfgit closed pull request #8863: [FLINK-12962][python] Allows pyflink to be 
pip installed.
URL: https://github.com/apache/flink/pull/8863
 
 
   




[jira] [Closed] (FLINK-12962) Allows pyflink to be pip installed

2019-06-27 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12962.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: f059b1981b8d2a62acfd2fac3456b8ca3a5bd18f

> Allows pyflink to be pip installed
> --
>
> Key: FLINK-12962
> URL: https://issues.apache.org/jira/browse/FLINK-12962
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The aim of this JIRA is to support building a pip-installable package.





[GitHub] [flink] flinkbot commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-06-27 Thread GitBox
flinkbot commented on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-506515812
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-13024) integrate FunctionCatalog with CatalogManager

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13024:
---
Labels: pull-request-available  (was: )

> integrate FunctionCatalog with CatalogManager
> -
>
> Key: FLINK-13024
> URL: https://issues.apache.org/jira/browse/FLINK-13024
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>






[GitHub] [flink] bowenli86 opened a new pull request #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-06-27 Thread GitBox
bowenli86 opened a new pull request #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920
 
 
   This is a prototype of the first step to integrate FunctionCatalog with 
Catalog APIs. 
   
   I found some gaps that need some discussion. Please see my comments below.
   
   --- TO BE FILLED ---
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[jira] [Created] (FLINK-13024) integrate FunctionCatalog with CatalogManager

2019-06-27 Thread Bowen Li (JIRA)
Bowen Li created FLINK-13024:


 Summary: integrate FunctionCatalog with CatalogManager
 Key: FLINK-13024
 URL: https://issues.apache.org/jira/browse/FLINK-13024
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataSet, Table SQL / API
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0








[GitHub] [flink] zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506513055
 
 
   As an alternative, could we combine this with the hadoop 2.4 or scala 2.12 
profile?




[GitHub] [flink] zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
zentol commented on a change in pull request #8911: [FLINK-12995][hive] Add 
Hive-1.2.1 build to Travis
URL: https://github.com/apache/flink/pull/8911#discussion_r298370606
 
 

 ##
 File path: tools/travis/stage.sh
 ##
 @@ -164,6 +168,9 @@ function get_compile_modules_for_stage() {
 (${STAGE_TESTS})
 echo "-pl $MODULES_TESTS -am"
 ;;
+(${STAGE_CONNECTOR_HIVE_1})
+echo "-pl $MODULES_CONNECTOR_HIVE -am -Phive-1.2.1 clean"
 
 Review comment:
   My guess is you're using clean so it is recompiled with a different hive 
version?




[GitHub] [flink] zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
zentol commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506512150
 
 
   hold on, do we _really_ need another profile just for hive? How long do the 
tests need? With the forced recompilation, how much time are we investing here?




[jira] [Created] (FLINK-13023) Generate HiveTableSource from from a Hive table

2019-06-27 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-13023:
---

 Summary: Generate HiveTableSource from from a Hive table
 Key: FLINK-13023
 URL: https://issues.apache.org/jira/browse/FLINK-13023
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang


As a followup for FLINK-11480, this adds the conversion from a Hive table to a 
table sink that's used for data connector writing side.





[jira] [Updated] (FLINK-13023) Generate HiveTableSource from from a Hive table

2019-06-27 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated FLINK-13023:

Description: As a followup for FLINK-11480, this adds the conversion from a 
Hive table to a table source that's used for data connector writing side.  
(was: As a followup for FLINK-11480, this adds the conversion from a Hive table 
to a table sink that's used for data connector writing side.)

> Generate HiveTableSource from from a Hive table
> ---
>
> Key: FLINK-13023
> URL: https://issues.apache.org/jira/browse/FLINK-13023
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> As a followup for FLINK-11480, this adds the conversion from a Hive table to 
> a table source that's used for data connector writing side.





[GitHub] [flink] flinkbot commented on issue #8919: [FLINK-13022][table][hive] unify catalog function implementations

2019-06-27 Thread GitBox
flinkbot commented on issue #8919: [FLINK-13022][table][hive] unify catalog 
function implementations
URL: https://github.com/apache/flink/pull/8919#issuecomment-506472767
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] bowenli86 commented on issue #8904: [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table

2019-06-27 Thread GitBox
bowenli86 commented on issue #8904: [FLINK-13005][hive] HiveCatalog should not 
add 'flink.is_generic' key for Hive table
URL: https://github.com/apache/flink/pull/8904#issuecomment-506472411
 
 
   cc @xuefuz @lirui-apache @zjuwangg 



