[GitHub] [flink] liyafan82 commented on issue #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
liyafan82 commented on issue #8682: [FLINK-12796][table-planner-blink] 
Introduce BaseArray and BaseMap to reduce conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#issuecomment-501976712
 
 
   > > Can you please provide the benchmark that produces the 10x performance improvement?
   > 
   > You can find the case in `SortAggITCase.testBigDataSimpleArrayUDAF`.
   > If you want to test the previous code, you can modify `PrimitiveLongArrayConverter.toInternalImpl` to use `return BinaryArray.fromPrimitiveArray(value);`.
   > The difference is obvious, both in the generated code and in performance.
   
   Good job. Thanks.




[GitHub] [flink] sunjincheng121 edited a comment on issue #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-13 Thread GitBox
sunjincheng121 edited a comment on issue #7227: [FLINK-11059] [runtime] do not 
add releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#issuecomment-501973070
 
 
   
![image](https://user-images.githubusercontent.com/22488084/59485314-e2f68480-8ea7-11e9-9376-a48720a27a22.png)
   I found that `DefaultExecutionSlotAllocator` was merged 9 hours ago (https://github.com/apache/flink/commit/8a39c84e29c9ad64febbd850c99b40d28b5359d1), so I think this may be a CI cache problem; I have restarted all CI stages. If we still cannot pass CI, I think we should rebase the code.




[GitHub] [flink] JingsongLi commented on issue #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on issue #8682: [FLINK-12796][table-planner-blink] 
Introduce BaseArray and BaseMap to reduce conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#issuecomment-501973780
 
 
   > Can you please provide the benchmark that produces the 10x performance 
improvement?
   
   You can find the case in `SortAggITCase.testBigDataSimpleArrayUDAF`.
   If you want to test the previous code, you can modify `PrimitiveLongArrayConverter.toInternalImpl` to use `return BinaryArray.fromPrimitiveArray(value);`.
   The difference is obvious, both in the generated code and in performance.
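   For reference, a rough sketch of that modification (the converter's class structure and method signature here are assumptions based only on the names above, not the actual Flink code):
   
   ```java
   import org.apache.flink.table.dataformat.BinaryArray;
   
   // Hypothetical shape of PrimitiveLongArrayConverter.toInternalImpl when benchmarking the
   // previous code path: eagerly copying the primitive long[] into binary format on every
   // conversion, which is the overhead this PR aims to avoid.
   final class PrimitiveLongArrayConverterSketch {
       static BinaryArray toInternalImpl(long[] value) {
           return BinaryArray.fromPrimitiveArray(value);
       }
   }
   ```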




[GitHub] [flink] sunjincheng121 edited a comment on issue #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-13 Thread GitBox
sunjincheng121 edited a comment on issue #7227: [FLINK-11059] [runtime] do not 
add releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#issuecomment-501973070
 
 
   
![image](https://user-images.githubusercontent.com/22488084/59485314-e2f68480-8ea7-11e9-9376-a48720a27a22.png)
   I found that `DefaultExecutionSlotAllocator` was merged 9 hours ago (https://github.com/apache/flink/commit/8a39c84e29c9ad64febbd850c99b40d28b5359d1), so I think this may be a CI cache problem; I have restarted all CI stages. (Maybe there is some bug in the Travis script, but it is not related to this PR.)




[GitHub] [flink] sunjincheng121 commented on issue #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-13 Thread GitBox
sunjincheng121 commented on issue #7227: [FLINK-11059] [runtime] do not add 
releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#issuecomment-501973070
 
 
   
![image](https://user-images.githubusercontent.com/22488084/59485314-e2f68480-8ea7-11e9-9376-a48720a27a22.png)
   I did not find `DefaultExecutionSlotAllocator` in our code base, so I think this may be a CI cache problem; I have restarted all CI stages. (Maybe there is some bug in the Travis script, but it is not related to this PR.)




[GitHub] [flink] docete commented on issue #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner

2019-06-13 Thread GitBox
docete commented on issue #8707: [FLINK-12815] [table-planner-blink] Supports 
CatalogManager in blink planner
URL: https://github.com/apache/flink/pull/8707#issuecomment-501971743
 
 
   +1, LGTM. I left a comment about CatalogView expansion.




[GitHub] [flink] docete commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner

2019-06-13 Thread GitBox
docete commented on a change in pull request #8707: [FLINK-12815] 
[table-planner-blink] Supports CatalogManager in blink planner
URL: https://github.com/apache/flink/pull/8707#discussion_r293660477
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelShuttles.scala
 ##
 @@ -97,14 +97,15 @@ class ExpandTableScanShuttle extends RelShuttleImpl {
   }
 
   /**
-* Converts [[LogicalTableScan]] the result [[RelNode]] tree by calling 
[[RelTable]]#toRel
+* Converts [[LogicalTableScan]] the result [[RelNode]] tree
+* by calling [[QueryOperationCatalogViewTable]]#toRel
 */
   override def visit(scan: TableScan): RelNode = {
 scan match {
   case tableScan: LogicalTableScan =>
-val relTable = tableScan.getTable.unwrap(classOf[RelTable])
-if (relTable != null) {
-  val rel = 
relTable.toRel(RelOptUtil.getContext(tableScan.getCluster), tableScan.getTable)
+val viewTable = 
tableScan.getTable.unwrap(classOf[QueryOperationCatalogViewTable])
 
 Review comment:
   AbstractCatalogView from Catalog has no QueryOperation. We must get the underlying QueryOperation and convert it to QueryOperationCatalogViewTable somewhere.




[GitHub] [flink] zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] 
Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293656023
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * ResultPartition that releases itself once all subpartitions have been 
consumed.
+ */
+public class ReleaseOnConsumptionResultPartition extends ResultPartition {
+
+   /**
+* The total number of references to subpartitions of this result. The 
result partition can be
+* safely released, iff the reference count is zero. A reference count 
of -1 denotes that the
+* result partition has been released.
+*/
+   private final AtomicInteger pendingReferences = new AtomicInteger();
+
+   ReleaseOnConsumptionResultPartition(
+   String owningTaskName,
+   ResultPartitionID partitionId,
+   ResultPartitionType partitionType,
+   ResultSubpartition[] subpartitions,
+   int numTargetKeyGroups,
+   ResultPartitionManager partitionManager,
+   FunctionWithException bufferPoolFactory) {
+   super(owningTaskName, partitionId, partitionType, 
subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
+   }
+
+   @Override
+   void pin() {
+   while (true) {
 
 Review comment:
   I created [FLINK-12842](https://issues.apache.org/jira/browse/FLINK-12842) and [FLINK-12843](https://issues.apache.org/jira/browse/FLINK-12843) for the ref-counter issues, which can be solved separately after this PR is merged, because these issues already existed before and are not in the scope of this PR.




[jira] [Created] (FLINK-12843) Refactor the pin logic in ResultPartition

2019-06-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12843:


 Summary: Refactor the pin logic in ResultPartition
 Key: FLINK-12843
 URL: https://issues.apache.org/jira/browse/FLINK-12843
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The pin logic increases the reference counter based on the number of subpartitions in {{ResultPartition}}. It does not seem necessary to do this in a while loop as it is done now, because the atomic counter is not accessed by other threads during pin. If the {{ResultPartition}} has not been created yet, {{ResultPartition#createSubpartitionView}} will not be called, and {{ResultPartitionManager}} will respond with a {{ResultPartitionNotFoundException}}.

So we could simply increase the reference counter directly in the {{ResultPartition}} constructor.
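Below is a minimal, self-contained sketch of the idea (illustrative only, not the real {{ResultPartition}} code): the counter is set once in the constructor, where no other thread can yet see the instance, so the while-loop/CAS used by pin is unnecessary.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only (not the actual Flink class).
class ConsumptionTrackingPartition {

    private final AtomicInteger pendingReferences = new AtomicInteger();

    ConsumptionTrackingPartition(int numberOfSubpartitions) {
        // Safe without a CAS loop: the instance is not visible to other threads yet.
        pendingReferences.set(numberOfSubpartitions);
    }

    // Called once a subpartition has been fully consumed.
    void onConsumedSubpartition() {
        if (pendingReferences.decrementAndGet() == 0) {
            release();
        }
    }

    private void release() {
        // release partition resources once all subpartitions are consumed
    }
}
{code}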





[GitHub] [flink] sunjincheng121 commented on a change in pull request #8732: [FLINK-12720][python] Add the Python Table API Sphinx docs

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8732: 
[FLINK-12720][python] Add the Python Table API Sphinx docs
URL: https://github.com/apache/flink/pull/8732#discussion_r293653787
 
 

 ##
 File path: flink-python/docs/conf.py
 ##
 @@ -0,0 +1,208 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+
+# -- Path setup --
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+sys.path.insert(0, os.path.abspath('.'))
+sys.path.insert(0, os.path.abspath('..'))
+
+# -- Project information -
+
+# project = u'Flink Python Table API'
+project = u'PyFlink'
+copyright = u''
+author = u'Author'
+
+# The short X.Y version
+version = '1.0'
 
 Review comment:
   The version should be the same as in: https://github.com/apache/flink/blob/master/flink-python/pyflink/version.py




[jira] [Created] (FLINK-12842) Fix invalid check released state during ResultPartition#createSubpartitionView

2019-06-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12842:


 Summary: Fix invalid check released state during 
ResultPartition#createSubpartitionView
 Key: FLINK-12842
 URL: https://issues.apache.org/jira/browse/FLINK-12842
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently, {{ResultPartition#createSubpartitionView}} checks whether the partition has been released before creating the view. But this check is based on {{refCnt != -1}}, which seems invalid, because the reference counter does not always reflect the released state.

In the case of {{ResultPartition#release/fail}}, the reference counter is not set to -1. Even in the case of {{ResultPartition#onConsumedSubpartition}}, the reference counter also has no chance of becoming -1.

So we could check the real {{isReleased}} state when creating the view instead of relying on the reference counter.
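A minimal sketch of the proposed guard (illustrative only; field names and signatures differ from the real {{ResultPartition}}):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: guard view creation with an explicit released flag
// instead of inferring the released state from the reference counter.
class PartitionViewGuard {

    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    void checkNotReleasedBeforeCreatingView() {
        // Before: checkState(refCnt != -1, "Partition released.")
        // After: consult the real released state, which release()/fail() always set.
        if (isReleased.get()) {
            throw new IllegalStateException("Partition already released.");
        }
    }

    void release() {
        isReleased.set(true);
    }
}
{code}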





[GitHub] [flink] JingsongLi commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293653903
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
 ##
 @@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link BaseMap}.
+ */
+public class BaseMapSerializer extends TypeSerializer {
+
+   private final LogicalType keyType;
+   private final LogicalType valueType;
+
+   private final TypeSerializer keySerializer;
+   private final TypeSerializer valueSerializer;
+
+   private transient BinaryArray reuseKeyArray;
+   private transient BinaryArray reuseValueArray;
+   private transient BinaryArrayWriter reuseKeyWriter;
+   private transient BinaryArrayWriter reuseValueWriter;
+
+   public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+   this.keyType = keyType;
+   this.valueType = valueType;
+
+   this.keySerializer = InternalSerializers.create(keyType, new 
ExecutionConfig());
+   this.valueSerializer = InternalSerializers.create(valueType, 
new ExecutionConfig());
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public TypeSerializer duplicate() {
+   return new BaseMapSerializer(keyType, valueType);
+   }
+
+   @Override
+   public BaseMap createInstance() {
+   return new BinaryMap();
+   }
+
+   @Override
+   public BaseMap copy(BaseMap from) {
+   if (from instanceof GenericMap) {
+   Map fromMap = ((GenericMap) 
from).getMap();
+   HashMap toMap = new HashMap<>();
 
 Review comment:
   You are right.
   `DataFormatConverter` is not wrong, but `copy` may have some trouble.
   In fact, the runtime `MapSerializer` also has this problem.
   I will add a comment.
   




[GitHub] [flink] dianfu commented on a change in pull request #8732: [FLINK-12720][python] Add the Python Table API Sphinx docs

2019-06-13 Thread GitBox
dianfu commented on a change in pull request #8732: [FLINK-12720][python] Add 
the Python Table API Sphinx docs
URL: https://github.com/apache/flink/pull/8732#discussion_r293650042
 
 

 ##
 File path: flink-python/docs/pyflink.rst
 ##
 @@ -0,0 +1,33 @@
+.. 

+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+limitations under the License.
+   

+
+pyflink package
+===
+
+Subpackages
+---
+
+.. toctree::
+:maxdepth: 1
+
+pyflink.table
+
 
 Review comment:
   Add the following text here to indicate that the classes below are the contents of the pyflink package:
   ```suggestion
   Contents
   
   ```
   
   




[GitHub] [flink] dianfu commented on a change in pull request #8732: [FLINK-12720][python] Add the Python Table API Sphinx docs

2019-06-13 Thread GitBox
dianfu commented on a change in pull request #8732: [FLINK-12720][python] Add 
the Python Table API Sphinx docs
URL: https://github.com/apache/flink/pull/8732#discussion_r293646408
 
 

 ##
 File path: flink-python/docs/index.rst
 ##
 @@ -0,0 +1,39 @@
+.. 

+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+limitations under the License.
+   

+
+Welcome to Flink Python API Docs!
+==
+
+.. toctree::
+   :maxdepth: 2
+   :caption: Contents
+
+   pyflink
+   pyflink.table
+
+
+Core Classes:
+---
+
+:class:`pyflink.table.TableEnvironment`
+
+Main entry point for Flink functionality.
 
 Review comment:
What about `Main entry point for Flink Table functionality.`




[GitHub] [flink] dianfu commented on a change in pull request #8732: [FLINK-12720][python] Add the Python Table API Sphinx docs

2019-06-13 Thread GitBox
dianfu commented on a change in pull request #8732: [FLINK-12720][python] Add 
the Python Table API Sphinx docs
URL: https://github.com/apache/flink/pull/8732#discussion_r293652965
 
 

 ##
 File path: flink-python/docs/pyflink.table.rst
 ##
 @@ -0,0 +1,29 @@
+.. 

+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+limitations under the License.
+   

+
+pyflink.table package
+=
+
+Module contents
+---
+
+.. automodule:: pyflink.table
+:members:
+:undoc-members:
+:show-inheritance:
 
 Review comment:
   Add one empty line at the end of the file.




[GitHub] [flink] liyafan82 commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
liyafan82 commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293652318
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
 ##
 @@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link BaseMap}.
+ */
+public class BaseMapSerializer extends TypeSerializer {
+
+   private final LogicalType keyType;
+   private final LogicalType valueType;
+
+   private final TypeSerializer keySerializer;
+   private final TypeSerializer valueSerializer;
+
+   private transient BinaryArray reuseKeyArray;
+   private transient BinaryArray reuseValueArray;
+   private transient BinaryArrayWriter reuseKeyWriter;
+   private transient BinaryArrayWriter reuseValueWriter;
+
+   public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+   this.keyType = keyType;
+   this.valueType = valueType;
+
+   this.keySerializer = InternalSerializers.create(keyType, new 
ExecutionConfig());
+   this.valueSerializer = InternalSerializers.create(valueType, 
new ExecutionConfig());
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public TypeSerializer duplicate() {
+   return new BaseMapSerializer(keyType, valueType);
+   }
+
+   @Override
+   public BaseMap createInstance() {
+   return new BinaryMap();
+   }
+
+   @Override
+   public BaseMap copy(BaseMap from) {
+   if (from instanceof GenericMap) {
+   Map fromMap = ((GenericMap) 
from).getMap();
+   HashMap toMap = new HashMap<>();
 
 Review comment:
   The user can provide a "correct" Map, but once its key/value pairs are wrapped in a HashMap, the result can be a "wrong" Map.
   
   Let me illustrate with an example. Suppose the key type does not implement the "hashCode" and "equals" methods correctly (for simplicity, suppose hashCode always returns 0 and equals always returns true).
   
   The input GenericMap can be based on a TreeMap with a proper comparator to test key equality.
   The input map is a "correct" map, because its get/put methods behave correctly.
   
   However, when we insert the key/value pairs of that TreeMap into a HashMap, problems occur: for this example, only one key/value pair will remain in the HashMap, which is unexpected behavior.
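   A small, self-contained illustration of that scenario (the key class and map contents are made up for this example):
   
   ```java
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;
   
   public class BrokenHashKeyExample {
   
       // A key type whose hashCode/equals are deliberately broken, as in the example above.
       static final class BadKey {
           final int id;
           BadKey(int id) { this.id = id; }
           @Override public int hashCode() { return 0; }               // always the same bucket
           @Override public boolean equals(Object o) { return true; }  // every key "equals" every other key
       }
   
       public static void main(String[] args) {
           // A "correct" map: the TreeMap distinguishes keys via its comparator, not hashCode/equals.
           Map<BadKey, String> treeMap = new TreeMap<>(Comparator.comparingInt((BadKey k) -> k.id));
           treeMap.put(new BadKey(1), "a");
           treeMap.put(new BadKey(2), "b");
           System.out.println(treeMap.size()); // 2
   
           // Copying the same entries into a HashMap collapses them into a single entry,
           // because the HashMap relies on the broken hashCode/equals.
           Map<BadKey, String> hashMap = new HashMap<>(treeMap);
           System.out.println(hashMap.size()); // 1
       }
   }
   ```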
   
   
   



[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8688: [FLINK-12760] [runtime] Implement ExecutionGraph to InputsLocationsRetriever Adapter

2019-06-13 Thread GitBox
eaglewatcherwb commented on a change in pull request #8688: [FLINK-12760] 
[runtime] Implement ExecutionGraph to InputsLocationsRetriever Adapter
URL: https://github.com/apache/flink/pull/8688#discussion_r293652038
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/EGBasedInputsLocationsRetrieverTest.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link EGBasedInputsLocationsRetriever}.
+ */
+public class EGBasedInputsLocationsRetrieverTest extends TestLogger {
+
+   /**
+* Tests that can get the producers of consumed result partitions.
+*/
+   @Test
+   public void testGetConsumedResultPartitionsProducers() throws Exception 
{
+   final JobVertex producer1 = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+   final JobVertex producer2 = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+   final JobVertex consumer = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+   consumer.connectNewDataSetAsInput(producer1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+   consumer.connectNewDataSetAsInput(producer2, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+   final ExecutionGraph eg = 
ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), producer1, 
producer2, consumer);
+   final EGBasedInputsLocationsRetriever inputsLocationsRetriever 
= new EGBasedInputsLocationsRetriever(eg);
+
+   ExecutionVertexID evIdOfProducer1 = new 
ExecutionVertexID(producer1.getID(), 0);
+   ExecutionVertexID evIdOfProducer2 = new 
ExecutionVertexID(producer2.getID(), 0);
+   ExecutionVertexID evIdOfConsumer = new 
ExecutionVertexID(consumer.getID(), 0);
+
+   Collection> producersOfProducer1 =
+   
inputsLocationsRetriever.getConsumedResultPartitionsProducers(evIdOfProducer1);
+   Collection> producersOfProducer2 =
+   
inputsLocationsRetriever.getConsumedResultPartitionsProducers(evIdOfProducer2);
+   Collection> producersOfConsumer =
+   
inputsLocationsRetriever.getConsumedResultPartitionsProducers(evIdOfConsumer);
+
+   assertThat(producersOfProducer1, is(empty()));
+   assertThat(producersOfProducer2, is(empty()));
+   assertThat(producersOfConsumer, 
contains(Arrays.asList(evIdOfProducer1), Arrays.asList(evIdOfProducer2)));
 
 Review comment:
   `Arrays.asList` with only one argument could be replaced with 
`Collections.singletonList`
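   For illustration, a tiny self-contained example of the equivalence (values are made up):
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   
   public class SingletonListExample {
       public static void main(String[] args) {
           // Both produce a one-element list; singletonList avoids the varargs array that
           // Arrays.asList allocates and makes the single-element intent explicit.
           List<String> a = Arrays.asList("only");
           List<String> b = Collections.singletonList("only");
           System.out.println(a.equals(b)); // true
       }
   }
   ```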



[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8688: [FLINK-12760] [runtime] Implement ExecutionGraph to InputsLocationsRetriever Adapter

2019-06-13 Thread GitBox
eaglewatcherwb commented on a change in pull request #8688: [FLINK-12760] 
[runtime] Implement ExecutionGraph to InputsLocationsRetriever Adapter
URL: https://github.com/apache/flink/pull/8688#discussion_r293651290
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/EGBasedInputsLocationsRetrieverTest.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link EGBasedInputsLocationsRetriever}.
+ */
+public class EGBasedInputsLocationsRetrieverTest extends TestLogger {
+
+   /**
+* Tests that can get the producers of consumed result partitions.
+*/
+   @Test
+   public void testGetConsumedResultPartitionsProducers() throws Exception 
{
+   final JobVertex producer1 = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+   final JobVertex producer2 = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+   final JobVertex consumer = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+   consumer.connectNewDataSetAsInput(producer1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+   consumer.connectNewDataSetAsInput(producer2, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+   final ExecutionGraph eg = 
ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), producer1, 
producer2, consumer);
+   final EGBasedInputsLocationsRetriever inputsLocationsRetriever 
= new EGBasedInputsLocationsRetriever(eg);
+
+   ExecutionVertexID evIdOfProducer1 = new 
ExecutionVertexID(producer1.getID(), 0);
+   ExecutionVertexID evIdOfProducer2 = new 
ExecutionVertexID(producer2.getID(), 0);
+   ExecutionVertexID evIdOfConsumer = new 
ExecutionVertexID(consumer.getID(), 0);
+
+   Collection> producersOfProducer1 =
+   
inputsLocationsRetriever.getConsumedResultPartitionsProducers(evIdOfProducer1);
+   Collection> producersOfProducer2 =
+   
inputsLocationsRetriever.getConsumedResultPartitionsProducers(evIdOfProducer2);
+   Collection> producersOfConsumer =
+   
inputsLocationsRetriever.getConsumedResultPartitionsProducers(evIdOfConsumer);
+
+   assertThat(producersOfProducer1, is(empty()));
+   assertThat(producersOfProducer2, is(empty()));
+   assertThat(producersOfConsumer, 
contains(Arrays.asList(evIdOfProducer1), Arrays.asList(evIdOfProducer2)));
+   }
+
+   /**
+* Tests that when execution is not scheduled, getting task manager 
location will return null.
+*/
+   @Test
+   public void testGetNullTaskManagerLocationIfNotScheduled() throws 
Exception {
+   final JobVertex 

[GitHub] [flink] zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-13 Thread GitBox
zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#issuecomment-501958031
 
 
   cc @xuefuz again to review.




[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-13 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293651140
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link ShuffleEnvironment} to allow tracking of partitions per job.
+ */
+public class JobAwareShuffleEnvironmentImpl implements 
JobAwareShuffleEnvironment {
+
+   private static final Consumer NO_OP_NOTIFIER = 
partitionId -> {};
+
+   private final ShuffleEnvironment backingShuffleEnvironment;
+   private final PartitionTable inProgressPartitionTable = new 
PartitionTable();
+   private final PartitionTable finishedPartitionTable = new 
PartitionTable();
+
+   /** Tracks which jobs are still being monitored, to ensure cleanup 
in cases where tasks are finishing while
+* the jobmanager connection is being terminated. This is a concurrent 
map since it is modified by both the
+* Task (via {@link #notifyPartitionFinished(JobID, 
ResultPartitionID)}} and
+* TaskExecutor (via {@link 
#releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID)}) thread. */
+   private final Set activeJobs = ConcurrentHashMap.newKeySet();
+
+   public JobAwareShuffleEnvironmentImpl(ShuffleEnvironment 
backingShuffleEnvironment) {
+   this.backingShuffleEnvironment = 
Preconditions.checkNotNull(backingShuffleEnvironment);
+   }
+
+   @Override
+   public boolean hasPartitionsOccupyingLocalResources(JobID jobId) {
+   return inProgressPartitionTable.hasTrackedPartitions(jobId) || 
finishedPartitionTable.hasTrackedPartitions(jobId);
+   }
+
+   @Override
+   public void markJobActive(JobID jobId) {
+   activeJobs.add(jobId);
+   }
+
+   @Override
+   public void releaseFinishedPartitions(JobID jobId, 
Collection resultPartitionIds) {
+   finishedPartitionTable.stopTrackingPartitions(jobId, 
resultPartitionIds);
+   backingShuffleEnvironment.releasePartitions(resultPartitionIds);
+   }
+
+   @Override
+   public void releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID 
jobId) {
+   activeJobs.remove(jobId);
+   Collection finishedPartitionsForJob = 
finishedPartitionTable.stopTrackingPartitions(jobId);
+   
backingShuffleEnvironment.releasePartitions(finishedPartitionsForJob);
+   }
+
+   /**
+* This method wraps partition writers for externally managed 
partitions and introduces callbacks into the lifecycle
+* methods of the {@link ResultPartitionWriter}.
+*/
+   @Override
+   public Collection 
createResultPartitionWriters(
+   JobID jobId,
+   String taskName,
+   ExecutionAttemptID executionAttemptID,
+   Collection 

[GitHub] [flink] gaoyunhaii commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
gaoyunhaii commented on a change in pull request #8654: [FLINK-12647][network] 
Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293650869
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 
 Review comment:
   It seems there are unused imports here.




[GitHub] [flink] JingsongLi commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293650003
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
 ##
 @@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link BaseMap}.
+ */
+public class BaseMapSerializer extends TypeSerializer {
+
+   private final LogicalType keyType;
+   private final LogicalType valueType;
+
+   private final TypeSerializer keySerializer;
+   private final TypeSerializer valueSerializer;
+
+   private transient BinaryArray reuseKeyArray;
+   private transient BinaryArray reuseValueArray;
+   private transient BinaryArrayWriter reuseKeyWriter;
+   private transient BinaryArrayWriter reuseValueWriter;
+
+   public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+   this.keyType = keyType;
+   this.valueType = valueType;
+
+   this.keySerializer = InternalSerializers.create(keyType, new 
ExecutionConfig());
+   this.valueSerializer = InternalSerializers.create(valueType, 
new ExecutionConfig());
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public TypeSerializer duplicate() {
+   return new BaseMapSerializer(keyType, valueType);
+   }
+
+   @Override
+   public BaseMap createInstance() {
+   return new BinaryMap();
+   }
+
+   @Override
+   public BaseMap copy(BaseMap from) {
+   if (from instanceof GenericMap) {
+   Map fromMap = ((GenericMap) 
from).getMap();
+   HashMap toMap = new HashMap<>();
 
 Review comment:
   This has nothing to do with `GenericMap`. If the user provides a wrong `Map`, it is just as wrong to turn it into a `BinaryMap`; the map is wrong either way.




[GitHub] [flink] JingsongLi commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293649615
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
 ##
 @@ -165,49 +168,33 @@ public int getArity() {
 
/**
 * Convert base row to binary row.
-* TODO modify it to code gen, and reuse BinaryRow
+* TODO modify it to code gen.
 
 Review comment:
   This is a method comment; it means we should move this method's implementation to code generation instead of using `BinaryWriter.write` and `TypeGetterSetters.get`.




[GitHub] [flink] gaoyunhaii commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
gaoyunhaii commented on a change in pull request #8654: [FLINK-12647][network] 
Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293649558
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * ResultPartition that releases itself once all subpartitions have been 
consumed.
+ */
+public class ReleaseOnConsumptionResultPartition extends ResultPartition {
+
+   /**
+* The total number of references to subpartitions of this result. The 
result partition can be
+* safely released, iff the reference count is zero. A reference count 
of -1 denotes that the
+* result partition has been released.
+*/
+   private final AtomicInteger pendingReferences = new AtomicInteger();
+
+   ReleaseOnConsumptionResultPartition(
+   String owningTaskName,
+   ResultPartitionID partitionId,
+   ResultPartitionType partitionType,
+   ResultSubpartition[] subpartitions,
+   int numTargetKeyGroups,
+   ResultPartitionManager partitionManager,
+   FunctionWithException bufferPoolFactory) {
+   super(owningTaskName, partitionId, partitionType, 
subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
+   }
+
+   @Override
+   void pin() {
+   while (true) {
 
 Review comment:
   Following Zhijiang's thought, I think we can now remove the pin method from ResultPartition, since it is only an implementation detail of ReleaseOnConsumptionResultPartition. Then we can move the initialization of the pending references to the constructor to ensure there is no concurrent access to the variable, and use pendingReferences.set(subpartitions.length) to initialize it.




[GitHub] [flink] liyafan82 commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
liyafan82 commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293648291
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
 ##
 @@ -165,49 +168,33 @@ public int getArity() {
 
/**
 * Convert base row to binary row.
-* TODO modify it to code gen, and reuse BinaryRow
+* TODO modify it to code gen.
 
 Review comment:
   I see the code gen related logic has already been modified. Do you mean more 
changes are needed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload

2019-06-13 Thread GitBox
stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include 
the whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
 
 
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload

2019-06-13 Thread GitBox
stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the 
whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
liyafan82 commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293648012
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
 ##
 @@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link BaseMap}.
+ */
+public class BaseMapSerializer extends TypeSerializer {
+
+   private final LogicalType keyType;
+   private final LogicalType valueType;
+
+   private final TypeSerializer keySerializer;
+   private final TypeSerializer valueSerializer;
+
+   private transient BinaryArray reuseKeyArray;
+   private transient BinaryArray reuseValueArray;
+   private transient BinaryArrayWriter reuseKeyWriter;
+   private transient BinaryArrayWriter reuseValueWriter;
+
+   public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+   this.keyType = keyType;
+   this.valueType = valueType;
+
+   this.keySerializer = InternalSerializers.create(keyType, new 
ExecutionConfig());
+   this.valueSerializer = InternalSerializers.create(valueType, 
new ExecutionConfig());
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public TypeSerializer duplicate() {
+   return new BaseMapSerializer(keyType, valueType);
+   }
+
+   @Override
+   public BaseMap createInstance() {
+   return new BinaryMap();
+   }
+
+   @Override
+   public BaseMap copy(BaseMap from) {
+   if (from instanceof GenericMap) {
+   Map fromMap = ((GenericMap) 
from).getMap();
+   HashMap toMap = new HashMap<>();
 
 Review comment:
   "Just wrap user Map, users need to guarantee their logic."
   -> the users just guarantee that their Map works correctly according to the 
Map interface, but it does not mean their Map would work correctly when wrapped 
in a HashMap. Right?
   
   When that is the case, it may cause some weird problem which is hard to 
locate and debug. So this is not a reliable solution. 
   
   At the very least, we should write some comment in the JavaDoc about this 
explicitly.
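   
   For example (a made-up, self-contained illustration, not Flink code): a user 
map whose lookup semantics come from a custom comparator silently loses those 
semantics once its entries are copied into a HashMap.
   
```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapCopyCaveat {

    public static void main(String[] args) {
        // The user's map treats keys case-insensitively via its comparator.
        Map<String, Integer> userMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        userMap.put("key", 1);
        System.out.println(userMap.get("KEY")); // 1

        // Copying the entries into a HashMap silently drops that behaviour:
        // HashMap only consults equals()/hashCode(), so the same lookup misses.
        Map<String, Integer> copy = new HashMap<>(userMap);
        System.out.println(copy.get("KEY"));    // null
    }
}
```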


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on issue #7435: [FLINK-11226] Lack of getKeySelector in Scala KeyedStream API unlike Java KeyedStream

2019-06-13 Thread GitBox
yanghua commented on issue #7435: [FLINK-11226] Lack of getKeySelector in Scala 
KeyedStream API unlike Java KeyedStream
URL: https://github.com/apache/flink/pull/7435#issuecomment-501953286
 
 
   @sunjincheng121 I have a small PR that has been open for a long time, could 
you have a look?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12836) Allow retained checkpoints to be persisted on success

2019-06-13 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863647#comment-16863647
 ] 

vinoyang commented on FLINK-12836:
--

Hi [~klion26], I think {{CHECKPOINT_RETAINED_ON_CANCELLATION}} has a flag 
{{discardSubsumed}} whose value is {{true}}. A checkpoint is subsumed when the 
maximum number of retained checkpoints is reached and a more recent checkpoint 
completes. Maybe this point is what [~andreweduffy] had in mind.
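
For reference, a minimal sketch of how retain-on-cancellation is typically 
enabled from the 1.9-era DataStream API (the proposed 
_CHECKPOINT_RETAINED_ALWAYS_ mode would need an additional cleanup option 
here):

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every minute.
        env.enableCheckpointing(60_000);

        // Keep the latest retained checkpoint when the job is cancelled;
        // DELETE_ON_CANCELLATION would remove it instead.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
{code}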

> Allow retained checkpoints to be persisted on success
> -
>
> Key: FLINK-12836
> URL: https://issues.apache.org/jira/browse/FLINK-12836
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Andrew Duffy
>Assignee: vinoyang
>Priority: Major
>
> Currently, retained checkpoints are persisted with one of 3 strategies:
>  * {color:#33}CHECKPOINT_NEVER_RETAINED:{color} Retained checkpoints are 
> never persisted
>  * {color:#33}CHECKPOINT_RETAINED_ON_FAILURE:{color}{color:#33} 
> Latest retained checkpoint{color} is persisted in the face of job failures
>  * {color:#33}CHECKPOINT_RETAINED_ON_CANCELLATION{color}: Latest retained 
> checkpoint is persisted when job is canceled externally (e.g. via the REST 
> API)
>  
> I'm proposing a third persistence mode: _CHECKPOINT_RETAINED_ALWAYS_. This 
> mode would ensure that retained checkpoints are retained on successful 
> completion of the job, and can be resumed from later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zjuwangg commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-13 Thread GitBox
zjuwangg commented on a change in pull request #8703: 
[FLINK-12807][hive]Support Hive table columnstats related operations in 
HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293647266
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1079,7 +1100,27 @@ public void alterPartitionStatistics(ObjectPath 
tablePath, CatalogPartitionSpec
 
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, 
boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
+   try {
+   Partition hivePartition = getHivePartition(tablePath, 
partitionSpec);
+   Table hiveTable = getHiveTable(tablePath);
+   String partName = getPartitionName(tablePath, 
partitionSpec, hiveTable);
 
 Review comment:
   Yes, we cannot just obtain the partition keys from the Hive Partition.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
zhijiangW commented on issue #8654: [FLINK-12647][network] Add feature flag to 
disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#issuecomment-501951296
 
 
   Thanks for the updates @zentol and the reviews @tillrohrmann .
   I like the current way and it looks pretty good now. I only left several nit 
comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] 
Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293644729
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 ##
 @@ -112,13 +114,19 @@ public ResultPartitionBuilder setBufferPoolFactory(
return this;
}
 
+   public ResultPartitionBuilder isReleasedOnConsumption(boolean 
releasedOnConsumption) {
+   this.releasedOnConsumption = releasedOnConsumption;
+   return this;
+   }
+
public ResultPartition build() {
ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
partitionManager,
ioManager,
networkBufferPool,
networkBuffersPerChannel,
-   floatingNetworkBuffersPerGate);
+   floatingNetworkBuffersPerGate,
+   true);
 
 Review comment:
   With this value set to true, there currently seems to be no way to generate 
a plain `ResultPartition` via the builder, no matter whether 
`releasedOnConsumption` is set to true or false.
   Another option is to make this value false and let `releasedOnConsumption` 
default to true; the final flag could then still be changed via 
`ResultPartitionBuilder#isReleasedOnConsumption(false)`.
   I am not sure whether to adjust this now or fix it later when required.
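   
   A tiny self-contained sketch of that second option (illustrative names, not 
the actual builder): the flag defaults to true and is only overridden through 
the setter, so existing tests keep their behaviour.
   
```java
/** Illustrative builder: the release-on-consumption flag defaults to true. */
public class PartitionBuilder {

    private boolean releasedOnConsumption = true;

    public PartitionBuilder isReleasedOnConsumption(boolean releasedOnConsumption) {
        this.releasedOnConsumption = releasedOnConsumption;
        return this;
    }

    public String build() {
        // The real build() would hand this flag to the factory instead of a
        // hard-coded constant.
        return releasedOnConsumption
                ? "ReleaseOnConsumptionResultPartition"
                : "ResultPartition";
    }

    public static void main(String[] args) {
        System.out.println(new PartitionBuilder().build());
        System.out.println(
                new PartitionBuilder().isReleasedOnConsumption(false).build());
    }
}
```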


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] 
Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293645376
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * ResultPartition that releases itself once all subpartitions have been 
consumed.
+ */
+public class ReleaseOnConsumptionResultPartition extends ResultPartition {
+
+   /**
+* The total number of references to subpartitions of this result. The 
result partition can be
+* safely released, iff the reference count is zero. A reference count 
of -1 denotes that the
+* result partition has been released.
+*/
+   private final AtomicInteger pendingReferences = new AtomicInteger();
+
+   ReleaseOnConsumptionResultPartition(
+   String owningTaskName,
+   ResultPartitionID partitionId,
+   ResultPartitionType partitionType,
+   ResultSubpartition[] subpartitions,
+   int numTargetKeyGroups,
+   ResultPartitionManager partitionManager,
+   FunctionWithException bufferPoolFactory) {
+   super(owningTaskName, partitionId, partitionType, 
subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
+   }
+
+   @Override
+   void pin() {
+   while (true) {
 
 Review comment:
   Actually I am not sure why a `while` loop was needed here before.
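   
   For context, a compare-and-set retry loop is the usual reason such a `while` 
shows up: the counter has to be read, validated against the "released" 
sentinel, and updated atomically, and the update can fail under contention. A 
self-contained sketch of that pattern (illustrative names, not the actual Flink 
code):
   
```java
import java.util.concurrent.atomic.AtomicInteger;

public class PinExample {

    /** -1 marks the resource as already released. */
    private final AtomicInteger refCount = new AtomicInteger();

    /** Adds references unless the resource has already been released. */
    public void pin(int increment) {
        while (true) {
            int current = refCount.get();
            if (current == -1) {
                throw new IllegalStateException("already released");
            }
            // compareAndSet fails if another thread changed the counter in
            // the meantime, hence the retry loop.
            if (refCount.compareAndSet(current, current + increment)) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        PinExample example = new PinExample();
        example.pin(2);
        System.out.println(example.refCount.get()); // 2
    }
}
```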


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Reopened] (FLINK-12541) Add deploy a Python Flink job and session cluster on Kubernetes support.

2019-06-13 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reopened FLINK-12541:
-

Hi [~till.rohrmann], thanks for helping review and merge #8609. #8609 relates 
to FLINK-12788, so I'll close FLINK-12788 and reopen this JIRA. If I missed 
something, please correct me. :)

> Add deploy a Python Flink job and session cluster on Kubernetes support.
> 
>
> Key: FLINK-12541
> URL: https://issues.apache.org/jira/browse/FLINK-12541
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Runtime / REST
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Add deploy a Python Flink job and session cluster on Kubernetes support.
> We need to have the same deployment step as the Java job. Please see: 
> [https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12788) Add support to run a Python job-specific cluster on Kubernetes

2019-06-13 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12788.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 00d90a4fe3ff93d19ff6be3ed7b9cf9d0f5b0dfd

> Add support to run a Python job-specific cluster on Kubernetes
> --
>
> Key: FLINK-12788
> URL: https://issues.apache.org/jira/browse/FLINK-12788
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Deployment / Docker
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> As discussed in FLINK-12541, we need to support to run a Python job-specific 
> cluster on Kubernetes. To support this, we need to improve the job specific 
> docker image build scripts to support Python Table API jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293644286
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
 ##
 @@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link BaseMap}.
+ */
+public class BaseMapSerializer extends TypeSerializer {
+
+   private final LogicalType keyType;
+   private final LogicalType valueType;
+
+   private final TypeSerializer keySerializer;
+   private final TypeSerializer valueSerializer;
+
+   private transient BinaryArray reuseKeyArray;
+   private transient BinaryArray reuseValueArray;
+   private transient BinaryArrayWriter reuseKeyWriter;
+   private transient BinaryArrayWriter reuseValueWriter;
+
+   public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+   this.keyType = keyType;
+   this.valueType = valueType;
+
+   this.keySerializer = InternalSerializers.create(keyType, new 
ExecutionConfig());
+   this.valueSerializer = InternalSerializers.create(valueType, 
new ExecutionConfig());
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public TypeSerializer duplicate() {
+   return new BaseMapSerializer(keyType, valueType);
+   }
+
+   @Override
+   public BaseMap createInstance() {
+   return new BinaryMap();
+   }
+
+   @Override
+   public BaseMap copy(BaseMap from) {
+   if (from instanceof GenericMap) {
+   Map fromMap = ((GenericMap) 
from).getMap();
+   HashMap toMap = new HashMap<>();
 
 Review comment:
   No.
   First: only `IdentityConverter` will use `GenericMap`; the others will use 
`BinaryMap`.
   Second: we just wrap the user `Map`, so users need to guarantee their own 
logic.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293640776
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/client/PythonShellParser.java
 ##
 @@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.client;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Command line parser for Python shell.
+ */
+public class PythonShellParser {
+   private static final Option OPTION_HELP = Option
+   .builder("h")
+   .required(false)
+   .longOpt("help")
+   .desc("Show the help message with descriptions of all options.")
+   .build();
+
+   private static final Option OPTION_CONTAINER = Option
+   .builder("n")
+   .required(false)
+   .longOpt("container")
+   .hasArg()
+   .desc("Number of YARN container to allocate (=Number of Task 
Managers)")
+   .build();
+
+   private static final Option OPTION_JM_MEMORY = Option
+   .builder("jm")
+   .required(false)
+   .longOpt("jobManagerMemory")
+   .hasArg()
+   .desc("Memory for JobManager Container with optional unit 
(default: MB)")
+   .build();
+
+   private static final Option OPTION_NAME = Option
+   .builder("nm")
+   .required(false)
+   .longOpt("name")
+   .hasArg()
+   .desc("Set a custom name for the application on YARN")
+   .build();
+
+   private static final Option OPTION_QUEUE = Option
+   .builder("qu")
+   .required(false)
+   .longOpt("queue")
+   .hasArg()
+   .desc("Specify YARN queue.")
+   .build();
+
+   private static final Option OPTION_SLOTS = Option
+   .builder("s")
+   .required(false)
+   .longOpt("slots")
+   .hasArg()
+   .desc("Number of slots per TaskManager")
+   .build();
+
+   private static final Option OPTION_TM_MEMORY = Option
+   .builder("tm")
+   .required(false)
+   .longOpt("taskManagerMemory")
+   .hasArg()
+   .desc("Memory per TaskManager Container with optional unit 
(default: MB)")
+   .build();
+
+   // cluster types
+   private static final String LOCAL_RUN = "local";
+   private static final String REMOTE_RUN = "remote";
+   private static final String YARN_RUN = "yarn";
+
+   // Options that will be used in mini cluster.
+   private static final Options LOCAL_OPTIONS = getLocalOptions(new 
Options());
+
+   // Options that will be used in remote cluster.
+   private static final Options REMOTE_OPTIONS = getRemoteOptions(new 
Options());
+
+   // Options that will be used in yarn cluster.
+   private static final Options YARN_OPTIONS = getYarnOptions(new 
Options());
+
+   public static void main(String[] args) {
+   if (args.length < 1) {
+   printError("You should specify cluster type or -h | 
--help option");
+   System.exit(1);
+   }
+   String command = args[0];
+   List commandOptions = null;
+   try {
+   switch (command) {
+   case LOCAL_RUN:
+   commandOptions = parseLocal(args);
+   break;
+   case REMOTE_RUN:
+   commandOptions = parseRemote(args);
+   

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293642907
 
 

 ##
 File path: docs/ops/python_shell.zh.md
 ##
 @@ -0,0 +1,179 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink附带了一个集成的交互式Python Shell。
+它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。
+
+为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容
+
+## 使用
+
+当前Python shell支持Table API的功能。
+在启动之后,Table Environment的相关内容将会被自动加载。
+可以通过变量"bt_env"来使用BatchTableEnvironment,通过变量"st_env"来使用StreamTableEnvironment。
+
+### Table API
+
+下面的内容是关于如何通过Python Table API来实现一个简单的作业:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
+{% endhighlight %}
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/batch.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> bt_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("batch_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("batch_sink")
+>>> bt_env.execute()
+{% endhighlight %}
+
+
+
+## 启动
+
+为了概览Python Shell提供的可选参数,可以使用:
+
+{% highlight bash %}
+bin/pyflink-shell.sh --help
+{% endhighlight %}
+
+### Local
+
+可以指定Python Shell运行在local模式下,只需要执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+
+### Remote
+
+可以指定Python Shell运行在一个指定的JobManager上,通过关键字`remote`和对应的JobManager
 
 Review comment:
   We can delete `可以指定`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293638255
 
 

 ##
 File path: docs/ops/python_shell.md
 ##
 @@ -0,0 +1,185 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink comes with an integrated interactive Python Shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use the shell with an integrated Flink cluster just execute:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+in the root directory of your binary Flink directory. To run the Shell on a
+cluster, please see the Setup section below.
+
+## Usage
+
+The shell only supports Table API currently.
+The Table Environments are automatically prebound after startup. 
+Use "bt_env" and "st_env" to access BatchTableEnvironment and 
StreamTableEnvironment respectively.
+
+### Table API
+
+The example below is a simple program using Table API:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
 
 Review comment:
   I think we can call `set_parallelism(1)`, then we can check the result 
easily. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293642567
 
 

 ##
 File path: docs/ops/python_shell.zh.md
 ##
 @@ -0,0 +1,179 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink附带了一个集成的交互式Python Shell。
+它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。
+
+为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容
 
 Review comment:
   `关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容` -> 
`关于如何在一个Cluster集群上运行Python shell,可以参考启动章节介绍。`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293643657
 
 

 ##
 File path: flink-python/pyflink/shell.py
 ##
 @@ -0,0 +1,128 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import platform
+
+from pyflink.table import *
+
+print("Using Python version %s (%s, %s)" % (
+platform.python_version(),
+platform.python_build()[0],
+platform.python_build()[1]))
+
+welcome_msg = u'''
+ \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
+ \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593 \
+ \u2592\u2593\u2588\u2588\u2588\u2593\u2592
+  \u2593\u2588\u2588\u2588\u2593\u2591\u2591\u2592 \
+  \u2592\u2592\u2593\u2588\u2588\u2592  \u2592
+\u2591\u2588\u2588\u2592   \u2592\u2592\u2593\u2593\u2588 \
+\u2593\u2593\u2592\u2591  \u2592\u2588\u2588\u2588\u2588
+\u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588 \
+\u2588\u2592\u2592\u2588\u2592\u2588\u2592
+  \u2591\u2593\u2588\u2588\u2588\u2588   \u2593 \
+  \u2591\u2592\u2588\u2588
+\u2593\u2588   
\u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
+  \u2588\u2591 \u2588   \u2592\u2592\u2591   
\u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
+  \u2588\u2588\u2588\u2588\u2591   \u2592\u2593\u2588\u2593
  \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
+   \u2591\u2592\u2588\u2593\u2593\u2588\u2588   
\u2593\u2588\u2592\u2593\u2588\u2592\u2593\u2588\u2588\u2593 
\u2591\u2588\u2591
+ \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588   
  \u2592\u2588
\u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
+\u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593  \u2593\u2588 
  \u2588   \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
+  \u2591\u2588\u2588\u2593  \u2591\u2588\u2591\u2588  
\u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 
\u2588\u2588\u2593\u2591\u2592
+ \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591  \u2593 \u2591\u2588 
\u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591\u2591\u2588\u2591\u2593  
\u2593\u2591
+\u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592  
\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591   
\u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
+ \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588   
\u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 
\u2588\u2588\u2592\u2592  \u2588 \u2592  \u2593\u2588\u2592
+ \u2593\u2588\u2593  \u2593\u2588 \u2588\u2588\u2593 
\u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592  
\u2592\u2588\u2588\u2593   \u2591\u2588\u2592
+ \u2593\u2588\u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 
 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593  \u2591\u2592\u2591 
\u2593\u2588
+ \u2588\u2588\u2593\u2588\u2588\u2592
\u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592
\u2593\u2588\u2588\u2588  \u2588
+\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588   
\u2591\u2593\u2593\u2592\u2591\u2591   
\u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591  
\u2591\u2592\u2593\u2592  \u2588\u2593
+\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588  
\u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591
\u2588\u2593
+\u2588\u2588 \u2593\u2591\u2592\u2588   
\u2593\u2593\u2593\u2593\u2592\u2591\u2591  \u2592\u2588\u2593   
\u2592\u2593\u2593\u2588\u2588\u2593\u2593\u2592  

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293638505
 
 

 ##
 File path: docs/ops/python_shell.md
 ##
 @@ -0,0 +1,185 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink comes with an integrated interactive Python Shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use the shell with an integrated Flink cluster just execute:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+in the root directory of your binary Flink directory. To run the Shell on a
+cluster, please see the Setup section below.
+
+## Usage
+
+The shell only supports Table API currently.
+The Table Environments are automatically prebound after startup. 
+Use "bt_env" and "st_env" to access BatchTableEnvironment and 
StreamTableEnvironment respectively.
+
+### Table API
+
+The example below is a simple program using Table API:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
 
 Review comment:
   It's better to tell the user how to check the result. Currently, I think we 
can give the user an example of reading the sink file. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293643067
 
 

 ##
 File path: docs/ops/python_shell.zh.md
 ##
 @@ -0,0 +1,179 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink附带了一个集成的交互式Python Shell。
+它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。
+
+为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容
+
+## 使用
+
+当前Python shell支持Table API的功能。
+在启动之后,Table Environment的相关内容将会被自动加载。
+可以通过变量"bt_env"来使用BatchTableEnvironment,通过变量"st_env"来使用StreamTableEnvironment。
+
+### Table API
+
+下面的内容是关于如何通过Python Table API来实现一个简单的作业:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
+{% endhighlight %}
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/batch.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> bt_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("batch_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("batch_sink")
+>>> bt_env.execute()
+{% endhighlight %}
+
+
+
+## 启动
+
+为了概览Python Shell提供的可选参数,可以使用:
+
+{% highlight bash %}
+bin/pyflink-shell.sh --help
+{% endhighlight %}
+
+### Local
+
+可以指定Python Shell运行在local模式下,只需要执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+
+### Remote
+
+可以指定Python Shell运行在一个指定的JobManager上,通过关键字`remote`和对应的JobManager
+的地址和端口号来进行指定:
+
+{% highlight bash %}
+bin/pyflink-shell.sh remote  
+{% endhighlight %}
+
+### Yarn Python Shell cluster
+
+可以指定Python Shell运行在YARN集群之上。YARN的container的数量可以通过参数`-n `进行
 
 Review comment:
   `可以指定Python Shell运行在YARN集群之上` -> `Python Shell可以运行在YARN集群之上。`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293642713
 
 

 ##
 File path: docs/ops/python_shell.zh.md
 ##
 @@ -0,0 +1,179 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink附带了一个集成的交互式Python Shell。
+它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。
+
+为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容
+
+## 使用
+
+当前Python shell支持Table API的功能。
+在启动之后,Table Environment的相关内容将会被自动加载。
+可以通过变量"bt_env"来使用BatchTableEnvironment,通过变量"st_env"来使用StreamTableEnvironment。
+
+### Table API
+
+下面的内容是关于如何通过Python Table API来实现一个简单的作业:
 
 Review comment:
   下面是一个通过Python Shell 运行的简单示例。


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293641314
 
 

 ##
 File path: docs/ops/python_shell.md
 ##
 @@ -0,0 +1,185 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink comes with an integrated interactive Python Shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use the shell with an integrated Flink cluster just execute:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+in the root directory of your binary Flink directory. To run the Shell on a
+cluster, please see the Setup section below.
+
+## Usage
+
+The shell only supports Table API currently.
+The Table Environments are automatically prebound after startup. 
+Use "bt_env" and "st_env" to access BatchTableEnvironment and 
StreamTableEnvironment respectively.
+
+### Table API
+
+The example below is a simple program using Table API:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
+{% endhighlight %}
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/batch.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> bt_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("batch_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("batch_sink")
+>>> bt_env.execute()
+{% endhighlight %}
+
+
+
+## Setup
+
+To get an overview of what options the Python Shell provides, please use
+
+{% highlight bash %}
+bin/pyflink-shell.sh --help
+{% endhighlight %}
+
+### Local
+
+To use the shell with an integrated Flink cluster just execute:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+
+### Remote
+
+To use it with a running cluster, please start the Python shell with the 
keyword `remote`
+and supply the host and port of the JobManager with:
+
+{% highlight bash %}
+bin/pyflink-shell.sh remote  
+{% endhighlight %}
+
+### Yarn Python Shell cluster
+
+The shell can deploy a Flink cluster to YARN, which is used exclusively by the
+shell. The number of YARN containers can be controlled by the parameter `-n 
`.
+The shell deploys a new Flink cluster on YARN and connects the
+cluster. You can also specify options for YARN cluster such as memory for
+JobManager, name of YARN application, etc.
+
+For example, to start a Yarn cluster for the Python Shell with two TaskManagers
+use the following:
+
+{% highlight bash %}
+ bin/pyflink-shell.sh yarn -n 2
+{% endhighlight %}
+
+For all other options, see the full reference at the bottom.
+
+
+### Yarn Session
+
+If you have previously deployed a Flink cluster using the Flink Yarn Session,
+the Python shell can connect with it using the following command:
+
+{% highlight bash %}
+ bin/pyflink-shell.sh yarn
+{% endhighlight %}
+
+
+## Full Reference
 
 Review comment:
   The doc of the `full reference` should be the same as the command line 
output:
   
![image](https://user-images.githubusercontent.com/22488084/59479344-42946600-8e8f-11e9-9afc-7c3b5d7415e6.png)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293642161
 
 

 ##
 File path: docs/ops/python_shell.md
 ##
 @@ -0,0 +1,185 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink comes with an integrated interactive Python Shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use the shell with an integrated Flink cluster just execute:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+in the root directory of your binary Flink directory. To run the Shell on a
+cluster, please see the Setup section below.
+
+## Usage
+
+The shell only supports Table API currently.
+The Table Environments are automatically prebound after startup. 
+Use "bt_env" and "st_env" to access BatchTableEnvironment and 
StreamTableEnvironment respectively.
+
+### Table API
+
+The example below is a simple program using Table API:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
+{% endhighlight %}
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/batch.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
 
 Review comment:
   I think we should not only remove the file, but also delete the folder via 
`shutil.rmtree(result_path)`. What do you think?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293642781
 
 

 ##
 File path: docs/ops/python_shell.zh.md
 ##
 @@ -0,0 +1,179 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink附带了一个集成的交互式Python Shell。
+它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。
+
+为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容
+
+## 使用
+
+当前Python shell支持Table API的功能。
+在启动之后,Table Environment的相关内容将会被自动加载。
+可以通过变量"bt_env"来使用BatchTableEnvironment,通过变量"st_env"来使用StreamTableEnvironment。
+
+### Table API
+
+下面的内容是关于如何通过Python Table API来实现一个简单的作业:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
+{% endhighlight %}
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/batch.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> bt_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("batch_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("batch_sink")
+>>> bt_env.execute()
+{% endhighlight %}
+
+
+
+## 启动
+
+为了概览Python Shell提供的可选参数,可以使用:
 
 Review comment:
   查看Python Shell提供的可选参数,可以使用:


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293642820
 
 

 ##
 File path: docs/ops/python_shell.zh.md
 ##
 @@ -0,0 +1,179 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink附带了一个集成的交互式Python Shell。
+它既能够运行在本地启动的local模式,也能够运行在集群启动的cluster模式下。
+
+为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容
+
+## 使用
+
+当前Python shell支持Table API的功能。
+在启动之后,Table Environment的相关内容将会被自动加载。
+可以通过变量"bt_env"来使用BatchTableEnvironment,通过变量"st_env"来使用StreamTableEnvironment。
+
+### Table API
+
+下面的内容是关于如何通过Python Table API来实现一个简单的作业:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
+{% endhighlight %}
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/batch.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> bt_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("batch_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("batch_sink")
+>>> bt_env.execute()
+{% endhighlight %}
+
+
+
+## 启动
+
+为了概览Python Shell提供的可选参数,可以使用:
+
+{% highlight bash %}
+bin/pyflink-shell.sh --help
+{% endhighlight %}
+
+### Local
+
+可以指定Python Shell运行在local模式下,只需要执行:
 
 Review comment:
   Python Shell运行在local模式下,只需要执行:


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293262036
 
 

 ##
 File path: docs/ops/python_shell.zh.md
 ##
 @@ -0,0 +1,179 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink附带了一个集成的交互式Python Shell。
+它既能够用在本地启动的local模式,也能够用在集群启动的cluster模式下。
+
+为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+关于如何在一个Cluster集群上运行Python shell,可以参考下面的启动那一节的内容
+
+## 使用
+
+当前Python shell支持Table API的功能。
+在启动之后,Table Environment的相关内容将会被自动加载。
+可以通过变量"bt_env"来使用BatchTableEnvironment,通过变量"st_env"来使用StreamTableEnvironment。
+
+### Table API
+
+下面的内容是关于如何通过Table API来实现一个wordcount的作业:
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> st_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("stream_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("stream_sink")
+>>> st_env.execute()
+{% endhighlight %}
+
+
+{% highlight python %}
+>>> import tempfile
+>>> import os
+>>> sink_path = tempfile.gettempdir() + '/batch.csv'
+>>> if os.path.isfile(sink_path):
+>>> os.remove(sink_path)
+>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
+>>> bt_env.connect(FileSystem().path(sink_path))\
+>>> .with_format(OldCsv()
+>>> .field_delimiter(',')
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .with_schema(Schema()
+>>> .field("a", DataTypes.BIGINT())
+>>> .field("b", DataTypes.STRING())
+>>> .field("c", DataTypes.STRING()))\
+>>> .register_table_sink("batch_sink")
+>>> t.select("a + 1, b, c")\
+>>> .insert_into("batch_sink")
+>>> bt_env.execute()
+{% endhighlight %}
+
+
+
+## 启动
+
+为了概览Python Shell提供的可选参数,可以使用:
+
+{% highlight bash %}
+bin/pyflink-shell.sh --help
+{% endhighlight %}
+
+### Local
+
+可以指定Python Shell运行在local本地,只需要执行:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+
+### Remote
+
+可以指定Python Shell运行在一个指定的JobManager上,通过关键字`remote`和对应的JobManager
+的地址和端口号来进行指定:
+
+{% highlight bash %}
+bin/pyflink-shell.sh remote  
+{% endhighlight %}
+
+### Yarn Python Shell cluster
+
+可以指定Python Shell运行在YARN集群之上。YARN的container的数量可以通过参数`-n `进行
+指定。Python shell在Yarn上部署一个新的Flink集群,并进行连接。除了指定container数量,你也
+可以指定JobManager的内存,YARN应用的名字等参数。
+例如,运行Python Shell部署一个包含两个TaskManager的Yarn集群:
+
+{% highlight bash %}
+ bin/pyflink-shell.sh yarn -n 2
+{% endhighlight %}
+
+关于所有可选的参数,可以查看本页面底部的完整说明。
+
+
+### Yarn Session
+
+如果你已经通过Flink Yarn Session部署了一个Flink集群,能够通过以下的命令连接到这个集群:
+
+{% highlight bash %}
+ bin/pyflink-shell.sh yarn
+{% endhighlight %}
+
+
+## 完整的参考
+
+{% highlight bash %}
+Flink Python Shell
+使用: pyflink-shell.sh [local|remote|yarn] [options] ...
+
+命令: local [选项]
+启动一个部署在local本地的Flink Python shell
 
 Review comment:
   `local本地` -> `local`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293643766
 
 

 ##
 File path: flink-python/pyflink/shell.py
 ##
 @@ -0,0 +1,128 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import platform
+
+from pyflink.table import *
+
+print("Using Python version %s (%s, %s)" % (
+platform.python_version(),
+platform.python_build()[0],
+platform.python_build()[1]))
+
+welcome_msg = u'''
[Flink ASCII-art logo banner omitted here: a block of \uXXXX escape sequences that was line-wrapped and truncated in the archive]

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293634615
 
 

 ##
 File path: docs/ops/python_shell.md
 ##
 @@ -0,0 +1,185 @@
+---
+title: "Python REPL"
+nav-parent_id: ops
+nav-pos: 7
+---
+
+
+Flink comes with an integrated interactive Python Shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use the shell with an integrated Flink cluster just execute:
+
+{% highlight bash %}
+bin/pyflink-shell.sh local
+{% endhighlight %}
+
+in the root directory of your binary Flink directory. To run the Shell on a
+cluster, please see the Setup section below.
+
+## Usage
+
+The shell only supports Table API currently.
+The Table Environments are automatically prebound after startup. 
+Use "bt_env" and "st_env" to access BatchTableEnvironment and 
StreamTableEnvironment respectively.
+
+### Table API
+
+The example below is a simple program using Table API:
 
 Review comment:
   `The example below is a simple program using Table API` -> `The example 
below is a simple program in the Python shell`? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8675: [FLINK-12716][python] Add an interactive shell for Python Table API

2019-06-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8675: 
[FLINK-12716][python] Add an interactive shell for Python Table API
URL: https://github.com/apache/flink/pull/8675#discussion_r293634271
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/pyflink-shell.sh
 ##
 @@ -0,0 +1,79 @@
+#!/bin/bash
 
 Review comment:
  Can we rename `pyflink-shell.sh` to `start-python-shell.sh`? Just like `start-scala-shell.sh` :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293643294
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
 ##
 @@ -165,49 +168,33 @@ public int getArity() {
 
/**
 * Convert base row to binary row.
-* TODO modify it to code gen, and reuse BinaryRow
+* TODO modify it to code gen.
 
 Review comment:
  We still need to move this logic to code gen.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shuai-xu commented on issue #8688: [FLINK-12760] [runtime] Implement ExecutionGraph to InputsLocationsRetriever Adapter

2019-06-13 Thread GitBox
shuai-xu commented on issue #8688: [FLINK-12760] [runtime] Implement 
ExecutionGraph to InputsLocationsRetriever Adapter
URL: https://github.com/apache/flink/pull/8688#issuecomment-501947360
 
 
   @GJL Of course, I have rebased it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11879) Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput

2019-06-13 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-11879:
--
Description: 
- Rejects the jobs containing operators which were implemented 
`InputSelectable` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented `BoundedInput` 
or `BoundedMultiInput` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented 
`InputSelectable` in case that credit-based flow control is disabled.

  was:
- Rejects the jobs containing operators which were implemented 
`TwoInputSelectable` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented `BoundedInput` 
or `BoundedMultiInput` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented 
`TwoInputSelectable` in case that credit-based flow control is disabled.


> Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and 
> BoundedMultiInput
> --
>
> Key: FLINK-11879
> URL: https://issues.apache.org/jira/browse/FLINK-11879
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> - Rejects the jobs containing operators which were implemented 
> `InputSelectable` in case of enabled checkpointing.
>  - Rejects the jobs containing operators which were implemented 
> `BoundedInput` or `BoundedMultiInput` in case of enabled checkpointing.
>  - Rejects the jobs containing operators which were implemented 
> `InputSelectable` in case that credit-based flow control is disabled.
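
For illustration, a minimal sketch of the first check described above (the interface and operator class names come from the Flink streaming API; the actual validator and how it is wired into JobGraph generation are not shown in this ticket and are assumptions here):

    import org.apache.flink.streaming.api.operators.InputSelectable;
    import org.apache.flink.streaming.api.operators.StreamOperator;

    /** Illustrative sketch only, not the ticket's actual validator. */
    public class InputSelectableValidationSketch {

        static void validate(boolean checkpointingEnabled, Iterable<StreamOperator<?>> operators) {
            if (!checkpointingEnabled) {
                return;
            }
            for (StreamOperator<?> operator : operators) {
                if (operator instanceof InputSelectable) {
                    // Reject the job: InputSelectable operators are not supported with checkpointing enabled.
                    throw new IllegalStateException(
                            "Checkpointing is currently not supported for jobs with InputSelectable operators.");
                }
            }
        }
    }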



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11879) Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput

2019-06-13 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-11879:
--
Description: 
- Rejects the jobs containing operators which were implemented 
`TwoInputSelectable` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented `BoundedInput` 
or `BoundedMultiInput` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented 
`TwoInputSelectable` in case that credit-based flow control is disabled.

  was:
- Rejects the jobs containing operators which were implemented 
`TwoInputSelectable` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented `BoundedInput` 
or `BoundedTwoInput` in case of enabled checkpointing.
 - Rejects the jobs containing operators which were implemented 
`TwoInputSelectable` in case that credit-based flow control is disabled.


> Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and 
> BoundedMultiInput
> --
>
> Key: FLINK-11879
> URL: https://issues.apache.org/jira/browse/FLINK-11879
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> - Rejects the jobs containing operators which were implemented 
> `TwoInputSelectable` in case of enabled checkpointing.
>  - Rejects the jobs containing operators which were implemented 
> `BoundedInput` or `BoundedMultiInput` in case of enabled checkpointing.
>  - Rejects the jobs containing operators which were implemented 
> `TwoInputSelectable` in case that credit-based flow control is disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] shuai-xu commented on a change in pull request #8688: [FLINK-12760] [runtime] Implement ExecutionGraph to InputsLocationsRetriever Adapter

2019-06-13 Thread GitBox
shuai-xu commented on a change in pull request #8688: [FLINK-12760] [runtime] 
Implement ExecutionGraph to InputsLocationsRetriever Adapter
URL: https://github.com/apache/flink/pull/8688#discussion_r293643179
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/EGBasedInputsLocationsRetriever.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of {@link InputsLocationsRetriever} based on the {@link 
ExecutionGraph}.
+ */
+public class EGBasedInputsLocationsRetriever implements 
InputsLocationsRetriever {
+
+   private final ExecutionGraph executionGraph;
+
+   public EGBasedInputsLocationsRetriever(ExecutionGraph executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   }
+
+   @Override
+   public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(
+   ExecutionVertexID executionVertexId) {
+   ExecutionVertex ev = getExecutionVertex(executionVertexId);
+
+   List<Collection<ExecutionVertexID>> resultPartitionProducers = new ArrayList<>(ev.getNumberOfInputs());
+   for (int i = 0 ; i < ev.getNumberOfInputs(); i++) {
+   ExecutionEdge[] inputEdges = ev.getInputEdges(i);
+   List<ExecutionVertexID> producers = new ArrayList<>(inputEdges.length);
+   for (ExecutionEdge inputEdge : inputEdges) {
+   ExecutionVertex producer = 
inputEdge.getSource().getProducer();
+   producers.add(new 
ExecutionVertexID(producer.getJobvertexId(), 
producer.getParallelSubtaskIndex()));
+   }
+   resultPartitionProducers.add(producers);
+
+   }
+   return resultPartitionProducers;
+   }
+
+   @Override
+   public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexId) {
+   ExecutionVertex ev = getExecutionVertex(executionVertexId);
+
+   if (ev.getExecutionState() != ExecutionState.CREATED) {
+   return 
Optional.of(ev.getCurrentTaskManagerLocationFuture());
+   } else {
+   return Optional.empty();
+   }
+   }
+
+   private ExecutionVertex getExecutionVertex(ExecutionVertexID 
executionVertexId) {
+
+   ExecutionJobVertex ejv = 
executionGraph.getJobVertex(executionVertexId.getJobVertexId());
+   if (ejv == null || ejv.getParallelism() <= 
executionVertexId.getSubtaskIndex()) {
+   throw new 
IllegalArgumentException(String.format("Failed to find execution {} in 
execution graph.", executionVertexId));
 
 Review comment:
   Done
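
For context, the quoted diff above ends at a `String.format` call that uses `{}` placeholders, which `String.format` does not substitute; the fix presumably switches to `%s` formatting, roughly like this self-contained sketch (the id value is hypothetical):

    public class FormatFixSketch {
        public static void main(String[] args) {
            String executionVertexId = "vertex_0_subtask_1";  // hypothetical id, for illustration only
            // String.format substitutes %s placeholders; the {} style belongs to SLF4J-style loggers.
            String message = String.format(
                    "Failed to find execution %s in execution graph.", executionVertexId);
            System.out.println(message);
        }
    }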


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and Overwrit

2019-06-13 Thread GitBox
lirui-apache commented on a change in pull request #8695: 
[FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce 
PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
URL: https://github.com/apache/flink/pull/8695#discussion_r293642748
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class with trait about partitionable table sink. This is mainly 
used for
 
 Review comment:
   Yes we do. Actually the example in this very doc suggests it supports 
dynamic partitioning.
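
For readers following the thread, a small self-contained illustration of the static vs. dynamic distinction being discussed (the SQL in the comments and the map layout are illustrative assumptions, not taken from this PR):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionSpecExample {
        public static void main(String[] args) {
            // Static partitioning: every partition column is fixed in the statement, e.g.
            //   INSERT INTO sink PARTITION (dt='2019-06-13', country='US') SELECT a, b FROM src
            Map<String, String> staticSpec = new LinkedHashMap<>();
            staticSpec.put("dt", "2019-06-13");
            staticSpec.put("country", "US");

            // Dynamic partitioning: only a prefix of the partition columns is fixed, and the
            // remaining ones (here 'country') are taken from the query result per record, e.g.
            //   INSERT INTO sink PARTITION (dt='2019-06-13') SELECT a, b, country FROM src
            Map<String, String> staticPrefixForDynamicInsert = new LinkedHashMap<>();
            staticPrefixForDynamicInsert.put("dt", "2019-06-13");

            System.out.println("static spec = " + staticSpec);
            System.out.println("static prefix for dynamic insert = " + staticPrefixForDynamicInsert);
        }
    }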


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9854) Allow passing multi-line input to SQL Client CLI

2019-06-13 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863636#comment-16863636
 ] 

vinoyang commented on FLINK-9854:
-

[~docete] yes, you are right. Please feel free to open a new issue for your purpose.

> Allow passing multi-line input to SQL Client CLI
> 
>
> Key: FLINK-9854
> URL: https://issues.apache.org/jira/browse/FLINK-9854
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Timo Walther
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We should support {{flink-cli < query01.sql}} or {{echo "INSERT INTO bar 
> SELECT * FROM foo" | flink-cli}} for convenience. I'm not sure how well we 
> support multi-line input and EOF right now. Currently, with the experimental {{-u}} 
> flag the user also gets the correct error code after the submission, with 
> {{flink-cli < query01.sql}} the CLI would either stay in interactive mode or 
> always return success.
> We should also discuss which statements are allowed. Actually, only DDL and 
> {{INSERT INTO}} statements make sense so far.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] shuai-xu commented on issue #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-13 Thread GitBox
shuai-xu commented on issue #7227: [FLINK-11059] [runtime] do not add releasing 
failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#issuecomment-501946385
 
 
   Thank you, @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12229) Implement Lazy Scheduling Strategy

2019-06-13 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang closed FLINK-12229.
--

> Implement Lazy Scheduling Strategy
> --
>
> Key: FLINK-12229
> URL: https://issues.apache.org/jira/browse/FLINK-12229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Gary Yao
>Assignee: BoWang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Implement a {{SchedulingStrategy}} that covers the functionality of 
> {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the 
> input data are available.
> Acceptance Criteria:
>  * New strategy is tested in isolation using test implementations (i.e., 
> without having to submit a job)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on issue #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-13 Thread GitBox
JingsongLi commented on issue #8718: [FLINK-12824][table-planner-blink] Set 
parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#issuecomment-501946257
 
 
  This calculation method reuses the batch code almost entirely.
   LGTM +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] 
Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293642164
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -350,18 +336,6 @@ public String toString() {
 * operation.
 */
void pin() {
 
 Review comment:
   The above javadoc `The partition can only be released after each 
subpartition has been consumed once per pin
   operation` might also be adjusted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-13 Thread GitBox
zhijiangW commented on a change in pull request #8654: [FLINK-12647][network] 
Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293641912
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -337,8 +324,7 @@ public boolean isReleased() {
@Override
public String toString() {
return "ResultPartition " + partitionId.toString() + " [" + 
partitionType + ", "
-   + subpartitions.length + " subpartitions, "
-   + pendingReferences + " pending references]";
+   + subpartitions.length + " subpartitions";
 
 Review comment:
  nit: the closing `]` is missing at the end.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableT

2019-06-13 Thread GitBox
wuchong commented on a change in pull request #8695: 
[FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce 
PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
URL: https://github.com/apache/flink/pull/8695#discussion_r293641738
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class with trait about partitionable table sink. This is mainly 
used for
 
 Review comment:
  I'm not sure; this is the current design in Blink. Do we support dynamic partitioning for sinks in Blink?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293641180
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
 ##
 @@ -71,10 +75,12 @@ public static TypeSerializer create(LogicalType type, 
ExecutionConfig config) {
DecimalType decimalType = (DecimalType) type;
return new 
DecimalSerializer(decimalType.getPrecision(), decimalType.getScale());
case ARRAY:
-   return BinaryArraySerializer.INSTANCE;
+   return new BaseArraySerializer(((ArrayType) 
type).getElementType(), config);
 
 Review comment:
  InternalSerializers.create is used to create serializers; the per-record runtime path should not invoke it.
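
For illustration, a minimal sketch of the pattern implied here, using the constructor and copy method visible in the quoted BaseArraySerializer (the element type and the wrapper class are illustrative assumptions): build the serializer once, then reuse it on the per-record path.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.table.dataformat.BaseArray;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.typeutils.BaseArraySerializer;

    public class ArraySerializerReuseExample {
        // Created once (e.g. in a constructor or open()), not per record.
        private final BaseArraySerializer serializer =
                new BaseArraySerializer(new IntType(), new ExecutionConfig());

        public BaseArray copyPerRecord(BaseArray array) {
            // Hot path: no InternalSerializers.create(...) call here, only reuse.
            return serializer.copy(array);
        }
    }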


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8682: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead to blink

2019-06-13 Thread GitBox
JingsongLi commented on a change in pull request #8682: 
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce 
conversion overhead to blink
URL: https://github.com/apache/flink/pull/8682#discussion_r293640912
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
 ##
 @@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseArray;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericArray;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType;
+
+/**
+ * Serializer for {@link BaseArray}.
+ */
+public class BaseArraySerializer extends TypeSerializer<BaseArray> {
+
+   private final LogicalType eleType;
+   private final TypeSerializer eleSer;
+
+   private transient BinaryArray reuseArray;
+   private transient BinaryArrayWriter reuseWriter;
+
+   public BaseArraySerializer(LogicalType eleType, ExecutionConfig conf) {
+   this.eleType = eleType;
+   this.eleSer = InternalSerializers.create(eleType, conf);
+   }
+
+   private BaseArraySerializer(LogicalType eleType, TypeSerializer eleSer) 
{
+   this.eleType = eleType;
+   this.eleSer = eleSer;
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public TypeSerializer<BaseArray> duplicate() {
+   return new BaseArraySerializer(eleType, eleSer.duplicate());
+   }
+
+   @Override
+   public BaseArray createInstance() {
+   return new BinaryArray();
+   }
+
+   @Override
+   public BaseArray copy(BaseArray from) {
+   return from instanceof GenericArray ?
+   copyGenericArray((GenericArray) from) :
+   ((BinaryArray) from).copy();
+   }
+
+   @Override
+   public BaseArray copy(BaseArray from, BaseArray reuse) {
+   return copy(from);
+   }
+
+   private GenericArray copyGenericArray(GenericArray array) {
+   Object arr;
+   if (array.isPrimitiveArray()) {
+   switch (eleType.getTypeRoot()) {
+   case BOOLEAN:
+   arr = Arrays.copyOf((boolean[]) 
array.getArray(), array.numElements());
+   break;
+   case TINYINT:
+   arr = Arrays.copyOf((byte[]) 
array.getArray(), array.numElements());
+   break;
+   case SMALLINT:
 
 Review comment:
  We don't support the CHAR logical type yet.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [flink] lirui-apache commented on a change in pull request #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and Overwrit

2019-06-13 Thread GitBox
lirui-apache commented on a change in pull request #8695: 
[FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce 
PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
URL: https://github.com/apache/flink/pull/8695#discussion_r293640156
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class with trait about partitionable table sink. This is mainly 
used for
 
 Review comment:
   Why is it mainly used for static partition?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #8729: [FLINK-12831][table-planner][table-api-java] Split FunctionCatalog into Flink & Calcite specific parts

2019-06-13 Thread GitBox
docete commented on a change in pull request #8729: 
[FLINK-12831][table-planner][table-api-java] Split FunctionCatalog into Flink & 
Calcite specific parts
URL: https://github.com/apache/flink/pull/8729#discussion_r293638486
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
 ##
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.expressions.AggregateFunctionDefinition;
+import org.apache.flink.table.expressions.FunctionDefinition;
+import org.apache.flink.table.expressions.ScalarFunctionDefinition;
+import org.apache.flink.table.expressions.TableFunctionDefinition;
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlSyntax;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}.
+ */
+@Internal
+public class FunctionCatalogOperatorTable implements SqlOperatorTable {
+
+   private final FunctionCatalog functionCatalog;
+   private final FlinkTypeFactory typeFactory;
+
+   public FunctionCatalogOperatorTable(
+   FunctionCatalog functionCatalog,
+   FlinkTypeFactory typeFactory) {
+   this.functionCatalog = functionCatalog;
+   this.typeFactory = typeFactory;
+   }
+
+   @Override
+   public void lookupOperatorOverloads(
+   SqlIdentifier opName,
+   SqlFunctionCategory category,
+   SqlSyntax syntax,
+   List<SqlOperator> operatorList) {
+   if (!opName.isSimple()) {
+   return;
+   }
+
+   // We lookup only user functions via CatalogOperatorTable. 
Built in functions should
+   // go through BasicOperatorTable
+   if (isUserFunction(category)) {
+   return;
+   }
+
+   String name = opName.getSimple();
+   Optional<FunctionDefinition> candidateFunction = functionCatalog.lookupFunction(name);
+
+   candidateFunction.flatMap(functionDefinition ->
+   convertToSqlFunction(category, name, functionDefinition)
+   ).ifPresent(operatorList::add);
+   }
+
+   private boolean isUserFunction(SqlFunctionCategory category) {
 
 Review comment:
  Use 'isNotUserFunction' to make the check clearer and less misleading.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjuwangg commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-13 Thread GitBox
zjuwangg commented on a change in pull request #8703: 
[FLINK-12807][hive]Support Hive table columnstats related operations in 
HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293639722
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1069,7 +1075,22 @@ public void alterTableStatistics(ObjectPath tablePath, 
CatalogTableStatistics ta
 
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
-
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   // Set table column stats. This only works for 
non-partitioned tables.
+   if (!isTablePartitioned(hiveTable)) {
+   
client.updateTableColumnStatistics(HiveCatalogUtil.createTableColumnStats(hiveTable,
 columnStatistics.getColumnStatisticsData()));
+   } else {
+   throw new 
CatalogException(String.format("Failed to alter partition table column stats of 
table %s",
 
 Review comment:
   exactly


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12829) jobmaster log taskmanger address when task transition state to failed with error

2019-06-13 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863623#comment-16863623
 ] 

Congxian Qiu(klion26) commented on FLINK-12829:
---

Hi, do you think FLINK-11165 is enough for your case?

> jobmaster log taskmanger address when task transition state to failed with 
> error
> 
>
> Key: FLINK-12829
> URL: https://issues.apache.org/jira/browse/FLINK-12829
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: zhaoshijie
>Assignee: zhaoshijie
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> when look over jobmaster log to find why job failed, log is like that: 
> {code:java}
> // code placeholder
> 2019-06-13 10:13:28,066 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> Socket Stream -> Flat Map -> (Flat Map, Flat Map, Flat Map -> Sink: Unnamed) 
> (1/1) (dc39ce18d3f1b4aa15198151e7544162) switched from RUNNING to FAILED.
> java.net.ConnectException: Connection refused
>   at java.net.PlainSocketImpl.socketConnect(Native Method)
>   at 
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>   at 
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>   at 
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>   at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>   at java.net.Socket.connect(Socket.java:589)
>   at 
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:112)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:299)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> 2
> {code}
> I can tell why the job failed from this log, but when I want to find out where 
> the failed task (the one that led to the job failure) is located, it is not obvious. 
> However, some environment problems actually require knowing the TaskManager address, 
> so we can add it to the log.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-13 Thread GitBox
sunjincheng121 commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to 
handle the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-501940453
 
 
  The CI failed; I restarted the CI since it looks like an ENV problem. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12841) Unify catalog meta-objects implementations and remove their interfaces

2019-06-13 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863617#comment-16863617
 ] 

Jark Wu commented on FLINK-12841:
-

+1 to the change

> Unify catalog meta-objects implementations and remove their interfaces
> --
>
> Key: FLINK-12841
> URL: https://issues.apache.org/jira/browse/FLINK-12841
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We've been evaluating the original design in FLIP-30 that proposed creating 
> catalog meta object interfaces and individual impl of those interfaces in 
> each catalog. Interfaces we have so far are: {{CatalogDatabase}}, 
> {{CatalogTable}}, {{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, 
> and e.g. for {{CatalogTable}} interface, we have impls of 
> {{GenericCatalogTable}} and {{HiveCatalogTable}}, etc.
> Well, we have gone pretty far on FLIP-30 now. When we look back and 
> re-evaluate this design, we actually found there are not many differences 
> between, e.g. {{GenericCatalogTable}} and {{HiveCatalogTable}}. And having 
> this class hierarchy complicates situations and development. E.g. this 
> requires sql client to have a hard dependency on flink-connector-hive in 
> order to create {{HiveCatalogTable}} from DDL, which I don't think is 
> necessary.
> On the other side in Blink, we don't have this hierarchy and have been just 
> using a single meta-object class (e.g. just {{CatalogTable}}) to represent 
> data in different catalogs, it has been fine without any problem and all the 
> difference among catalogs can be stored as properties.
> Thus we propose removing the inheritance hierarchy and impl of the 
> meta-object interfaces for each individual catalog. To be more specific, take 
> table classes for example, we will replace {{CatalogTable}} with existing 
> {{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
> {{HiveCatalogTable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12836) Allow retained checkpoints to be persisted on success

2019-06-13 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863613#comment-16863613
 ] 

Congxian Qiu(klion26) commented on FLINK-12836:
---

Hi [~andreweduffy], Could you please share why 
{{CHECKPOINT_RETAINED_ON_CANCELLATION}} is not enough for your case?

> Allow retained checkpoints to be persisted on success
> -
>
> Key: FLINK-12836
> URL: https://issues.apache.org/jira/browse/FLINK-12836
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Andrew Duffy
>Assignee: vinoyang
>Priority: Major
>
> Currently, retained checkpoints are persisted with one of 3 strategies:
>  * CHECKPOINT_NEVER_RETAINED: Retained checkpoints are never persisted
>  * CHECKPOINT_RETAINED_ON_FAILURE: The latest retained checkpoint is persisted in 
> the face of job failures
>  * CHECKPOINT_RETAINED_ON_CANCELLATION: The latest retained checkpoint is persisted 
> when the job is canceled externally (e.g. via the REST API)
>  
> I'm proposing a third persistence mode: _CHECKPOINT_RETAINED_ALWAYS_. This 
> mode would ensure that retained checkpoints are retained on successful 
> completion of the job, and can be resumed from later.
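
For context, a minimal sketch of how a retained (externalized) checkpoint is configured today through the user-facing DataStream API; the mode proposed above would add an "always retain" behaviour on top of the existing cleanup options (this sketch only shows the current API and is not part of the proposal):

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetainedCheckpointConfigExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);  // checkpoint every 60 seconds
            // Keep the latest externalized checkpoint when the job is cancelled externally.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
    }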



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12355) KafkaITCase.testTimestamps is unstable

2019-06-13 Thread leesf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863590#comment-16863590
 ] 

leesf commented on FLINK-12355:
---

Another instance:[https://api.travis-ci.org/v3/job/545241400/log.txt]

> KafkaITCase.testTimestamps is unstable
> --
>
> Key: FLINK-12355
> URL: https://issues.apache.org/jira/browse/FLINK-12355
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.9.0
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> The {{KafkaITCase.testTimestamps}} failed on Travis because it timed out.
> https://api.travis-ci.org/v3/job/525503117/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 edited a comment on issue #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-13 Thread GitBox
sunjincheng121 edited a comment on issue #7227: [FLINK-11059] [runtime] do not 
add releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#issuecomment-501931581
 
 
  The CI failed, and I restarted the connector CI stage since I think it might be an ENV problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-13 Thread GitBox
sunjincheng121 commented on issue #7227: [FLINK-11059] [runtime] do not add 
releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#issuecomment-501931581
 
 
  The CI failed, and I restarted the connector CI stage since I think it might be an ENV problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
-   boolean isPerfetCheckpointForRecovery) {
+   boolean isPerfetCheckpointForRecovery,
+   int tolerableCpFailureNumber) {
 
// sanity checks
-   if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-   minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
+   if (checkpointInterval < 10 || checkpointTimeout < 10 ||
 
 Review comment:
   Yes, I have a good reason. `ExecutionGraph#enableCheckpointing` (see 
[here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L536))
 treats values `< 10` as illegal, and the [test 
case](https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java#L189)
 assumes this as well, so we'd better use the same criterion.
   
   From a practical point of view, I also think 10 is a meaningful minimum: if 
an interval of 1 ms were allowed, the checkpoint frequency would be far too high 
to have any practical use.
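
As a rough illustration of the criterion (a minimal sketch, not the exact Flink 
code), the setter-side check being aligned here would look like:

    // Hypothetical minimal version of the sanity check under discussion.
    private static final long MIN_CHECKPOINT_INTERVAL_MS = 10;

    private long checkpointInterval;

    public void setCheckpointInterval(long checkpointIntervalMs) {
        if (checkpointIntervalMs < MIN_CHECKPOINT_INTERVAL_MS) {
            throw new IllegalArgumentException(
                "Checkpoint interval must be at least " + MIN_CHECKPOINT_INTERVAL_MS + " ms");
        }
        this.checkpointInterval = checkpointIntervalMs;
    }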


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293628893
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager, which centralizes checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+   private final Set countedCheckpointIds;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 .");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //for compatibility purposes with user job behavior
+   case CHECKPOINT_DECLINED_TASK_NOT_READY:
+   case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
+   case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
+   case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
+   case CHECKPOINT_DECLINED_SUBSUMED:
+   case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   //ignore
+   break;
+
+   case CHECKPOINT_DECLINED:
+   //we should make sure one checkpoint is only counted once
+ 
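
(A minimal sketch of the de-duplicated counting being discussed, using the field 
names from the diff above; the callback method name is an assumption:)

    // Count each failed checkpoint id at most once; a negative id marks a failure
    // that happened before an id was generated and is always counted.
    private void countFailureAndMaybeFailJob(long checkpointId) {
        if (checkpointId < 0 || countedCheckpointIds.add(checkpointId)) {
            if (continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
                failureCallback.failJob();  // assumed callback signature
            }
        }
    }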

[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627636
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 ##
 @@ -618,7 +618,7 @@ public void testWithCheckPointing() throws Exception {
// expected behaviour
}
 
-   env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE, true);
+   env.enableCheckpointing(10, 
CheckpointingMode.EXACTLY_ONCE, true);
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627605
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 ##
 @@ -609,7 +609,7 @@ public void testWithCheckPointing() throws Exception {
// Test force checkpointing
 
try {
-   env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE, false);
+   env.enableCheckpointing(10, 
CheckpointingMode.EXACTLY_ONCE, false);
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627519
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ##
 @@ -583,7 +583,7 @@ private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
long interval = cfg.getCheckpointInterval();
-   if (interval > 0) {
+   if (interval >= 10) {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627478
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 ##
 @@ -125,8 +128,8 @@ public long getCheckpointInterval() {
 * @param checkpointInterval The checkpoint interval, in milliseconds.
 */
public void setCheckpointInterval(long checkpointInterval) {
-   if (checkpointInterval <= 0) {
-   throw new IllegalArgumentException("Checkpoint interval 
must be larger than zero");
+   if (checkpointInterval < 10) {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293627503
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 ##
 @@ -146,8 +149,8 @@ public long getCheckpointTimeout() {
 * @param checkpointTimeout The checkpoint timeout, in milliseconds.
 */
public void setCheckpointTimeout(long checkpointTimeout) {
-   if (checkpointTimeout <= 0) {
-   throw new IllegalArgumentException("Checkpoint timeout 
must be larger than zero");
+   if (checkpointTimeout < 10) {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293625337
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1367,13 +1503,18 @@ public void testMinTimeBetweenCheckpointsInterval() 
throws Exception {
 
final long delay = 50;
 
+   CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+   12,   // periodic interval is 12 ms
 
 Review comment:
   Yes, it is required. As in the prior comment, I have changed the condition of 
the sanity checks in `CheckpointCoordinatorConfiguration.java`. We should use the 
same criterion as `ExecutionGraph`: the correct condition is that the interval 
must be greater than or equal to 10.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
-   boolean isPerfetCheckpointForRecovery) {
+   boolean isPerfetCheckpointForRecovery,
+   int tolerableCpFailureNumber) {
 
// sanity checks
-   if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-   minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
+   if (checkpointInterval < 10 || checkpointTimeout < 10 ||
 
 Review comment:
   Yes, I have a good reason. `ExecutionGraph#enableCheckpointing` (see 
[here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L536))
 treats values `< 10` as illegal, and the [test 
case](https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java#L189)
 assumes this as well, so we'd better use the same criterion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293625337
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ##
 @@ -1367,13 +1503,18 @@ public void testMinTimeBetweenCheckpointsInterval() 
throws Exception {
 
final long delay = 50;
 
+   CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+   12,   // periodic interval is 12 ms
 
 Review comment:
   Yes, it is required. As in the prior comment, I have changed the condition of 
the sanity checks in `CheckpointCoordinatorConfiguration.java`. We should use the 
same criterion: the correct condition is that the interval must be greater than 
or equal to 10.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-13 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293623628
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
-   boolean isPerfetCheckpointForRecovery) {
+   boolean isPerfetCheckpointForRecovery,
+   int tolerableCpFailureNumber) {
 
// sanity checks
-   if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-   minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
+   if (checkpointInterval < 10 || checkpointTimeout < 10 ||
 
 Review comment:
   Yes, I have a good reason. `CheckpointConfig#setCheckpointInterval` and 
`CheckpointConfig#setCheckpointTimeout` treat values `< 10` as illegal, so we'd 
better use the same criterion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-13 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293615802
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector-related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive's ObjectInspector is a group of flexible APIs for inspecting values in 
different data representations,
+ * and developers can extend those APIs as needed, so technically an object 
inspector can support arbitrary Java data types.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[jira] [Updated] (FLINK-12841) Unify catalog meta-objects implementations and remove their interfaces

2019-06-13 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12841:
-
Description: 
We've been evaluating the original design in FLIP-30 that proposed creating 
catalog meta object interfaces and individual impl of those interfaces in each 
catalog. Interfaces we have so far are: {{CatalogDatabase}}, {{CatalogTable}}, 
{{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, and e.g. for 
{{CatalogTable}} interface, we have impls of {{GenericCatalogTable}} and 
{{HiveCatalogTable}}, etc.

Well, we have gone pretty far on FLIP-30 now. When we look back and re-evaluate 
this design, we actually found there are not many differences between, e.g. 
{{GenericCatalogTable}} and {{HiveCatalogTable}}. And having this class 
hierarchy complicates situations and development. E.g. this requires sql client 
to have a hard dependency on flink-connector-hive in order to create 
{{HiveCatalogTable}} from DDL, which I don't think is necessary.

On the other side in Blink, we don't have this hierarchy and have been just 
using a single meta-object class (e.g. just {{CatalogTable}}) to represent data 
in different catalogs, it has been fine without any problem and all the 
difference among catalogs can be stored as properties.

Thus we propose removing the inheritance hierarchy and impl of the meta-object 
interfaces for each individual catalog. To be more specific, take table classes 
for example, we will replace {{CatalogTable}} with existing 
{{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
{{HiveCatalogTable}}.

  was:
We've been evaluating the original design in FLIP-30 that proposed creating 
catalog meta object interfaces and individual impl of those interfaces in each 
catalog. Interfaces we have so far are: {{CatalogDatabase}}, {{CatalogTable}}, 
{{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, and e.g. for 
{{CatalogTable}} interface, we have impls of {{GenericCatalogTable}} and 
{{HiveCatalogTable}}, etc.

Well, we have gone pretty far on FLIP-30 now. When we look back and re-evaluate 
this design, we actually found there are not many differences between, e.g. 
{{GenericCatalogTable}} and {{HiveCatalogTable}}. And having this class 
hierarchy complicates situations and development. E.g. this requires sql client 
to have a hard dependency on flink-connector-hive in order to create 
{{HiveCatalogTable}} from DDL.

On the other side in Blink, we don't have this hierarchy and have been just 
using a single meta-object class (e.g. just {{CatalogTable}}) to represent data 
in different catalogs, it has been fine without any problem and all the 
difference among catalogs can be stored as properties.

Thus we propose removing the inheritance hierarchy and impl of the meta-object 
interfaces for each individual catalog. To be more specific, take table classes 
for example, we will replace {{CatalogTable}} with existing 
{{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
{{HiveCatalogTable}}.


> Unify catalog meta-objects implementations and remove their interfaces
> --
>
> Key: FLINK-12841
> URL: https://issues.apache.org/jira/browse/FLINK-12841
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We've been evaluating the original design in FLIP-30 that proposed creating 
> catalog meta object interfaces and individual impl of those interfaces in 
> each catalog. Interfaces we have so far are: {{CatalogDatabase}}, 
> {{CatalogTable}}, {{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, 
> and e.g. for {{CatalogTable}} interface, we have impls of 
> {{GenericCatalogTable}} and {{HiveCatalogTable}}, etc.
> Well, we have gone pretty far on FLIP-30 now. When we look back and 
> re-evaluate this design, we actually found there are not many differences 
> between, e.g. {{GenericCatalogTable}} and {{HiveCatalogTable}}. And having 
> this class hierarchy complicates situations and development. E.g. this 
> requires sql client to have a hard dependency on flink-connector-hive in 
> order to create {{HiveCatalogTable}} from DDL, which I don't think is 
> necessary.
> On the other side in Blink, we don't have this hierarchy and have been just 
> using a single meta-object class (e.g. just {{CatalogTable}}) to represent 
> data in different catalogs, it has been fine without any problem and all the 
> difference among catalogs can be stored as properties.
> Thus we propose removing the inheritance hierarchy and impl of the 
> meta-object interfaces for each individual catalog. To be more specific, take 
> table classes for example, we will 

[jira] [Updated] (FLINK-12841) Unify catalog meta-objects implementations and remove their interfaces

2019-06-13 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12841:
-
Description: 
We've been evaluating the original design in FLIP-30 that proposed creating 
catalog meta object interfaces and individual impl of those interfaces in each 
catalog. Interfaces we have so far are: {{CatalogDatabase}}, {{CatalogTable}}, 
{{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, and e.g. for 
{{CatalogTable}} interface, we have impls of {{GenericCatalogTable}} and 
{{HiveCatalogTable}}, etc.

Well, we have gone pretty far on FLIP-30 now. When we look back and re-evaluate 
this design, we actually found there are not many differences between, e.g. 
{{GenericCatalogTable}} and {{HiveCatalogTable}}. And having this class 
hierarchy complicates situations and development. E.g. this requires sql client 
to have a hard dependency on flink-connector-hive in order to create 
{{HiveCatalogTable}} from DDL.

On the other side in Blink, we don't have this hierarchy and have been just 
using a single meta-object class (e.g. just {{CatalogTable}}) to represent data 
in different catalogs, it has been fine without any problem and all the 
difference among catalogs can be stored as properties.

Thus we propose removing the inheritance hierarchy and impl of the meta-object 
interfaces for each individual catalog. To be more specific, take table classes 
for example, we will replace {{CatalogTable}} with existing 
{{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
{{HiveCatalogTable}}.

  was:
We've been evaluating the original design in FLIP-30 that proposed creating 
catalog meta object interfaces and individual impl of those interfaces in each 
catalog. Interfaces we have so far are: {{CatalogDatabase}}, {{CatalogTable}}, 
{{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, and e.g. for 
{{CatalogTable}} interface, we have impls of {{GenericCatalogTable}} and 
{{HiveCatalogTable}}, etc.

Well, we have gone pretty far on FLIP-30 now. When we look back and re-evaluate 
this design, we actually found there are not many differences between, e.g. 
{{GenericCatalogTable}} and {{HiveCatalogTable}}. And having this class 
hierarchy complicates situations and development. On the other side in Blink, 
we don't have this hierarchy and have been just using a single meta-object 
class (e.g. just {{CatalogTable}}) to represent data in different catalogs, it 
has been fine without any problem and all the difference among catalogs can be 
stored as properties.

Thus we propose removing the inheritance hierarchy and impl of the meta-object 
interfaces for each individual catalog. To be more specific, take table classes 
for example, we will replace {{CatalogTable}} with existing 
{{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
{{HiveCatalogTable}}.


> Unify catalog meta-objects implementations and remove their interfaces
> --
>
> Key: FLINK-12841
> URL: https://issues.apache.org/jira/browse/FLINK-12841
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We've been evaluating the original design in FLIP-30 that proposed creating 
> catalog meta object interfaces and individual impl of those interfaces in 
> each catalog. Interfaces we have so far are: {{CatalogDatabase}}, 
> {{CatalogTable}}, {{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, 
> and e.g. for {{CatalogTable}} interface, we have impls of 
> {{GenericCatalogTable}} and {{HiveCatalogTable}}, etc.
> Well, we have gone pretty far on FLIP-30 now. When we look back and 
> re-evaluate this design, we actually found there are not many differences 
> between, e.g. {{GenericCatalogTable}} and {{HiveCatalogTable}}. And having 
> this class hierarchy complicates situations and development. E.g. this 
> requires sql client to have a hard dependency on flink-connector-hive in 
> order to create {{HiveCatalogTable}} from DDL.
> On the other side in Blink, we don't have this hierarchy and have been just 
> using a single meta-object class (e.g. just {{CatalogTable}}) to represent 
> data in different catalogs, it has been fine without any problem and all the 
> difference among catalogs can be stored as properties.
> Thus we propose removing the inheritance hierarchy and impl of the 
> meta-object interfaces for each individual catalog. To be more specific, take 
> table classes for example, we will replace {{CatalogTable}} with existing 
> {{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
> {{HiveCatalogTable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12841) Unify catalog meta-objects implementations and remove their interfaces

2019-06-13 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863541#comment-16863541
 ] 

Bowen Li commented on FLINK-12841:
--

[~twalthr] [~dawidwys] what do you think?

> Unify catalog meta-objects implementations and remove their interfaces
> --
>
> Key: FLINK-12841
> URL: https://issues.apache.org/jira/browse/FLINK-12841
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We've been evaluating the original design in FLIP-30 that proposed creating 
> catalog meta object interfaces and individual impl of those interfaces in 
> each catalog. Interfaces we have so far are: {{CatalogDatabase}}, 
> {{CatalogTable}}, {{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, 
> and e.g. for {{CatalogTable}} interface, we have impls of 
> {{GenericCatalogTable}} and {{HiveCatalogTable}}, etc.
> Well, we have gone pretty far on FLIP-30 now. When we look back and 
> re-evaluate this design, we actually found there are not many differences 
> between, e.g. {{GenericCatalogTable}} and {{HiveCatalogTable}}. And having 
> this class hierarchy complicates situations and development. E.g. this 
> requires sql client to have a hard dependency on flink-connector-hive in 
> order to create {{HiveCatalogTable}} from DDL, which I don't think is 
> necessary.
> On the other side in Blink, we don't have this hierarchy and have been just 
> using a single meta-object class (e.g. just {{CatalogTable}}) to represent 
> data in different catalogs, it has been fine without any problem and all the 
> difference among catalogs can be stored as properties.
> Thus we propose removing the inheritance hierarchy and impl of the 
> meta-object interfaces for each individual catalog. To be more specific, take 
> table classes for example, we will replace {{CatalogTable}} with existing 
> {{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
> {{HiveCatalogTable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-13 Thread GitBox
xuefuz commented on a change in pull request #8703: [FLINK-12807][hive]Support 
Hive table columnstats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293611339
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1069,7 +1075,22 @@ public void alterTableStatistics(ObjectPath tablePath, 
CatalogTableStatistics ta
 
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
-
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   // Set table column stats. This only works for 
non-partitioned tables.
+   if (!isTablePartitioned(hiveTable)) {
+   
client.updateTableColumnStatistics(HiveCatalogUtil.createTableColumnStats(hiveTable,
 columnStatistics.getColumnStatisticsData()));
+   } else {
+   throw new 
CatalogException(String.format("Failed to alter partition table column stats of 
table %s",
 
 Review comment:
   Yeah. I think TableNotPartitioned makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12841) Unify catalog meta-objects implementations and remove their interfaces

2019-06-13 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12841:


 Summary: Unify catalog meta-objects implementations and remove 
their interfaces
 Key: FLINK-12841
 URL: https://issues.apache.org/jira/browse/FLINK-12841
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Table SQL / API
Affects Versions: 1.9.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


We've been evaluating the original design in FLIP-30 that proposed creating 
catalog meta object interfaces and individual impl of those interfaces in each 
catalog. Interfaces we have so far are: {{CatalogDatabase}}, {{CatalogTable}}, 
{{CatalogView}}, {{CatalogPartition}}, {{CatalogFunction}}, and e.g. for 
{{CatalogTable}} interface, we have impls of {{GenericCatalogTable}} and 
{{HiveCatalogTable}}, etc.

Well, we have gone pretty far on FLIP-30 now. When we look back and re-evaluate 
this design, we actually found there are not many differences between, e.g. 
{{GenericCatalogTable}} and {{HiveCatalogTable}}. And having this class 
hierarchy complicates situations and development. On the other side in Blink, 
we don't have this hierarchy and have been just using a single meta-object 
class (e.g. just {{CatalogTable}}) to represent data in different catalogs, it 
has been fine without any problem and all the difference among catalogs can be 
stored as properties.

Thus we propose removing the inheritance hierarchy and impl of the meta-object 
interfaces for each individual catalog. To be more specific, take table classes 
for example, we will replace {{CatalogTable}} with existing 
{{AbstractCatalogTable}}, and remove {{GenericCatalogTable}} and 
{{HiveCatalogTable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog

2019-06-13 Thread GitBox
xuefuz commented on a change in pull request #8720: [FLINK-12771][hive] Support 
ConnectorCatalogTable in HiveCatalog
URL: https://github.com/apache/flink/pull/8720#discussion_r293600489
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -332,11 +359,24 @@ public void renameTable(ObjectPath tablePath, String 
newTableName, boolean ignor
checkNotNull(tablePath, "tablePath cannot be null");

checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName), "newTableName 
cannot be null or empty");
 
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+
+   ConnectorCatalogTable connectorTable = 
connectorTables.remove(tablePath);
+
+   if (connectorTable != null) {
+   if (connectorTables.containsKey(newPath)) {
+   throw new TableAlreadyExistException(getName(), 
newPath);
 
 Review comment:
   If we get here because of this exception, the table is already dropped, 
which doesn't seem right.
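
One possible reordering that avoids losing the table on this path (a sketch only, 
using the names from the diff above; not necessarily how the PR will resolve it) 
is to check the target name before removing the old entry:

    if (connectorTables.containsKey(newPath)) {
        throw new TableAlreadyExistException(getName(), newPath);
    }
    ConnectorCatalogTable connectorTable = connectorTables.remove(tablePath);
    if (connectorTable != null) {
        connectorTables.put(newPath, connectorTable);
    }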


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8734: [FLINK-12840] fix network utils to work with ipv6 correctly

2019-06-13 Thread GitBox
flinkbot commented on issue #8734: [FLINK-12840] fix network utils to work with 
ipv6 correctly
URL: https://github.com/apache/flink/pull/8734#issuecomment-501899832
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-13 Thread GitBox
bowenli86 commented on a change in pull request #8703: 
[FLINK-12807][hive]Support Hive table columnstats related operations in 
HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293598734
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 ##
 @@ -58,7 +58,7 @@ static void checkEquals(CatalogTableStatistics ts1, 
CatalogTableStatistics ts2)
assertEquals(ts1.getProperties(), ts2.getProperties());
}
 
-   static void checkEquals(CatalogColumnStatistics cs1, 
CatalogColumnStatistics cs2) {
+   public static void checkEquals(CatalogColumnStatistics cs1, 
CatalogColumnStatistics cs2) {
 
 Review comment:
   move this to CatalogTestBase, rather than changing its signature?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-13 Thread GitBox
bowenli86 commented on a change in pull request #8703: 
[FLINK-12807][hive]Support Hive table columnstats related operations in 
HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293598734
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 ##
 @@ -58,7 +58,7 @@ static void checkEquals(CatalogTableStatistics ts1, 
CatalogTableStatistics ts2)
assertEquals(ts1.getProperties(), ts2.getProperties());
}
 
-   static void checkEquals(CatalogColumnStatistics cs1, 
CatalogColumnStatistics cs2) {
+   public static void checkEquals(CatalogColumnStatistics cs1, 
CatalogColumnStatistics cs2) {
 
 Review comment:
   move this to CatalogTestBase?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

