[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289591178
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat is inspired by the HCatInputFormat and HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private BaseRowTypeInfo baseRowTypeInfo;
+
+   // Necessary info to init deserializer
+   private String[] partitionColNames;
+   private List partitions;
+   private transient Deserializer deserializer;
+   private transient List fieldRefs;
+   private transient StructObjectInspector oi;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+         JobConf jobConf,
+         Boolean isPartitioned,
+         String[] partitionColNames,
+         List partitions,
+         BaseRowTypeInfo baseRowTypeInfo) {
+      super(jobConf.getCredentials());
+      this.baseRowTypeInfo = baseRowTypeInfo;
+      // Keep a defensive copy; do not overwrite it with the caller's instance.
+      this.jobConf = new JobConf(jobConf);
+      this.isPartitioned = isPartitioned;
+      this.partitionColNames = partitionColNames;
+      this.partitions = partitions;
+   }
+
+   @Override
+   public void open(HiveTableInputSp

[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289590766
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
 ##
 @@ -71,4 +80,37 @@ public static TableSchema 
createTableSchema(List cols, List parameters = serDeInfo.getParameters();
+   Properties properties = new Properties();
+   properties.setProperty(
+      serdeConstants.SERIALIZATION_FORMAT,
+      parameters.get(serdeConstants.SERIALIZATION_FORMAT));
+   List<String> colTypes = new ArrayList<>();
+   List<String> colNames = new ArrayList<>();
+   List<FieldSchema> cols = storageDescriptor.getCols();
+   for (FieldSchema col : cols) {
+      colTypes.add(col.getType());
+      colNames.add(col.getName());
+   }
+   properties.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(colNames, ","));
+   // Note: serdeConstants.COLUMN_NAME_DELIMITER is not defined in earlier Hive versions, so we use a literal to avoid a shim.
+   properties.setProperty("column.name.delimiter", String.valueOf(SerDeUtils.COMMA));
+   properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, StringUtils.join(colTypes, DEFAULT_LIST_COLUMN_TYPES_SEPARATOR));
 
 Review comment:
   Regarding line 107: this follows Hive's internal design. Hive uses ':', ',' 
or ';' as the column type delimiter.
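
   To make the two delimiters concrete, here is a minimal sketch of how the 
column name and column type properties could be built. It uses plain Java 
only; the class name and the literal keys "columns" and "columns.types" are 
assumptions made for illustration (they are meant to stand in for 
serdeConstants.LIST_COLUMNS and serdeConstants.LIST_COLUMN_TYPES), not code 
taken from the PR. The last line only illustrates why a comma-joined type 
list cannot be split naively once a type such as decimal(10,2) appears; it 
is not a statement about how Hive parses the list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ColumnPropertiesSketch {
    // Assumed literal keys, standing in for the Hive serde constants.
    static final String LIST_COLUMNS = "columns";
    static final String LIST_COLUMN_TYPES = "columns.types";

    /** Joins names with ',' and types with ':' as done in the diff above. */
    static Properties build(List<String> names, List<String> types) {
        Properties properties = new Properties();
        properties.setProperty(LIST_COLUMNS, String.join(",", names));
        properties.setProperty(LIST_COLUMN_TYPES, String.join(":", types));
        return properties;
    }

    public static void main(String[] args) {
        Properties p = build(
            Arrays.asList("id", "price"),
            Arrays.asList("int", "decimal(10,2)"));
        System.out.println(p.getProperty(LIST_COLUMNS));
        System.out.println(p.getProperty(LIST_COLUMN_TYPES));
        // A naive comma split of a comma-joined type list yields three pieces
        // for two columns, because decimal(10,2) itself contains a comma.
        System.out.println("int,decimal(10,2)".split(",").length);
    }
}
```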


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12702) support 'properties' in catalog entries in SQL CLI yaml file

2019-05-31 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12702:


 Summary: support 'properties' in catalog entries in SQL CLI yaml 
file
 Key: FLINK-12702
 URL: https://issues.apache.org/jira/browse/FLINK-12702
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


Support 'properties' in catalog entries in the SQL CLI YAML file, e.g.:

{code:java}
catalogs:
 - name: 
   type: 
   property-version: 
   default-database:  # Optional
   properties:
     # Properties of the catalog
     : 
     : 
     ...
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8560: [hotfix][hive] make hive-metastore dependency scope to 'provided'

2019-05-31 Thread GitBox
asfgit closed pull request #8560: [hotfix][hive] make hive-metastore dependency 
scope to 'provided'
URL: https://github.com/apache/flink/pull/8560
 
 
   




[jira] [Closed] (FLINK-12642) OutputBufferPoolUsageGauge can fail with NPE

2019-05-31 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12642.

Resolution: Fixed

master: bc16485cc89fbe5b0dd1534737d0b5cd1ced885b

> OutputBufferPoolUsageGauge can fail with NPE
> 
>
> Key: FLINK-12642
> URL: https://issues.apache.org/jira/browse/FLINK-12642
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The result partition metrics are initialized before 
> {{ResultPartition#setup}} is called. If a reporter tries to access an 
> {{OutputBufferPoolUsageGauge}} in between, it will fail with an NPE because 
> the buffer pool of the partition is still null.
> {code}
> 2019-05-27 14:49:47,031 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while reporting metrics
> java.lang.NullPointerException
>   at org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:41)
>   at org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:27)
>   at org.apache.flink.metrics.slf4j.Slf4jReporter.tryReport(Slf4jReporter.java:114)
>   at org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:80)
>   at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:436)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
>   at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>   at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>   at java.base/java.lang.Thread.run(Thread.java:844)
> {code}
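
The failure mode described in the issue can be sketched in isolation. The 
example below is a toy model, not the change that was merged: Pool, 
Partition, UsageGauge, and fixedPool are hypothetical names invented for 
this illustration. It shows a gauge that reads the pool reference once and 
reports 0 while the partition has not been set up, instead of dereferencing 
null.

```java
public class NullSafeGaugeSketch {
    /** Hypothetical stand-in for a buffer pool. */
    interface Pool {
        int usedBuffers();
        int totalBuffers();
    }

    /** Hypothetical partition whose pool stays null until setup() runs. */
    static final class Partition {
        private volatile Pool pool;

        void setup(Pool pool) {
            this.pool = pool;
        }

        Pool getPool() {
            return pool;
        }
    }

    /** Gauge that tolerates partitions that have not been set up yet. */
    static final class UsageGauge {
        private final Partition[] partitions;

        UsageGauge(Partition... partitions) {
            this.partitions = partitions;
        }

        float getValue() {
            int used = 0;
            int total = 0;
            for (Partition partition : partitions) {
                Pool pool = partition.getPool(); // read once, may still be null
                if (pool != null) {
                    used += pool.usedBuffers();
                    total += pool.totalBuffers();
                }
            }
            return total == 0 ? 0.0f : (float) used / total;
        }
    }

    /** Helper that builds a pool with fixed numbers, for demonstration only. */
    static Pool fixedPool(final int used, final int total) {
        return new Pool() {
            @Override
            public int usedBuffers() {
                return used;
            }

            @Override
            public int totalBuffers() {
                return total;
            }
        };
    }

    public static void main(String[] args) {
        Partition partition = new Partition();
        UsageGauge gauge = new UsageGauge(partition);
        System.out.println(gauge.getValue()); // reporter fires before setup
        partition.setup(fixedPool(3, 4));
        System.out.println(gauge.getValue()); // reporter fires after setup
    }
}
```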





[GitHub] [flink] zentol merged pull request #8574: [FLINK-12642][network][metrics] Fix In/OutputBufferPoolUsageGauge failure with NullPointerException if BufferPool has not been inited yet

2019-05-31 Thread GitBox
zentol merged pull request #8574: [FLINK-12642][network][metrics] Fix 
In/OutputBufferPoolUsageGauge failure with NullPointerException if BufferPool 
has not been inited yet
URL: https://github.com/apache/flink/pull/8574
 
 
   




[jira] [Updated] (FLINK-12564) Remove getBufferProvider method from ResultPartitionWriter interface

2019-05-31 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12564:
-
Fix Version/s: 1.9.0

> Remove getBufferProvider method from ResultPartitionWriter interface
> 
>
> Key: FLINK-12564
> URL: https://issues.apache.org/jira/browse/FLINK-12564
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently `ResultPartitionWriter#getBufferProvider` is used for requesting 
> `BufferBuilder` in `RecordWriter`; the `BufferConsumer` created from that 
> `BufferBuilder` is then added to `ResultPartitionWriter` via the 
> `addBufferConsumer` method.
> We could merge these two methods in `ResultPartitionWriter` so that 
> `getBufferProvider` is no longer exposed. `ResultPartitionWriter` could 
> internally request a `BufferBuilder`, add the created `BufferConsumer` to 
> one subpartition, and then return the `BufferBuilder` to `RecordWriter` for 
> writing serialized data.
> Since this changes `ResultPartitionWriter#addBufferConsumer` to 
> `ResultPartitionWriter#requestBufferBuilder`, a new method 
> `ResultPartitionWriter#broadcastEvents` should be introduced to handle 
> events.
> In the future it might be worth further abstracting `ResultPartitionWriter` 
> so that it is not tied only to `BufferBuilder`. We could provide 
> `writeRecord(int targetIndex)` to replace `requestBufferBuilder`, so that 
> serialization can be done inside the specific `ResultPartitionWriter` 
> instance.
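
The merged method described above can be sketched with toy types. None of 
the classes below are Flink's real classes: ToyBufferBuilder, 
ToyBufferConsumer, ToyPartitionWriter, and consumersOf are hypothetical 
names used only to show the shape of the idea, namely that the writer 
creates the builder, registers its consumer with the target subpartition 
itself, and hands back only the builder.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionWriterSketch {
    /** Toy builder: the record writer appends serialized data to it. */
    static final class ToyBufferBuilder {
        private final StringBuilder data = new StringBuilder();

        void append(String chunk) {
            data.append(chunk);
        }

        ToyBufferConsumer createConsumer() {
            return new ToyBufferConsumer(data);
        }
    }

    /** Toy consumer: sees whatever the builder has written so far. */
    static final class ToyBufferConsumer {
        private final StringBuilder data;

        ToyBufferConsumer(StringBuilder data) {
            this.data = data;
        }

        String read() {
            return data.toString();
        }
    }

    /** Toy partition writer that exposes no buffer provider getter. */
    static final class ToyPartitionWriter {
        private final List<List<ToyBufferConsumer>> subpartitions = new ArrayList<>();

        ToyPartitionWriter(int numberOfSubpartitions) {
            for (int i = 0; i < numberOfSubpartitions; i++) {
                subpartitions.add(new ArrayList<>());
            }
        }

        /** Requests a builder and registers its consumer in one step. */
        ToyBufferBuilder requestBufferBuilder(int targetIndex) {
            ToyBufferBuilder builder = new ToyBufferBuilder();
            subpartitions.get(targetIndex).add(builder.createConsumer());
            return builder;
        }

        List<ToyBufferConsumer> consumersOf(int targetIndex) {
            return subpartitions.get(targetIndex);
        }
    }

    public static void main(String[] args) {
        ToyPartitionWriter writer = new ToyPartitionWriter(2);
        ToyBufferBuilder builder = writer.requestBufferBuilder(1);
        builder.append("record");
        System.out.println(writer.consumersOf(1).get(0).read());
    }
}
```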





[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289533444
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat is inspired by the HCatInputFormat and HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private BaseRowTypeInfo baseRowTypeInfo;
+
+   // Necessary info to init deserializer
+   private String[] partitionColNames;
+   private List partitions;
+   private transient Deserializer deserializer;
+   private transient List fieldRefs;
+   private transient StructObjectInspector oi;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+         JobConf jobConf,
+         Boolean isPartitioned,
+         String[] partitionColNames,
+         List partitions,
+         BaseRowTypeInfo baseRowTypeInfo) {
+      super(jobConf.getCredentials());
+      this.baseRowTypeInfo = baseRowTypeInfo;
+      // Keep a defensive copy; do not overwrite it with the caller's instance.
+      this.jobConf = new JobConf(jobConf);
+      this.isPartitioned = isPartitioned;
+      this.partitionColNames = partitionColNames;
+      this.partitions = partitions;
+   }
+
+   @Override
+   public void open(HiveTableInputS

[jira] [Closed] (FLINK-12564) Remove getBufferProvider method from ResultPartitionWriter interface

2019-05-31 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12564.

Resolution: Fixed

master: c53c446486d58e3db149a9ea6fe1984227e415b2

> Remove getBufferProvider method from ResultPartitionWriter interface
> 
>
> Key: FLINK-12564
> URL: https://issues.apache.org/jira/browse/FLINK-12564
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently `ResultPartitionWriter#getBufferProvider` is used for requesting 
> `BufferBuilder` in `RecordWriter`; the `BufferConsumer` created from that 
> `BufferBuilder` is then added to `ResultPartitionWriter` via the 
> `addBufferConsumer` method.
> We could merge these two methods in `ResultPartitionWriter` so that 
> `getBufferProvider` is no longer exposed. `ResultPartitionWriter` could 
> internally request a `BufferBuilder`, add the created `BufferConsumer` to 
> one subpartition, and then return the `BufferBuilder` to `RecordWriter` for 
> writing serialized data.
> Since this changes `ResultPartitionWriter#addBufferConsumer` to 
> `ResultPartitionWriter#requestBufferBuilder`, a new method 
> `ResultPartitionWriter#broadcastEvents` should be introduced to handle 
> events.
> In the future it might be worth further abstracting `ResultPartitionWriter` 
> so that it is not tied only to `BufferBuilder`. We could provide 
> `writeRecord(int targetIndex)` to replace `requestBufferBuilder`, so that 
> serialization can be done inside the specific `ResultPartitionWriter` 
> instance.





[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289534070
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
 ##
 @@ -71,4 +80,37 @@ public static TableSchema 
createTableSchema(List cols, List parameters = serDeInfo.getParameters();
+   Properties properties = new Properties();
+   properties.setProperty(
+      serdeConstants.SERIALIZATION_FORMAT,
+      parameters.get(serdeConstants.SERIALIZATION_FORMAT));
+   List<String> colTypes = new ArrayList<>();
+   List<String> colNames = new ArrayList<>();
+   List<FieldSchema> cols = storageDescriptor.getCols();
+   for (FieldSchema col : cols) {
+      colTypes.add(col.getType());
+      colNames.add(col.getName());
+   }
+   properties.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(colNames, ","));
+   // Note: serdeConstants.COLUMN_NAME_DELIMITER is not defined in earlier Hive versions, so we use a literal to avoid a shim.
+   properties.setProperty("column.name.delimiter", String.valueOf(SerDeUtils.COMMA));
+   properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, StringUtils.join(colTypes, DEFAULT_LIST_COLUMN_TYPES_SEPARATOR));
 
 Review comment:
   Why join column names with `,` in line 107 but column types with `:` 
here? Can we just use a comma for both to be consistent?




[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289526514
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/FlinkHiveException.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Exception used by {@link HiveTableInputFormat}.
 
 Review comment:
   Shouldn't this be a generic exception used by all parts of Flink's Hive 
data connector?
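
   A minimal sketch of what such a connector-wide exception could look like 
is given below. The name HiveConnectorException is a placeholder chosen for 
this example and is not the class from the PR; the point is only that the 
type carries no reference to a single input format and offers the usual 
message and cause constructors so any part of the connector can wrap Hive 
errors with it.

```java
public class ConnectorExceptionSketch {
    /** Hypothetical unchecked exception shared by all parts of a connector. */
    static class HiveConnectorException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        HiveConnectorException(String message) {
            super(message);
        }

        HiveConnectorException(Throwable cause) {
            super(cause);
        }

        HiveConnectorException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static void main(String[] args) {
        Exception hiveError = new IllegalStateException("serde failure");
        HiveConnectorException wrapped =
            new HiveConnectorException("Failed to read Hive table", hiveError);
        System.out.println(wrapped.getMessage());
        System.out.println(wrapped.getCause().getMessage());
    }
}
```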




[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289526210
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -349,7 +363,7 @@ under the License.
test

 
-   
+
 
 Review comment:
   What's the change here? Should it be reverted?




[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289527087
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+
+/**
+ * Class used to serialize to and from raw HDFS file types.
+ * Highly inspired by HCatRecordSerDe (almost copied from this class) in hive-catalog-core.
+ */
+public class HiveRecordSerDe {
+
+   /**
+    * Return underlying Java Object from an object-representation
+    * that is readable by a provided ObjectInspector.
+    */
+   public static Object obtainFlinkRowField(Object field, ObjectInspector fieldObjectInspector) {
+      Object res;
+      if (fieldObjectInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+         res = convertPrimitiveField(field, (PrimitiveObjectInspector) fieldObjectInspector);
+      } else {
+         throw new FlinkHiveException(new SerDeException(HiveRecordSerDe.class.toString()
+            + " does not know what to do with fields of unknown category: "
 
 Review comment:
   `String.format("HiveRecordSerDe doesn't support category %s, type %s yet", category, typeName)`?
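
   A minimal sketch of the suggested message format, assuming the category and type name are available as plain strings. The class name, the helper `unsupported`, and the use of `UnsupportedOperationException` are illustrative only and do not come from the PR, which wraps a `SerDeException` in a `FlinkHiveException`.

```java
public class UnsupportedCategoryMessage {

    // Builds the exception with a single formatted message instead of
    // concatenating the class name and several string fragments.
    static UnsupportedOperationException unsupported(String category, String typeName) {
        return new UnsupportedOperationException(
            String.format("HiveRecordSerDe doesn't support category %s, type %s yet", category, typeName));
    }

    public static void main(String[] args) {
        // Example values are made up for illustration.
        System.out.println(unsupported("LIST", "array<int>").getMessage());
    }
}
```

   Naming the category and the type in one sentence lets a reader of the stack trace see which column type triggered the failure.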


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289534495
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests {@link HiveTableInputFormat}.
+ */
+public class HiveInputFormatTest {
+
+   public static final String DEFAULT_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
 
 Review comment:
   minor: rename those constants to be `DEFAULT_XXX_CLASS_NAME`?
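
   A small sketch of the naming point, assuming the constants hold fully qualified class names as strings. `java.util.ArrayList` stands in for the Hive SerDe class so the example runs without Hive on the classpath; `DEFAULT_LIST_CLASS_NAME` and `instantiate` are hypothetical names.

```java
public class ClassNameConstants {

    // The value is a String holding a class name, not a Class object,
    // so the _CLASS_NAME suffix describes it more precisely than _CLASS.
    public static final String DEFAULT_LIST_CLASS_NAME = java.util.ArrayList.class.getName();

    // A caller that receives the name still has to resolve it reflectively.
    static Object instantiate(String className) throws ReflectiveOperationException {
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(DEFAULT_LIST_CLASS_NAME);
        System.out.println(instantiate(DEFAULT_LIST_CLASS_NAME).getClass());
    }
}
```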




[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289527558
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
+ * It's used to read from hive partition/non-partition table.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private RowTypeInfo rowTypeInfo;
+
+   //Necessary info to init deserializer
+   private String[] partitionColNames;
+   //For non-partition hive table, partitions only contains one partition which partitionValues is empty.
+   private List partitions;
+   private transient Deserializer deserializer;
+   //Hive StructField list contain all related info for specific serde.
+   private transient List structFields;
+   //StructObjectInspector in hive helps us to look into the internal structure of a struct object.
+   private transient StructObjectInspector structObjectInspector;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+   JobConf jobConf,
+   Boolean isPartitioned,
+   String[] partitionColNames,
+   List partitions,
+   RowTypeInfo rowTypeInfo) {
+   super(jobConf.getCredentials());
+   checkArgument(null != rowTypeInfo, "rowTypeInfo can not be null.");
 
 Review comment:
   use `c

[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289528106
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
+ * It's used to read from hive partition/non-partition table.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private RowTypeInfo rowTypeInfo;
+
+   //Necessary info to init deserializer
+   private String[] partitionColNames;
+   //For non-partition hive table, partitions only contains one partition which partitionValues is empty.
+   private List partitions;
+   private transient Deserializer deserializer;
+   //Hive StructField list contain all related info for specific serde.
+   private transient List structFields;
+   //StructObjectInspector in hive helps us to look into the internal structure of a struct object.
+   private transient StructObjectInspector structObjectInspector;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+   JobConf jobConf,
+   Boolean isPartitioned,
+   String[] partitionColNames,
+   List partitions,
+   RowTypeInfo rowTypeInfo) {
+   super(jobConf.getCredentials());
+   checkArgument(null != rowTypeInfo, "rowTypeInfo can not be null.");
+   checkArgumen

[GitHub] [flink] zentol merged pull request #8518: [FLINK-12564][network] Remove getBufferProvider method from ResultPartitionWriter

2019-05-31 Thread GitBox
zentol merged pull request #8518: [FLINK-12564][network] Remove 
getBufferProvider method from ResultPartitionWriter
URL: https://github.com/apache/flink/pull/8518
 
 
   




[jira] [Updated] (FLINK-10921) Prioritize shard consumers in Kinesis Consumer by event time

2019-05-31 Thread Thomas Weise (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-10921:
-
Fix Version/s: 1.8.1

> Prioritize shard consumers in Kinesis Consumer by event time 
> -
>
> Key: FLINK-10921
> URL: https://issues.apache.org/jira/browse/FLINK-10921
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Shard consumer threads currently emit records directly. In order to align 
> shards by event time, decouple shard consumer threads and emitter with a 
> queue, as described in [1].
> [1] 
> https://lists.apache.org/thread.html/ac41718246ad8f6098efaf7dbf5f7182d60abdc473e8bf3c96ef5968@%3Cdev.flink.apache.org%3E
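
The decoupling described in the issue can be sketched as follows. This is an illustration under stated assumptions, not the Kinesis connector's implementation: `Record`, `drainByEventTime`, and the shard names are hypothetical, and the producers are joined before the emitter drains so the output is deterministic (a real emitter would run concurrently with the shard consumers).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ShardQueueSketch {

    static final class Record {
        final String shard;
        final long eventTime;

        Record(String shard, long eventTime) {
            this.shard = shard;
            this.eventTime = eventTime;
        }
    }

    // Emitter side: repeatedly emit the head record with the smallest
    // event time across all per-shard queues.
    static List<Long> drainByEventTime(Map<String, BlockingQueue<Record>> queues) {
        List<Long> emitted = new ArrayList<>();
        while (true) {
            BlockingQueue<Record> best = null;
            for (BlockingQueue<Record> q : queues.values()) {
                Record head = q.peek();
                if (head != null && (best == null || head.eventTime < best.peek().eventTime)) {
                    best = q;
                }
            }
            if (best == null) {
                break; // all queues are empty
            }
            emitted.add(best.poll().eventTime);
        }
        return emitted;
    }

    static List<Long> run() throws InterruptedException {
        long[][] shardTimes = {{1, 4, 7}, {2, 3, 9}};
        Map<String, BlockingQueue<Record>> queues = new LinkedHashMap<>();
        List<Thread> producers = new ArrayList<>();
        for (int i = 0; i < shardTimes.length; i++) {
            String shard = "shard-" + i;
            long[] times = shardTimes[i];
            BlockingQueue<Record> q = new LinkedBlockingQueue<>();
            queues.put(shard, q);
            // Shard consumer thread: enqueues records instead of emitting them directly.
            Thread t = new Thread(() -> {
                for (long ts : times) {
                    q.add(new Record(shard, ts));
                }
            });
            producers.add(t);
            t.start();
        }
        for (Thread t : producers) {
            t.join();
        }
        return drainByEventTime(queues);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [1, 2, 3, 4, 7, 9]
    }
}
```

Because the consumer threads only enqueue, the emitter alone decides the output order, which is what makes alignment by event time possible.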



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tweise merged pull request #8577: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-31 Thread GitBox
tweise merged pull request #8577: [FLINK-10921] [kinesis] Shard watermark 
synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8577
 
 
   




[GitHub] [flink] bowenli86 commented on issue #8536: [FLINK-12568][hive] Implement OutputFormat to write Hive tables

2019-05-31 Thread GitBox
bowenli86 commented on issue #8536: [FLINK-12568][hive] Implement OutputFormat 
to write Hive tables
URL: https://github.com/apache/flink/pull/8536#issuecomment-497816186
 
 
   LGTM, but why do we need the commit "fix HiveTypeUtil a50ac6e"?




[GitHub] [flink] asfgit closed pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-31 Thread GitBox
asfgit closed pull request #8564: [FLINK-12649][hive] Add a shim layer to 
support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564
 
 
   




[GitHub] [flink] bowenli86 commented on issue #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-31 Thread GitBox
bowenli86 commented on issue #8564: [FLINK-12649][hive] Add a shim layer to 
support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#issuecomment-497812463
 
 
   FYI, I rebased this PR to master and resolved conflicts before merging




[jira] [Closed] (FLINK-12679) Support 'default-database' config for catalog entries in SQL CLI yaml file

2019-05-31 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12679.

Resolution: Fixed

merged in 1.9.0: 8b7577cfafa067ca719aac64695c2b5acc8d56f2

> Support 'default-database' config for catalog entries in SQL CLI yaml file
> --
>
> Key: FLINK-12679
> URL: https://issues.apache.org/jira/browse/FLINK-12679
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>






[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289498324
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
 
 Review comment:
   Variables in `HiveTableInputFormat.java` cannot be final because of Java's serialization/deserialization mechanism.
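
   One reading of this comment, sketched below: when a class restores its state by hand in a custom `readObject`, the fields assigned there cannot be declared `final`, because `readObject` is an ordinary method rather than a constructor. `FakeConf` is a hypothetical stand-in for a non-serializable config such as `JobConf`; none of these names come from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class CustomSerializationSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Hypothetical config holder that is deliberately not Serializable. */
    static final class FakeConf {
        final Properties props = new Properties();
    }

    // Assigned in the constructor and again in readObject, so it cannot be final.
    private transient FakeConf conf;
    private boolean partitioned;

    CustomSerializationSketch(FakeConf conf, boolean partitioned) {
        this.conf = conf;
        this.partitioned = partitioned;
    }

    static CustomSerializationSketch of(String key, String value, boolean partitioned) {
        FakeConf conf = new FakeConf();
        conf.props.setProperty(key, value);
        return new CustomSerializationSketch(conf, partitioned);
    }

    String get(String key) {
        return conf.props.getProperty(key);
    }

    boolean isPartitioned() {
        return partitioned;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();      // writes the non-transient field 'partitioned'
        out.writeObject(conf.props);   // writes the config contents by hand
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new FakeConf();         // this assignment would not compile for a final field
        conf.props.putAll((Properties) in.readObject());
    }

    static CustomSerializationSketch roundTrip(CustomSerializationSketch original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CustomSerializationSketch) in.readObject();
        }
    }
}
```

   Fields handled purely by default serialization can still be `final`; the restriction applies to fields that a custom `readObject` assigns itself.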




[jira] [Closed] (FLINK-12676) Add descriptor, validator, and factory for GenericInMemoryCatalog

2019-05-31 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12676.

Resolution: Fixed

merged in 1.9.0: c691c1381fe486068a3beb6bab38f3a29b1cc255

> Add descriptor, validator, and factory for GenericInMemoryCatalog
> -
>
> Key: FLINK-12676
> URL: https://issues.apache.org/jira/browse/FLINK-12676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Command Line Client, Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>






[jira] [Closed] (FLINK-12678) Add AbstractCatalog to manage the common catalog name and default database name for catalogs

2019-05-31 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12678.

Resolution: Fixed

merged in 1.9.0: ba648a57c3bf65efb657095a5c682e2a852d1bdf

> Add AbstractCatalog to manage the common catalog name and default database 
> name for catalogs
> 
>
> Key: FLINK-12678
> URL: https://issues.apache.org/jira/browse/FLINK-12678
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>






[GitHub] [flink] asfgit closed pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-31 Thread GitBox
asfgit closed pull request #8567: [FLINK-12676][table][sql-client] Add 
descriptor, validator, and factory of GenericInMemoryCatalog for table 
discovery service
URL: https://github.com/apache/flink/pull/8567
 
 
   




[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289497240
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private BaseRowTypeInfo baseRowTypeInfo;
+
+   // Necessary info to init deserializer
+   private String[] partitionColNames;
+   private List partitions;
+   private transient Deserializer deserializer;
+   private transient List fieldRefs;
+   private transient StructObjectInspector oi;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+   JobConf jobConf,
+   Boolean isPartitioned,
+   String[] partitionColNames,
+   List partitions,
+   BaseRowTypeInfo baseRowTypeInfo) {
+   super(jobConf.getCredentials());
+   this.baseRowTypeInfo = baseRowTypeInfo;
+   this.jobConf = new JobConf(jobConf);
+   this.jobConf = jobConf;
+   this.isPartitioned = isPartitioned;
+   this.partitionColNames = partitionColNames;
+   this.partitions = partitions;
+   }
+
+   @Override
+   public void open(HiveTableInputSp

[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289486995
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private BaseRowTypeInfo baseRowTypeInfo;
+
+   // Necessary info to init deserializer
+   private String[] partitionColNames;
+   private List partitions;
+   private transient Deserializer deserializer;
+   private transient List fieldRefs;
+   private transient StructObjectInspector oi;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+   JobConf jobConf,
+   Boolean isPartitioned,
+   String[] partitionColNames,
+   List partitions,
+   BaseRowTypeInfo baseRowTypeInfo) {
+   super(jobConf.getCredentials());
+   this.baseRowTypeInfo = baseRowTypeInfo;
+   this.jobConf = new JobConf(jobConf);
+   this.jobConf = jobConf;
+   this.isPartitioned = isPartitioned;
+   this.partitionColNames = partitionColNames;
+   this.partitions = partitions;
+   }
+
+   @Override
+   public void open(HiveTableInputSp

[GitHub] [flink] bowenli86 commented on issue #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-31 Thread GitBox
bowenli86 commented on issue #8567: [FLINK-12676][table][sql-client] Add 
descriptor, validator, and factory of GenericInMemoryCatalog for table 
discovery service
URL: https://github.com/apache/flink/pull/8567#issuecomment-497798591
 
 
   Thanks @xuefuz for your review! I renamed the variable.
   
   Merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289484764
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private BaseRowTypeInfo baseRowTypeInfo;
+
+   // Necessary info to init deserializer
+   private String[] partitionColNames;
+   private List partitions;
+   private transient Deserializer deserializer;
+   private transient List fieldRefs;
+   private transient StructObjectInspector oi;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+   JobConf jobConf,
+   Boolean isPartitioned,
+   String[] partitionColNames,
+   List partitions,
+   BaseRowTypeInfo baseRowTypeInfo) {
+   super(jobConf.getCredentials());
+   this.baseRowTypeInfo = baseRowTypeInfo;
+   this.jobConf = new JobConf(jobConf);
+   this.jobConf = jobConf;
+   this.isPartitioned = isPartitioned;
+   this.partitionColNames = partitionColNames;
+   this.partitions = partitions;
+   }
+
+   @Override
+   public void open(HiveTableInputSp

[GitHub] [flink] bowenli86 commented on issue #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-31 Thread GitBox
bowenli86 commented on issue #8564: [FLINK-12649][hive] Add a shim layer to 
support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#issuecomment-497796188
 
 
   LGTM, merging.
   
   BTW, to support multiple Hive versions, we may be able to refer to [how Flink supports multiple Hadoop versions](https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html#hadoop-versions) and borrow some ideas from there. @xuefuz @lirui-apache
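
   For readers unfamiliar with the term, a shim layer can be sketched roughly as below. This is only an illustration of the general pattern, not the code in this pull request: `ShimSketch`, `MetastoreShim`, `ShimV1`, `ShimV2`, and `load` are hypothetical names, and the returned strings are placeholders rather than real metastore calls. The idea is that callers depend on one small interface, and a loader picks the implementation that matches the metastore version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ShimSketch {

    /** Hypothetical facade over the operations that differ between versions. */
    public interface MetastoreShim {
        String describeListTables();
    }

    /** Placeholder behaviour for a 1.x metastore (illustrative only). */
    static final class ShimV1 implements MetastoreShim {
        @Override
        public String describeListTables() {
            return "v1 call";
        }
    }

    /** Placeholder behaviour for a 2.x metastore (illustrative only). */
    static final class ShimV2 implements MetastoreShim {
        @Override
        public String describeListTables() {
            return "v2 call";
        }
    }

    // Registry keyed by major version; supporting a new version means adding one entry.
    private static final Map<String, Supplier<MetastoreShim>> SHIMS = new HashMap<>();

    static {
        SHIMS.put("1", ShimV1::new);
        SHIMS.put("2", ShimV2::new);
    }

    /** Selects a shim from the major component of a version string such as "2.3.4". */
    public static MetastoreShim load(String version) {
        String major = version.split("\\.")[0];
        Supplier<MetastoreShim> factory = SHIMS.get(major);
        if (factory == null) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(load("1.2.1").describeListTables());
        System.out.println(load("2.3.4").describeListTables());
    }
}
```

   Keying the registry on the major version is an assumption made for brevity; a real loader might need finer-grained matching, or build-time profiles as in the Hadoop case linked above.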




[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289470421
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded serializedJobInformation;
+   private final MaybeOffloaded taskInfo;
+   private final JobID jobID;
+   private final boolean lazyScheduling;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded serializedJobInformation,
+   MaybeOffloaded taskInfo,
+   JobID jobID,
+   boolean lazyScheduling,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.lazyScheduling = lazyScheduling;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
+   LogicalSlot targetSlot,
+   @Nullable JobManagerTaskRestore taskRestore,
+   Collection producedPartitions) throws Exception {
+   return createDeploymentDescriptor(
+   targetSlot.getTaskManagerLocation().getResourceID(),
+   targetSlot.getAllocationId(),
+   targetSlot.getPhysicalSlo

[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289472727
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded serializedJobInformation;
+   private final MaybeOffloaded taskInfo;
+   private final JobID jobID;
+   private final boolean lazyScheduling;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded serializedJobInformation,
+   MaybeOffloaded taskInfo,
+   JobID jobID,
+   boolean lazyScheduling,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.lazyScheduling = lazyScheduling;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
+   LogicalSlot targetSlot,
+   @Nullable JobManagerTaskRestore taskRestore,
+   Collection producedPartitions) throws Exception {
+   return createDeploymentDescriptor(
+   targetSlot.getTaskManagerLocation().getResourceID(),
+   targetSlot.getAllocationId(),
+   targetSlot.getPhysicalSlo

[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289465539
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link 
org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded serializedJobInformation;
+   private final MaybeOffloaded taskInfo;
+   private final JobID jobID;
+   private final boolean lazyScheduling;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded serializedJobInformation,
+   MaybeOffloaded taskInfo,
+   JobID jobID,
+   boolean lazyScheduling,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.lazyScheduling = lazyScheduling;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
 
 Review comment:
   This method could be replaced by the `createDeploymentDescriptor` overload 
below, which takes specific parameters. I don't think we need to maintain 
multiple forms for creating a TDD.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For 

[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289464839
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link 
org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded serializedJobInformation;
+   private final MaybeOffloaded taskInfo;
+   private final JobID jobID;
+   private final boolean lazyScheduling;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded serializedJobInformation,
+   MaybeOffloaded taskInfo,
+   JobID jobID,
+   boolean lazyScheduling,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.lazyScheduling = lazyScheduling;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
+   LogicalSlot targetSlot,
+   @Nullable JobManagerTaskRestore taskRestore,
+   Collection 
producedPartitions) throws Exception {
+   return createDeploymentDescriptor(
+   targetSlot.getTaskManagerLocation().getResourceID(),
+   targetSlot.getAllocationId(),
+   targetSlot.getPhysicalSlo

[GitHub] [flink] 1u0 edited a comment on issue #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)

2019-05-31 Thread GitBox
1u0 edited a comment on issue #8523: [FLINK-12481][runtime] Invoke timer 
callback in task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523#issuecomment-497745492
 
 
   Thanks for the review.
   There are some test failures related to this change (I have only run the 
`flink-streaming-java` tests locally). In particular, the tests' expectations 
about the execution model don't match the changed one.
   
   More importantly, I have to update the user-facing documentation on timer 
execution to mention this runtime change.
   I'll reopen this PR after the documentation update, if it's aligned.
   
   ~~@StefanRRichter, @pnowojski it's up to you if you want to read this PR 
further. On my side, the implementation would not change significantly (modulo 
the discussion with Stefan regarding timer service callback).~~


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 edited a comment on issue #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)

2019-05-31 Thread GitBox
1u0 edited a comment on issue #8523: [FLINK-12481][runtime] Invoke timer 
callback in task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523#issuecomment-497745492
 
 
   Thanks for the review.
   There are some test failures related to this change (I have only run the 
`flink-streaming-java` tests locally). In particular, the tests' expectations 
about the execution model don't match the changed one.
   
   More importantly, I have to update the user-facing documentation on timer 
execution to mention this runtime change.
   I'll reopen this PR after the documentation update, if it's aligned.
   
   **Update:** the implementation of `AsyncWaitOperator.processElement` also 
has to be changed. In particular, it cannot host execution of 
`userFunction.asyncInvoke` in the main task's thread anymore when there is a 
timeout. I'll raise this concern in the ticket.
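
   The idea of running timer callbacks in the task thread via a mailbox can be 
sketched roughly as follows. This is only an illustration under assumptions: 
`MailboxSketch` and `demo` are hypothetical names, and a plain `BlockingQueue` 
stands in for the mailbox; none of this is Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a task mailbox; not Flink's actual classes. */
final class MailboxSketch {

    /** Returns true if the timer callback ran on the thread that drained the mailbox. */
    static boolean demo() {
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        Thread taskThread = Thread.currentThread();
        List<Thread> ranOn = new ArrayList<>();

        // The timer thread does not run the callback itself; it only enqueues it.
        Thread timer = new Thread(() -> mailbox.add(() -> ranOn.add(Thread.currentThread())));
        timer.start();
        try {
            timer.join();
            // The task thread takes the callback out of the mailbox and executes it.
            mailbox.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return ranOn.size() == 1 && ranOn.get(0) == taskThread;
    }
}
```

   Because the callback only runs when the task thread drains the mailbox, 
anything that blocks the task thread also delays the timer, which is the kind 
of interaction the update above is concerned about.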
   
   ~~@StefanRRichter, @pnowojski it's up to you if you want to read this PR 
further. On my side, the implementation would not change significantly (modulo 
the discussion with Stefan regarding timer service callback).~~




[jira] [Created] (FLINK-12701) Column name alias causes exception when used with where and group-by

2019-05-31 Thread Josh Bradt (JIRA)
Josh Bradt created FLINK-12701:
--

 Summary: Column name alias causes exception when used with where 
and group-by
 Key: FLINK-12701
 URL: https://issues.apache.org/jira/browse/FLINK-12701
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.8.0
Reporter: Josh Bradt


Assigning a column an alias containing a space sometimes causes an exception 
even though the docs suggest this is valid.

Assume we have a table {{Groups}} that contains a string-typed column called 
{{name}}. Then the query
{code:sql}
SELECT `Groups`.`name` AS `Group Name` FROM `Groups`
{code}
works as expected, but
{code:sql}
SELECT `Groups`.`name` AS `Group Name` 
FROM `Groups` 
WHERE `Groups`.`name` LIKE 'Treat%' 
ORDER BY `Groups`.`name` ASC
{code}
produces the following exception
{code:java}
org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Invalid tuple field reference "Group Name".
at org.apache.flink.api.java.typeutils.RowTypeInfo.getFlatFields(RowTypeInfo.java:97)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:266)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:223)
at org.apache.flink.api.java.DataSet.partitionByRange(DataSet.java:1298)
at org.apache.flink.table.plan.nodes.dataset.DataSetSort.translateToPlan(DataSetSort.scala:99)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:311)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
[...]
{code}
If you remove the {{WHERE}} clause or the {{ORDER BY}} clause, it works fine. 
It only fails when both are included. Additionally, it works fine if the column 
alias ({{AS `Group Name`}}) is removed or if it doesn't contain a space ({{AS 
`GroupName`}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] 1u0 closed pull request #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)

2019-05-31 Thread GitBox
1u0 closed pull request #8523: [FLINK-12481][runtime] Invoke timer callback in 
task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523
 
 
   




[GitHub] [flink] 1u0 commented on issue #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)

2019-05-31 Thread GitBox
1u0 commented on issue #8523: [FLINK-12481][runtime] Invoke timer callback in 
task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523#issuecomment-497745492
 
 
   Thanks for the review.
   There are some test failures related to this change (I have only run the 
`flink-streaming-java` tests locally). In particular, the tests' expectations 
about the execution model don't match the changed one.
   
   More importantly, I have to update the user-facing documentation on timer 
execution to mention this runtime change.
   I'll reopen this PR after the documentation update, if it's aligned.
   
   @StefanRRichter, @pnowojski it's up to you if you want to read this PR 
further. On my side, the implementation would not change significantly (modulo 
the discussion with Stefan regarding timer service callback).




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289427441
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -566,6 +619,67 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
}
}
 
+   @VisibleForTesting
+   CompletableFuture 
registerProducedPartitions(TaskManagerLocation location) {
+   assertRunningInJobMasterMainThread();
+
+   return FutureUtils.thenApplyAsyncIfNotDone(
+   registerProducedPartitions(vertex, location, attemptId),
+   
vertex.getExecutionGraph().getJobMasterMainThreadExecutor(),
+   producedPartitionsCache -> {
+   producedPartitions = producedPartitionsCache;
+   return this;
+   });
+   }
+
+   @VisibleForTesting
+   static CompletableFuture> registerProducedPartitions(
 
 Review comment:
   Why not? It is also easier to test without mocking `Execution`.




[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-05-31 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853092#comment-16853092
 ] 

Piotr Nowojski commented on FLINK-12070:


Thanks [~kevin.cyj] for the detailed report. We will take a look at this. We 
had the same concerns, though purely theoretical ones, that what you describe 
could happen, and we were discussing how worried we should be. Your data 
points will be of high value from this perspective.

 

At least for now, I'm marking this as a 1.9 blocker, until we further 
investigate this.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.





[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289424609
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements 
ShuffleMaster {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture 
registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
+   ProducerDescriptor producerDescriptor,
+   int connectionIndex) {
+   return producerDescriptor.getDataPort() >= 0 ?
+   
NettyShuffleDescriptor.RemotePartitionLocation.fromProducerDescriptor(producerDescriptor,
 connectionIndex) :
 
 Review comment:
   I think a factory is more readable: the remote connection info is created 
only from the `ProducerDescriptor`. I refactored it a bit.
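
   A minimal sketch of the static-factory shape discussed here, under stated 
assumptions: `Producer`, `Location`, `RemoteLocation`, `LocalLocation` and 
`createLocation` are hypothetical stand-ins, not the classes in this PR. Only 
the `dataPort >= 0` check is taken from the diff above; the local branch is a 
guess, since the diff is cut off before it.

```java
/** Hypothetical sketch; none of these types are the PR's real classes. */
final class FactorySketch {

    /** Stand-in for a producer descriptor: an address plus a data port (-1 means none). */
    static final class Producer {
        final String address;
        final int dataPort;

        Producer(String address, int dataPort) {
            this.address = address;
            this.dataPort = dataPort;
        }
    }

    interface Location {
        String describe();
    }

    static final class RemoteLocation implements Location {
        private final String address;
        private final int port;
        private final int connectionIndex;

        private RemoteLocation(String address, int port, int connectionIndex) {
            this.address = address;
            this.port = port;
            this.connectionIndex = connectionIndex;
        }

        /** Static factory: everything remote-specific is derived from the producer. */
        static RemoteLocation fromProducer(Producer producer, int connectionIndex) {
            return new RemoteLocation(producer.address, producer.dataPort, connectionIndex);
        }

        @Override
        public String describe() {
            return "remote " + address + ":" + port + "#" + connectionIndex;
        }
    }

    /** Assumed placeholder for the non-remote case. */
    enum LocalLocation implements Location {
        INSTANCE;

        @Override
        public String describe() {
            return "local";
        }
    }

    static Location createLocation(Producer producer, int connectionIndex) {
        return producer.dataPort >= 0
            ? RemoteLocation.fromProducer(producer, connectionIndex)
            : LocalLocation.INSTANCE;
    }
}
```

   With the private constructor, callers can only obtain a `RemoteLocation` 
through `fromProducer`, so the mapping from producer fields to connection info 
lives in one place.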




[jira] [Updated] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-05-31 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-12070:
---
Affects Version/s: 1.9.0
 Priority: Blocker  (was: Major)

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.





[jira] [Reopened] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-05-31 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-12070:


Reopening this issue because of a possible performance regression and 
stability problem.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.





[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289424609
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements 
ShuffleMaster {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture 
registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
+   ProducerDescriptor producerDescriptor,
+   int connectionIndex) {
+   return producerDescriptor.getDataPort() >= 0 ?
+   
NettyShuffleDescriptor.RemotePartitionLocation.fromProducerDescriptor(producerDescriptor,
 connectionIndex) :
 
 Review comment:
   I think a factory is more readable: the remote connection info is created 
only from the `ProducerDescriptor`.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289420919
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements 
ShuffleMaster {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture 
registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
 
 Review comment:
   the method does not need any enum object fields.
   why not make it static?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289420919
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements 
ShuffleMaster {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture 
registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
 
 Review comment:
   why not use static?




[GitHub] [flink] twalthr commented on a change in pull request #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
twalthr commented on a change in pull request #8585:  [FLINK-12690][table-api] 
Introduce a Planner interface 
URL: https://github.com/apache/flink/pull/8585#discussion_r289404322
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OutputConversionTableOperation.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Special, internal kind of {@link ModifyTableOperation} that allows 
converting a tree of
+ * {@link QueryTableOperation}s to a {@link StreamTransformation} of given 
type described with
+ * {@link TypeInformation}. This is used to convert a relational query to a 
datastream.
+ *
+ * @param  expected type of {@link StreamTransformation}
+ */
+@Internal
+public class OutputConversionTableOperation extends ModifyTableOperation {
+
+   private final QueryTableOperation child;
+   private final TypeInformation type;
 
 Review comment:
   Let's start using DataType.




[GitHub] [flink] twalthr commented on a change in pull request #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
twalthr commented on a change in pull request #8585:  [FLINK-12690][table-api] 
Introduce a Planner interface 
URL: https://github.com/apache/flink/pull/8585#discussion_r289405865
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 ##
 @@ -79,6 +79,7 @@ class ExternalCatalogSchema(
   override def getTable(name: String): Table = try {
 val externalCatalogTable = catalog.getTable(name)
 ExternalTableUtil.fromExternalCatalogTable(isBatch, externalCatalogTable)
+
 
 Review comment:
   unrelated change




[GitHub] [flink] twalthr commented on a change in pull request #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
twalthr commented on a change in pull request #8585:  [FLINK-12690][table-api] 
Introduce a Planner interface 
URL: https://github.com/apache/flink/pull/8585#discussion_r289406596
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/InlineSinkTableOperation.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.sinks.TableSink;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * DML operation that tells to write to the given sink.
+ */
+@Internal
+public class InlineSinkTableOperation extends ModifyTableOperation {
 
 Review comment:
   Maybe `UnregisteredSinkTableOperation`?




[GitHub] [flink] twalthr commented on a change in pull request #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
twalthr commented on a change in pull request #8585:  [FLINK-12690][table-api] 
Introduce a Planner interface 
URL: https://github.com/apache/flink/pull/8585#discussion_r289420588
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/Planner.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.operations.ModifyTableOperation;
+import org.apache.flink.table.operations.QueryTableOperation;
+import org.apache.flink.table.operations.TableOperation;
+
+import java.util.List;
+
+/**
+ * This interface serves two purposes:
+ * 
+ * SQL parser - transforms a SQL string into a Table API specific tree of
+ * {@link TableOperation}s
+ * relational planner - provides a way to plan, optimize and transform 
tree of
+ * {@link ModifyTableOperation} into a runnable form ({@link 
StreamTransformation})
+ * .
+ *
+ * The Planner is execution agnostic. It is up to the
+ * {@link org.apache.flink.table.api.TableEnvironment} to ensure that if any 
of the
+ * {@link QueryTableOperation} pull any runtime configuration, all those 
configurations are
+ * equivalent. Example: If some of the {@link QueryTableOperation}s scan 
DataStreams, all
+ * those DataStreams must come from the same StreamExecutionEnvironment, 
because the result
+ * of {@link Planner#translate(List, QueryConfig)} will strip any execution 
configuration from
+ * the DataStream information.
+ *
+ * All Tables referenced in either {@link Planner#parse(String)} or
+ * {@link Planner#translate(List, QueryConfig)} should be previously 
registered in a
+ * {@link org.apache.flink.table.catalog.CatalogManager}, which will be 
provided during
+ * instantiation of the {@link Planner}.
+ */
+@Internal
+public interface Planner {
+
+   /**
+* Entry point for parsing sql queries expressed as a String.
+*
+* Note:If the created {@link TableOperation} is a {@link 
QueryTableOperation}
+* it must be in a form that will be understood by the
+* {@link Planner#translate(List, QueryConfig)} method.
+*
+* The produced Operation tree should already be validated.
+*
+* @param stmt the sql statement to evaluate
+* @return parsed query as a tree of relational {@link TableOperation}s
+*/
+   TableOperation parse(String stmt);
+
+   /**
+* Converts a relational tree of {@link QueryTableOperation}s into a 
set of runnable
+* {@link StreamTransformation}s.
+*
+* This method accepts a list of {@link QueryTableOperation}s to 
allow reusing common
+* subtrees of multiple relational queries.
+*
+* @param tableOperations list of relational operations to plan, 
optimize and convert in a
+* single run.
+* @param queryConfig allows configuring runtime behavior of the 
resulting query
+* @return list of corresponding {@link StreamTransformation}s.
+*/
+   List> translate(List 
tableOperations, QueryConfig queryConfig);
 
 Review comment:
   `QueryConfig` loses its purpose with a unified table environment that might 
merge parts of a table operation tree into a common part. We should merge query 
config into table config. Table config can be considered a mutable session 
configuration that can be updated at any time. It can be passed to the 
constructor of a planner. Thus, we can remove the `QueryConfig` parameter from 
all methods. What do you think?
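
   A minimal sketch of this proposal, assuming hypothetical names 
(`SessionConfigSketch`, `PlannerSketch`) that are not part of this PR. Strings 
stand in for table operations and transformations; the point is only that the 
config is injected once and `translate` takes no config parameter:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mutable session configuration; not Flink's TableConfig.
final class SessionConfigSketch {
    private long minIdleStateRetentionMs;

    long getMinIdleStateRetentionMs() {
        return minIdleStateRetentionMs;
    }

    void setMinIdleStateRetentionMs(long ms) {
        this.minIdleStateRetentionMs = ms;
    }
}

// Hypothetical planner: the config is injected once through the constructor,
// so translate() no longer needs a per-call config parameter.
final class PlannerSketch {
    private final SessionConfigSketch config;

    PlannerSketch(SessionConfigSketch config) {
        this.config = config;
    }

    // Strings stand in for table operations and transformations.
    List<String> translate(List<String> operations) {
        List<String> result = new ArrayList<>();
        for (String op : operations) {
            // The config is read at translation time, so later updates are visible.
            result.add(op + "[retentionMs=" + config.getMinIdleStateRetentionMs() + "]");
        }
        return result;
    }
}
```

   Because the planner keeps a reference to the config, an update made after 
construction is still picked up by the next `translate` call.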




[jira] [Created] (FLINK-12700) Nodejs installation suggestions

2019-05-31 Thread xiezhiqiang (JIRA)
xiezhiqiang created FLINK-12700:
---

 Summary: Nodejs installation suggestions
 Key: FLINK-12700
 URL: https://issues.apache.org/jira/browse/FLINK-12700
 Project: Flink
  Issue Type: Improvement
Reporter: xiezhiqiang


Even when Node.js is already installed on the system, it is still downloaded 
during compilation. This is unnecessary. The build could detect whether a 
Node.js environment already exists on the system and use it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289415869
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
 ##
 @@ -54,24 +56,31 @@
 * The index of the consumed subpartition of each consumed partition. 
This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing 
and consuming task.
 */
+   @Nonnegative
private final int consumedSubpartitionIndex;
 
/** An input channel for each consumed subpartition. */
-   private final InputChannelDeploymentDescriptor[] inputChannels;
+   private final ShuffleDescriptor[] inputChannels;
+
+   /**
+* {@link ResourceID} of partition consumer to identify its location.
+*
+* It can be used e.g. to compare with partition producer {@link 
ResourceID} in
+* {@link ProducerDescriptor} to determine producer/consumer 
co-location.
+*/
+   private final ResourceID consumerLocation;
 
 Review comment:
   actually I think it is better to rename it in `ProducerDescriptor` and 
`NettyShuffleDescriptor`




[GitHub] [flink] twalthr commented on a change in pull request #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
twalthr commented on a change in pull request #8585:  [FLINK-12690][table-api] 
Introduce a Planner interface 
URL: https://github.com/apache/flink/pull/8585#discussion_r289403456
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryTableOperation.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.List;
+
+/**
+ * Base class for representing an operation structure behind a user-facing 
{@link Table} API.
+ *
+ * It represents an operation that can be a node of a relational query. It 
has a schema, that
+ * can be used to validate a {@link QueryTableOperation} applied on top of 
this one.
+ */
+@Internal
+public interface QueryTableOperation extends TableOperation {
 
 Review comment:
   nit: should we shorten the name to `QueryOperation`?




[GitHub] [flink] twalthr commented on a change in pull request #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
twalthr commented on a change in pull request #8585:  [FLINK-12690][table-api] 
Introduce a Planner interface 
URL: https://github.com/apache/flink/pull/8585#discussion_r289401441
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ##
 @@ -100,6 +96,9 @@
 @Internal
 public abstract class StreamTransformation {
 
 Review comment:
   @aljoscha what is your opinion about this change? Should we simply move the 
class into the same package or should we deprecate the old location and have a 
new location for streaming related classes in core?




[GitHub] [flink] twalthr commented on a change in pull request #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
twalthr commented on a change in pull request #8585:  [FLINK-12690][table-api] 
Introduce a Planner interface 
URL: https://github.com/apache/flink/pull/8585#discussion_r289400549
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ##
 @@ -100,6 +96,9 @@
 @Internal
 public abstract class StreamTransformation {
 
+   // Has to be equal to StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
 
 Review comment:
   Add a comment on the other end, or use this constant in 
`StreamGraphGenerator`?
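
   A minimal sketch of the second option, with hypothetical stand-in classes 
(`TransformationSketch`, `GraphGeneratorSketch`) and an assumed value of 
`1 << 15`; the real classes live in different modules, so whether this 
dependency direction is acceptable is a separate question:

```java
// Hypothetical stand-in for the transformation class that owns the constant.
abstract class TransformationSketch {
    // Single source of truth; the value 1 << 15 is an assumption for illustration.
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    // True if the given max parallelism lies within (0, upper bound].
    static boolean isValidMaxParallelism(int maxParallelism) {
        return maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM;
    }
}

// Hypothetical stand-in for the graph generator: it references the constant
// instead of repeating the literal, so the two values cannot drift apart.
final class GraphGeneratorSketch {
    static final int UPPER_BOUND_MAX_PARALLELISM =
        TransformationSketch.UPPER_BOUND_MAX_PARALLELISM;
}
```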




[jira] [Updated] (FLINK-12691) Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime

2019-05-31 Thread Konstantin Knauf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-12691:
-
Description: 
A user that I have recently been working with has the requirement to change the 
capacity and possibly the timeout during the runtime of a job. 

-Conceptually, this should be possible by just passing these parameters to the 
{{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
only apply to future records and the change in the queue capacity would take 
effect immediately.-
 



  was:
A user that I have recently been working with has the requirement to change the 
capacity and possibly the timeout during the runtime of a job. 

-Conceptually, this should be possible by just passing these parameters to the 
{{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
only apply to future records and the change in the queue capacity would take 
effect immediately.
-
 




> Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime
> --
>
> Key: FLINK-12691
> URL: https://issues.apache.org/jira/browse/FLINK-12691
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Konstantin Knauf
>Priority: Minor
>
> A user that I have recently been working with has the requirement to change 
> the capacity and possibly the timeout during the runtime of a job. 
> -Conceptually, this should be possible by just passing these parameters to 
> the {{RichAsyncFunction}} via its RuntimeContext. The change of the timeout 
> would only apply to future records and the change in the queue capacity would 
> take effect immediately.-
>  





[jira] [Updated] (FLINK-12691) Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime

2019-05-31 Thread Konstantin Knauf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-12691:
-
Description: 
A user that I have recently been working with has the requirement to change the 
capacity and possibly the timeout during the runtime of a job. 

-Conceptually, this should be possible by just passing these parameters to the 
{{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
only apply to future records and the change in the queue capacity would take 
effect immediately.
-
 



  was:
A user that I have recently been working with has the requirement to change the 
capacity and possibly the timeout during the runtime of a job. 

~Conceptually, this should be possible by just passing these parameters to the 
{{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
only apply to future records and the change in the queue capacity would take 
effect immediately.~

 




> Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime
> --
>
> Key: FLINK-12691
> URL: https://issues.apache.org/jira/browse/FLINK-12691
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Konstantin Knauf
>Priority: Minor
>
> A user that I have recently been working with has the requirement to change 
> the capacity and possibly the timeout during the runtime of a job. 
> -Conceptually, this should be possible by just passing these parameters to 
> the {{RichAsyncFunction}} via its RuntimeContext. The change of the timeout 
> would only apply to future records and the change in the queue capacity would 
> take effect immediately.
> -
>  





[jira] [Created] (FLINK-12699) Reduce CPU consumption when snapshot/restore the spilled key-group

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12699:
-

 Summary: Reduce CPU consumption when snapshot/restore the spilled 
key-group
 Key: FLINK-12699
 URL: https://issues.apache.org/jira/browse/FLINK-12699
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


We need to prevent the unnecessary de/serialization when snapshotting/restoring 
the spilled state key-group. To achieve this, we need to:
1. Add meta information for {{HeapKeyedStatebackend}} checkpoint on DFS, 
separating the on-heap and on-disk part
2. Write the off-heap bytes directly to DFS when checkpointing and mark it as 
on-disk
3. Directly write the bytes onto disk when restoring the data back from DFS, if 
it's marked as on-disk

Note that we cannot use a plain file copy, since we use mmap while also 
supporting copy-on-write.
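
A minimal sketch of steps 1 to 3, assuming a hypothetical record layout (one 
marker byte, a length, then the raw bytes). The class name 
{{KeyGroupSnapshotSketch}} and the layout are illustrative only and are not 
the proposed checkpoint format:

```java
import java.nio.ByteBuffer;

// Hypothetical record layout: [1 byte location marker][4 byte length][payload bytes].
final class KeyGroupSnapshotSketch {
    static final byte ON_HEAP = 0;
    static final byte ON_DISK = 1;

    // Writes the payload as-is behind a marker; no de/serialization of state objects.
    static ByteBuffer write(byte location, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + Integer.BYTES + payload.length);
        buf.put(location).putInt(payload.length).put(payload);
        buf.flip();
        return buf;
    }

    // Peeks at the marker without moving the buffer position.
    static boolean isOnDisk(ByteBuffer buf) {
        return buf.get(buf.position()) == ON_DISK;
    }

    // Reads the raw bytes back; for an on-disk key-group these could be
    // written straight to the local spill file without deserializing them.
    static byte[] readPayload(ByteBuffer buf) {
        buf.get(); // skip the location marker
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return payload;
    }
}
```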





[jira] [Updated] (FLINK-12691) Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime

2019-05-31 Thread Konstantin Knauf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-12691:
-
Description: 
A user that I have recently been working with has the requirement to change the 
capacity and possibly the timeout during the runtime of a job. 

~Conceptually, this should be possible by just passing these parameters to the 
{{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
only apply to future records and the change in the queue capacity would take 
effect immediately.~

 



  was:
A user that I have recently been working with has the requirement to change the 
capacity and possibly the timeout during the runtime of a job. 

Conceptually, this should be possible by just passing these parameters to the 
{{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
only apply to future records and the change in the queue capacity would take 
effect immediately.

 




> Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime
> --
>
> Key: FLINK-12691
> URL: https://issues.apache.org/jira/browse/FLINK-12691
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Konstantin Knauf
>Priority: Minor
>
> A user that I have recently been working with has the requirement to change 
> the capacity and possibly the timeout during the runtime of a job. 
> ~Conceptually, this should be possible by just passing these parameters to 
> the {{RichAsyncFunction}} via its RuntimeContext. The change of the timeout 
> would only apply to future records and the change in the queue capacity would 
> take effect immediately.~
>  





[jira] [Commented] (FLINK-12691) Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime

2019-05-31 Thread Konstantin Knauf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853028#comment-16853028
 ] 

Konstantin Knauf commented on FLINK-12691:
--

[~yanghua] I thought about this again and a) it is not as easy as I thought and 
b) from an API perspective the {{RuntimeContext}} would probably also not be 
nice, because it generally does not contain operator specific methods. 
Optimally, we would have a dedicated {{Context}}, which was passed to 
{{asyncInvoke}}, but this would break the API. Any ideas? 



> Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime
> --
>
> Key: FLINK-12691
> URL: https://issues.apache.org/jira/browse/FLINK-12691
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Konstantin Knauf
>Priority: Minor
>
> A user that I have recently been working with has the requirement to change 
> the capacity and possibly the timeout during the runtime of a job. 
> Conceptually, this should be possible by just passing these parameters to the 
> {{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
> only apply to future records and the change in the queue capacity would take 
> effect immediately.
>  





[jira] [Commented] (FLINK-12626) Client should not register table-source-sink twice in TableEnvironment

2019-05-31 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853025#comment-16853025
 ] 

Timo Walther commented on FLINK-12626:
--

[~dawidwys] is currently working on decomposing the table environment into 
multiple entities. I think this should also be covered as part of the efforts, 
right?

> Client should not register table-source-sink twice in TableEnvironment
> --
>
> Key: FLINK-12626
> URL: https://issues.apache.org/jira/browse/FLINK-12626
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently for a table specified in SQL CLI yaml file, if it's with type: 
> source-sink-table, it will be registered twice (as source and as sink) to 
> TableEnv.
> As we've moved table management in TableEnv to Catalogs which doesn't allow 
> registering dup named table, we need to come up with a solution to fix this 
> problem.
> cc [~xuefuz] [~tiwalter] [~dawidwys]





[jira] [Created] (FLINK-12698) Implement a SpillLoadManager

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12698:
-

 Summary: Implement a SpillLoadManager
 Key: FLINK-12698
 URL: https://issues.apache.org/jira/browse/FLINK-12698
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


The {{SpillLoadManager}} is responsible for:
1. Performing spill/load when the {{HeapStatusMonitor}} triggers.
2. Deciding which KeyGroup to spill/load according to data from the accounting 
managers (mainly two factors: state-size and request-rate).
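
A minimal sketch of the decision in point 2, assuming a hypothetical scoring 
rule (large, rarely requested key groups are spilled first). The names 
{{KeyGroupStats}} and {{SpillPolicySketch}} and the formula are illustrative 
assumptions, not the planned policy:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical per-key-group statistics as an accounting manager might report them.
final class KeyGroupStats {
    final int keyGroup;
    final long stateSizeBytes;
    final double requestsPerSecond;

    KeyGroupStats(int keyGroup, long stateSizeBytes, double requestsPerSecond) {
        this.keyGroup = keyGroup;
        this.stateSizeBytes = stateSizeBytes;
        this.requestsPerSecond = requestsPerSecond;
    }

    // Assumed heuristic: bigger state and fewer requests give a higher spill score.
    double spillScore() {
        return stateSizeBytes / (1.0 + requestsPerSecond);
    }
}

final class SpillPolicySketch {
    // Picks the on-heap key group with the highest spill score, if any exists.
    static Optional<KeyGroupStats> pickSpillCandidate(List<KeyGroupStats> onHeap) {
        return onHeap.stream().max(Comparator.comparingDouble(KeyGroupStats::spillScore));
    }
}
```

A load decision could use the inverse ordering over the spilled key groups.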





[jira] [Created] (FLINK-12697) Support on-disk state storage for HeapKeyedStateBackend

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12697:
-

 Summary: Support on-disk state storage for HeapKeyedStateBackend
 Key: FLINK-12697
 URL: https://issues.apache.org/jira/browse/FLINK-12697
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


We need to store state on disk after spilling, and to support this, we need:
1. A data structure to efficiently support copy-on-write for asynchronous 
snapshot
2. An allocator to allocate and manage chunks from the memory mapped file(s)
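
A minimal sketch of point 2, assuming fixed-size chunks and a simple free 
list. {{ChunkAllocatorSketch}} is a hypothetical name; it works on any 
{{ByteBuffer}}, so a {{MappedByteBuffer}} obtained from a memory mapped file 
could be passed in as the region:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical fixed-size chunk allocator over one buffer region.
final class ChunkAllocatorSketch {
    private final ByteBuffer region;
    private final int chunkSize;
    private final ArrayDeque<Integer> freeChunks = new ArrayDeque<>();

    ChunkAllocatorSketch(ByteBuffer region, int chunkSize) {
        this.region = region;
        this.chunkSize = chunkSize;
        int chunks = region.capacity() / chunkSize;
        for (int i = 0; i < chunks; i++) {
            freeChunks.add(i);
        }
    }

    // Returns the index of a free chunk, or -1 if the region is exhausted.
    int allocate() {
        Integer index = freeChunks.poll();
        return index == null ? -1 : index;
    }

    // Returns a view that shares memory with the region but covers only one chunk.
    ByteBuffer view(int chunkIndex) {
        ByteBuffer dup = region.duplicate();
        dup.clear();
        dup.position(chunkIndex * chunkSize);
        dup.limit(chunkIndex * chunkSize + chunkSize);
        return dup.slice();
    }

    // Hands a chunk back so that a later allocate() can reuse it.
    void free(int chunkIndex) {
        freeChunks.add(chunkIndex);
    }
}
```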





[jira] [Created] (FLINK-12696) Implement a MmapManager

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12696:
-

 Summary: Implement a MmapManager
 Key: FLINK-12696
 URL: https://issues.apache.org/jira/browse/FLINK-12696
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


Since we plan to use mmap to accelerate the state read/write against the 
spilled (on-disk) KeyGroup, we need a {{MmapManager}} to manage the mmap-ed 
files, pretty much like the [MemoryMappedFileManager|https://s.apache.org/85BP] 
in log4j2.
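
A minimal sketch of such a manager, assuming one read/write mapping per file 
that is cached by path. {{MmapManagerSketch}} and its methods are hypothetical; 
only the JDK calls ({{FileChannel.open}}, {{FileChannel.map}}) are real APIs:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical manager that maps each file once and hands out the cached mapping.
final class MmapManagerSketch {
    private final Map<Path, MappedByteBuffer> mappings = new HashMap<>();

    // Maps the first `size` bytes of the file read/write, creating the file if needed.
    MappedByteBuffer map(Path file, long size) {
        return mappings.computeIfAbsent(file, f -> {
            try (FileChannel channel = FileChannel.open(
                    f,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.READ,
                    StandardOpenOption.WRITE)) {
                // The mapping stays valid after the channel is closed.
                return channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    int mappedFileCount() {
        return mappings.size();
    }
}
```

The sketch leaves out unmapping and thread safety, which a real manager would 
have to handle.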





[jira] [Created] (FLINK-12695) Implement a HeapStatusMonitor

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12695:
-

 Summary: Implement a HeapStatusMonitor
 Key: FLINK-12695
 URL: https://issues.apache.org/jira/browse/FLINK-12695
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


We need a {{HeapStatusMonitor}} to monitor the JVM heap status and perform the 
necessary spill/load when the heap is exhausted/regained.
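
A minimal sketch of such a monitor, assuming two hypothetical usage-ratio 
thresholds (spill above one, load below the other). {{HeapStatusMonitorSketch}} 
and the threshold semantics are illustrative assumptions; the heap numbers 
come from the JDK's {{MemoryMXBean}}:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical monitor that turns a heap usage ratio into a spill/load decision.
final class HeapStatusMonitorSketch {
    enum Action { SPILL, LOAD, NONE }

    private final double spillThreshold;
    private final double loadThreshold;

    HeapStatusMonitorSketch(double spillThreshold, double loadThreshold) {
        this.spillThreshold = spillThreshold;
        this.loadThreshold = loadThreshold;
    }

    // Pure decision logic, kept separate from the JVM query so it is easy to test.
    Action decide(long usedBytes, long maxBytes) {
        double ratio = (double) usedBytes / maxBytes;
        if (ratio >= spillThreshold) {
            return Action.SPILL;
        }
        if (ratio <= loadThreshold) {
            return Action.LOAD;
        }
        return Action.NONE;
    }

    // Reads the current heap usage from the JVM and applies the decision logic.
    Action poll() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        // getMax() returns -1 when the maximum is undefined; fall back to committed.
        long max = heap.getMax() > 0 ? heap.getMax() : heap.getCommitted();
        return decide(heap.getUsed(), max);
    }
}
```

The gap between the two thresholds avoids flapping between spill and load 
when usage hovers around a single limit.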





[GitHub] [flink] xuefuz commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-31 Thread GitBox
xuefuz commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289385113
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
 ##
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract class for catalogs.
+ */
+@PublicEvolving
+public abstract class AbstractCatalog implements Catalog {
+   private final String catalogName;
 
 Review comment:
   Nit: catalogName -> name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12694) Implement a HeapAccountingManager

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12694:
-

 Summary: Implement a HeapAccountingManager
 Key: FLINK-12694
 URL: https://issues.apache.org/jira/browse/FLINK-12694
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


We need a {{HeapAccountingManager}} for a) (roughly) estimating the on-heap 
size of each key-value; and b) recording the size of each on-heap and off-heap 
KeyGroup. With this accounting we can decide which KeyGroup to spill/load.
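
Part b) can be pictured as two size tables keyed by KeyGroup index. The following sketch is hypothetical throughout (class name, method names, and the largest-first spill policy are assumptions, not part of this issue); it only shows how recorded sizes could feed a spill decision.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-KeyGroup size bookkeeping. */
final class KeyGroupSizeAccounting {
    private final Map<Integer, Long> onHeapBytes = new HashMap<>();
    private final Map<Integer, Long> offHeapBytes = new HashMap<>();

    /** Adds a (rough) size estimate for state written to an on-heap KeyGroup. */
    void addOnHeap(int keyGroup, long estimatedBytes) {
        onHeapBytes.merge(keyGroup, estimatedBytes, Long::sum);
    }

    /** Moves a KeyGroup from the on-heap table to the off-heap table. */
    void markSpilled(int keyGroup, long onDiskBytes) {
        onHeapBytes.remove(keyGroup);
        offHeapBytes.put(keyGroup, onDiskBytes);
    }

    long onHeapSize(int keyGroup) {
        return onHeapBytes.getOrDefault(keyGroup, 0L);
    }

    long offHeapSize(int keyGroup) {
        return offHeapBytes.getOrDefault(keyGroup, 0L);
    }

    /** Returns the on-heap KeyGroup with the largest recorded size, or -1 if none. */
    int largestOnHeapKeyGroup() {
        int best = -1;
        long bestSize = -1;
        for (Map.Entry<Integer, Long> entry : onHeapBytes.entrySet()) {
            if (entry.getValue() > bestSize) {
                best = entry.getKey();
                bestSize = entry.getValue();
            }
        }
        return best;
    }
}
```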





[GitHub] [flink] asfgit closed pull request #8496: [FLINK-12571][network] Make NetworkEnvironment#start() return the binded data port

2019-05-31 Thread GitBox
asfgit closed pull request #8496: [FLINK-12571][network] Make 
NetworkEnvironment#start() return the binded data port
URL: https://github.com/apache/flink/pull/8496
 
 
   




[jira] [Closed] (FLINK-12571) Make NetworkEnvironment#start() return the binded data port

2019-05-31 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-12571.

Resolution: Fixed

1.9: bfd53e9d3f5a221bca8ca82e5f2ab5399d3fa0fd

> Make NetworkEnvironment#start() return the binded data port
> ---
>
> Key: FLINK-12571
> URL: https://issues.apache.org/jira/browse/FLINK-12571
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently `NetworkEnvironment#getConnectionManager()` is mainly used by 
> `TaskManagerServices` to get the bound data port from 
> `NettyConnectionManager`. Since the `ConnectionManager` is an internal 
> component of `NetworkEnvironment`, it should not be exposed to the outside. 
> Other ShuffleService implementations might have no `ConnectionManager` at 
> all.
> We could make `ShuffleService#start()` return the bound data port to replace 
> `getConnectionManager`. The `LocalConnectionManager` or other shuffle 
> service implementations which have no bound data port could return a 
> simple default value, which would do no harm.
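
The idea in the quoted description can be sketched as follows. This is not Flink's actual interface: the names ShuffleServiceSketch, SocketShuffleService, LocalShuffleService and the default value -1 are assumptions chosen for illustration; the issue only says "a simple default value".

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Hypothetical service whose start() reports the bound data port. */
interface ShuffleServiceSketch extends AutoCloseable {
    int start() throws IOException;

    @Override
    void close() throws IOException;
}

/** Network-backed variant: binds a socket and returns the port it got. */
final class SocketShuffleService implements ShuffleServiceSketch {
    private ServerSocket serverSocket;

    @Override
    public int start() throws IOException {
        serverSocket = new ServerSocket(0); // port 0 lets the OS pick a free port
        return serverSocket.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        if (serverSocket != null) {
            serverSocket.close();
        }
    }
}

/** Local variant: nothing is bound, so a default value is returned. */
final class LocalShuffleService implements ShuffleServiceSketch {
    static final int NO_DATA_PORT = -1; // assumed default, not taken from the issue

    @Override
    public int start() {
        return NO_DATA_PORT;
    }

    @Override
    public void close() {
    }
}
```

With this shape, callers never need access to the connection manager just to learn the port.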





[jira] [Created] (FLINK-12693) Store state per key-group in CopyOnWriteStateTable

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12693:
-

 Summary: Store state per key-group in CopyOnWriteStateTable
 Key: FLINK-12693
 URL: https://issues.apache.org/jira/browse/FLINK-12693
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


Since we propose to use KeyGroup as the unit of spilling/loading, the first 
step is to store state per key-group. Currently {{NestedMapsStateTable}} 
natively supports this, so we only need to refine {{CopyOnWriteStateTable}}.

The main effort required here is to extract the customized hash-map out of 
{{CopyOnWriteStateTable}} and then use such a hash-map as the state holder for 
each KeyGroup. Afterwards we could extract some common logic out into 
{{StateTable}}.
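
The layout described above, one hash-map per KeyGroup, can be sketched as below. The class PerKeyGroupStateTable is hypothetical, it uses java.util.HashMap in place of the customized copy-on-write hash-map, and the modulo-based key-group assignment is a simplification rather than Flink's real assignment function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical state table that keeps one map per KeyGroup. */
final class PerKeyGroupStateTable<K, V> {
    private final List<Map<K, V>> mapsByKeyGroup = new ArrayList<>();

    PerKeyGroupStateTable(int numberOfKeyGroups) {
        for (int i = 0; i < numberOfKeyGroups; i++) {
            mapsByKeyGroup.add(new HashMap<>());
        }
    }

    /** Simplified assignment: hash code modulo the number of KeyGroups. */
    int keyGroupOf(K key) {
        return Math.floorMod(key.hashCode(), mapsByKeyGroup.size());
    }

    void put(K key, V value) {
        mapsByKeyGroup.get(keyGroupOf(key)).put(key, value);
    }

    V get(K key) {
        return mapsByKeyGroup.get(keyGroupOf(key)).get(key);
    }

    /** Detaches a whole KeyGroup, e.g. to spill it, leaving an empty map behind. */
    Map<K, V> detachKeyGroup(int keyGroup) {
        return mapsByKeyGroup.set(keyGroup, new HashMap<>());
    }
}
```

Because each KeyGroup owns its map, a single KeyGroup can be moved as a unit without touching the others.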





[jira] [Updated] (FLINK-12571) Make NetworkEnvironment#start() return the binded data port

2019-05-31 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-12571:
-
Fix Version/s: 1.9.0

> Make NetworkEnvironment#start() return the binded data port
> ---
>
> Key: FLINK-12571
> URL: https://issues.apache.org/jira/browse/FLINK-12571
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently `NetworkEnvironment#getConnectionManager()` is mainly used by 
> `TaskManagerServices` to get the bound data port from 
> `NettyConnectionManager`. Since the `ConnectionManager` is an internal 
> component of `NetworkEnvironment`, it should not be exposed to the outside. 
> Other ShuffleService implementations might have no `ConnectionManager` at 
> all.
> We could make `ShuffleService#start()` return the bound data port to replace 
> `getConnectionManager`. The `LocalConnectionManager` or other shuffle 
> service implementations which have no bound data port could return a 
> simple default value, which would do no harm.





[GitHub] [flink] GJL commented on issue #8496: [FLINK-12571][network] Make NetworkEnvironment#start() return the binded data port

2019-05-31 Thread GitBox
GJL commented on issue #8496: [FLINK-12571][network] Make 
NetworkEnvironment#start() return the binded data port
URL: https://github.com/apache/flink/pull/8496#issuecomment-497698010
 
 
   Merging.




[jira] [Updated] (FLINK-12692) Support disk spilling in HeapKeyedStateBackend

2019-05-31 Thread Yu Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Li updated FLINK-12692:
--
Fix Version/s: 1.9.0

We aim to complete this work before the 1.9.0 release.

> Support disk spilling in HeapKeyedStateBackend
> --
>
> Key: FLINK-12692
> URL: https://issues.apache.org/jira/browse/FLINK-12692
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Yu Li
>Assignee: Yu Li
>Priority: Major
> Fix For: 1.9.0
>
>
> {{HeapKeyedStateBackend}} is one of the two {{KeyedStateBackends}} in Flink. 
> Since state lives as Java objects on the heap and de/serialization only 
> happens during state snapshot and restore, it outperforms 
> {{RocksDBKeyedStateBackend}} when all data can reside in memory.
> However, along with this advantage, {{HeapKeyedStateBackend}} also has its 
> shortcomings. The most painful one is the difficulty of estimating the 
> maximum heap size (Xmx) to set, and we suffer from GC impact once the 
> heap memory is not enough to hold all state data. There are several 
> (inevitable) causes for such a scenario, including (but not limited to):
> * Memory overhead of the Java object representation (tens of times the 
> serialized data size).
> * Data flood caused by burst traffic.
> * Data accumulation caused by source malfunction.
> To resolve this problem, we propose a solution to support spilling state data 
> to disk before heap memory is exhausted. We will monitor the heap usage and 
> choose the coldest data to spill, and automatically reload it when heap 
> memory is regained after data removal or TTL expiration.
> For more details, please refer to the design doc and mailing list discussion.





[jira] [Created] (FLINK-12692) Support disk spilling in HeapKeyedStateBackend

2019-05-31 Thread Yu Li (JIRA)
Yu Li created FLINK-12692:
-

 Summary: Support disk spilling in HeapKeyedStateBackend
 Key: FLINK-12692
 URL: https://issues.apache.org/jira/browse/FLINK-12692
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


{{HeapKeyedStateBackend}} is one of the two {{KeyedStateBackends}} in Flink. 
Since state lives as Java objects on the heap and de/serialization only 
happens during state snapshot and restore, it outperforms 
{{RocksDBKeyedStateBackend}} when all data can reside in memory.

However, along with this advantage, {{HeapKeyedStateBackend}} also has its 
shortcomings. The most painful one is the difficulty of estimating the 
maximum heap size (Xmx) to set, and we suffer from GC impact once the heap 
memory is not enough to hold all state data. There are several (inevitable) 
causes for such a scenario, including (but not limited to):
* Memory overhead of the Java object representation (tens of times the 
serialized data size).
* Data flood caused by burst traffic.
* Data accumulation caused by source malfunction.

To resolve this problem, we propose a solution to support spilling state data 
to disk before heap memory is exhausted. We will monitor the heap usage and 
choose the coldest data to spill, and automatically reload it when heap memory 
is regained after data removal or TTL expiration.

For more details, please refer to the design doc and mailing list discussion.
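
To illustrate "choose the coldest data to spill", the sketch below tracks a last-access timestamp per KeyGroup and picks the one that was touched longest ago. It is a hypothetical illustration only: the class ColdestKeyGroupSelector and the use of last-access time as the measure of coldness are assumptions, and the design doc may define coldness differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical selector that treats the least recently accessed KeyGroup as coldest. */
final class ColdestKeyGroupSelector {
    private final Map<Integer, Long> lastAccessMillis = new HashMap<>();

    /** Records that a KeyGroup was read or written at the given time. */
    void recordAccess(int keyGroup, long nowMillis) {
        lastAccessMillis.put(keyGroup, nowMillis);
    }

    /** Stops tracking a KeyGroup, e.g. once it has been spilled. */
    void forget(int keyGroup) {
        lastAccessMillis.remove(keyGroup);
    }

    /** Returns the tracked KeyGroup with the oldest access time, or -1 if none. */
    int coldest() {
        int coldest = -1;
        long oldest = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> entry : lastAccessMillis.entrySet()) {
            if (entry.getValue() < oldest) {
                coldest = entry.getKey();
                oldest = entry.getValue();
            }
        }
        return coldest;
    }
}
```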





[GitHub] [flink] dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make the DataStream & DataSet conversion to a Table independent from Calcite

2019-05-31 Thread GitBox
dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make 
the DataStream & DataSet conversion to a Table independent from Calcite
URL: https://github.com/apache/flink/pull/8521#discussion_r289368426
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
 ##
 @@ -63,10 +73,11 @@ class DataSetScan(
   tableEnv: BatchTableEnvImpl,
   queryConfig: BatchQueryConfig): DataSet[Row] = {
 val schema = new RowSchema(rowRelDataType)
-val inputDataSet: DataSet[Any] = dataSetTable.dataSet
-val fieldIdxs = dataSetTable.fieldIndexes
 val config = tableEnv.getConfig
-convertToInternalRow(schema, inputDataSet, fieldIdxs, config, None)
+convertToInternalRow(schema, inputDataSet.asInstanceOf[DataSet[Any]], 
fieldIdxs, config, None)
 
 Review comment:
   I didn't change `convertToInternalRow`, and it expects `DataSet[Any]`. I 
can change it for `DataSetScan`, but I wouldn't change it for `DataStreamScan`, 
as there are many more internal classes involved that expect `DataStream[Any]`.




[GitHub] [flink] dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make the DataStream & DataSet conversion to a Table independent from Calcite

2019-05-31 Thread GitBox
dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make 
the DataStream & DataSet conversion to a Table independent from Calcite
URL: https://github.com/apache/flink/pull/8521#discussion_r289367916
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/TableOperationCatalogView.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.operations.TableOperation;
+
+import java.util.HashMap;
+import java.util.Optional;
+
+/**
+ * A view created from {@link TableOperation} via operations on {@link 
org.apache.flink.table.api.Table}.
+ */
+public class TableOperationCatalogView extends AbstractCatalogView {
+   private final TableOperation tableOperation;
+
+   public TableOperationCatalogView(TableOperation tableOperation) {
+   this(tableOperation, "This is a catalog view backed by 
TableOperation");
 
 Review comment:
   There is no user-facing API that would allow for that. This `TableOperation` 
is constructed when calling `TableEnvironment#registerTable(String name, Table 
table)`.
   
   I can make the second ctor of `TableOperationCatalogView` public, if that 
helps.




[GitHub] [flink] dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make the DataStream & DataSet conversion to a Table independent from Calcite

2019-05-31 Thread GitBox
dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make 
the DataStream & DataSet conversion to a Table independent from Calcite
URL: https://github.com/apache/flink/pull/8521#discussion_r289366981
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
 ##
 @@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.schema
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.Statistic
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableException, Types}
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-
-abstract class InlineTable[T](
-val typeInfo: TypeInformation[T],
-val fieldIndexes: Array[Int],
-val fieldNames: Array[String],
-val statistic: FlinkStatistic)
 
 Review comment:
   We never actually used that for `DataStream/DataSet` tables. They were 
always set to either `FlinkStatistic.of(new TableStats(1000))` for `DataSet` or 
`FlinkStatistic.UNKNOWN` for `DataStream`.
   
   We can always add them to the `TableOperation`, but I don't see a point in 
that until we have proper support for them.




[GitHub] [flink] dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make the DataStream & DataSet conversion to a Table independent from Calcite

2019-05-31 Thread GitBox
dawidwys commented on a change in pull request #8521: [FLINK-12601][table] Make 
the DataStream & DataSet conversion to a Table independent from Calcite
URL: https://github.com/apache/flink/pull/8521#discussion_r289364923
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
 ##
 @@ -36,15 +39,20 @@ import org.apache.flink.types.Row
 class DataSetScan(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
-table: RelOptTable,
+catalog: RelOptSchema,
+inputDataSet: DataSet[_],
+fieldIdxs: Array[Int],
 rowRelDataType: RelDataType)
-  extends TableScan(cluster, traitSet, table)
+  extends TableScan(
+cluster,
+traitSet,
+RelOptTableImpl.create(catalog, rowRelDataType, List[String]().asJava, 
null))
   with BatchScan {
 
-  val dataSetTable: DataSetTable[Any] = 
getTable.unwrap(classOf[DataSetTable[Any]])
-
   override def deriveRowType(): RelDataType = rowRelDataType
 
+  override def estimateRowCount(mq: RelMetadataQuery): Double = 1000L
 
 Review comment:
   We had it hardcoded in the ctor of `DataSetTable`. (In theory you could 
provide it in the ctor of the `DataSetTable`, but in fact it was always set to 
1000).




[jira] [Commented] (FLINK-12691) Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime

2019-05-31 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852951#comment-16852951
 ] 

vinoyang commented on FLINK-12691:
--

How do you plan to pass the capacity and timeout information to the 
{{RuntimeContext}} (and then let it pass these parameters to the 
{{RichAsyncFunction}}, as you said)?

> Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime
> --
>
> Key: FLINK-12691
> URL: https://issues.apache.org/jira/browse/FLINK-12691
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Konstantin Knauf
>Priority: Minor
>
> A user that I have recently been working with has the requirement to change 
> the capacity and possibly the timeout during the runtime of a job. 
> Conceptually, this should be possible by just passing these parameters to the 
> {{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
> only apply to future records and the change in the queue capacity would take 
> effect immediately.
>  
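
As a sketch of what "the change in the queue capacity would take effect immediately" could mean, the hypothetical queue below checks the current capacity on every insert. ResizableBoundedQueue is an illustration only and is not the queue used by AsyncWaitOperator; in particular, keeping already-queued elements when the capacity shrinks is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical bounded queue whose capacity can be changed while in use. */
final class ResizableBoundedQueue<T> {
    private final Queue<T> elements = new ArrayDeque<>();
    private int capacity;

    ResizableBoundedQueue(int capacity) {
        setCapacity(capacity);
    }

    /** Takes effect for the next offer(); queued elements are never dropped. */
    synchronized void setCapacity(int newCapacity) {
        if (newCapacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = newCapacity;
    }

    /** Adds the element if the queue is below its current capacity. */
    synchronized boolean offer(T element) {
        if (elements.size() >= capacity) {
            return false;
        }
        elements.add(element);
        return true;
    }

    /** Removes and returns the oldest element, or null if the queue is empty. */
    synchronized T poll() {
        return elements.poll();
    }

    synchronized int size() {
        return elements.size();
    }
}
```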





[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289357933
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfigOptions;
+import org.apache.flink.table.api.TableImpl;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.runtime.utils.TableUtil;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import scala.collection.Seq;
+
+/**
+ * Tests {@link HiveTableSource}.
+ */
+public class HiveTableSourceTest {
+
+   public static final String DEFAULT_SERDE_CLASS = 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
+   public static final String DEFAULT_INPUT_FORMAT_CLASS = 
org.apache.hadoop.mapred.TextInputFormat.class.getName();
+   public static final String DEFAULT_OUTPUT_FORMAT_CLASS = 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+
+   private static HiveCatalog hiveCatalog;
+   private static HiveConf hiveConf;
+
+   @BeforeClass
+   public static void createCatalog() throws IOException {
+   hiveConf = HiveTestUtils.getHiveConf();
+   hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+   hiveCatalog.open();
+   }
+
+   @AfterClass
+   public static void closeCatalog() {
+   if (null != hiveCatalog) {
+   hiveCatalog.close();
+   }
+   }
+
+   @Test
+   public void testReadNonPartitionedTable() throws Exception {
+   final String dbName = "default";
+   final String tblName = "test";
+   TableSchema tableSchema = new TableSchema(
+   new String[]{"a", "b", "c", "d", "e"},
+   new TypeInformation[]{
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.LONG_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO}
+   );
+   //Now we used metaStore client to create hive table instead of 
using hiveCatalog for it doesn't support set
+   //serDe temporarily.
+   IMetaStoreClient client = 
RetryingMetaStoreClient.getProxy(hiveConf, null, null, 
HiveMetaStoreClient.class.getName(), true);
+   org.apache.hadoop.hive.metastore.api.Table tbl = new 
org.apache.hadoop.hive.metastore.api.Table();
+   tbl.setDbName(dbName);
+   tbl.setTableName(tblName);
+   tbl.setCreateTime

[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289357105
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat are inspired by the HCatInputFormat and 
HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = 
LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private BaseRowTypeInfo baseRowTypeInfo;
+
+   // Necessary info to init deserializer
+   private String[] partitionColNames;
+   private List partitions;
+   private transient Deserializer deserializer;
+   private transient List fieldRefs;
+   private transient StructObjectInspector oi;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+   JobConf jobConf,
+   Boolean isPartitioned,
+   String[] partitionColNames,
+   List partitions,
+   BaseRowTypeInfo baseRowTypeInfo) {
+   super(jobConf.getCredentials());
+   this.baseRowTypeInfo = baseRowTypeInfo;
+   this.jobConf = new JobConf(jobConf);
+   this.jobConf = jobConf;
+   this.isPartitioned = isPartitioned;
+   this.partitionColNames = partitionColNames;
+   this.partitions = partitions;
+   }
+
+   @Override
+   public void open(HiveTableInputSp

[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289357105
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 ##
 @@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * The HiveTableInputFormat is inspired by the HCatInputFormat and HadoopInputFormatBase.
+ */
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase
+   implements ResultTypeQueryable {
+   private static final long serialVersionUID = 6351448428766433164L;
+   private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
+
+   private JobConf jobConf;
+
+   protected transient Writable key;
+   protected transient Writable value;
+
+   private transient RecordReader recordReader;
+   protected transient boolean fetched = false;
+   protected transient boolean hasNext;
+
+   private Boolean isPartitioned;
+   private BaseRowTypeInfo baseRowTypeInfo;
+
+   // Necessary info to init deserializer
+   private String[] partitionColNames;
+   private List partitions;
+   private transient Deserializer deserializer;
+   private transient List fieldRefs;
+   private transient StructObjectInspector oi;
+   private transient InputFormat mapredInputFormat;
+   private transient HiveTablePartition hiveTablePartition;
+
+   public HiveTableInputFormat(
+   JobConf jobConf,
+   Boolean isPartitioned,
+   String[] partitionColNames,
+   List partitions,
+   BaseRowTypeInfo baseRowTypeInfo) {
+   super(jobConf.getCredentials());
+   this.baseRowTypeInfo = baseRowTypeInfo;
+   this.jobConf = new JobConf(jobConf);
+   this.jobConf = jobConf;
+   this.isPartitioned = isPartitioned;
+   this.partitionColNames = partitionColNames;
+   this.partitions = partitions;
+   }
+
+   @Override
+   public void open(HiveTableInputSp

[GitHub] [flink] zjuwangg commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-31 Thread GitBox
zjuwangg commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289356736
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
 
 Review comment:
   Now it's only used in the batch environment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-31 Thread GitBox
lirui-apache commented on issue #8564: [FLINK-12649][hive] Add a shim layer to 
support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#issuecomment-497673979
 
 
   Thanks @bowenli86 for the review. PR updated accordingly.




[jira] [Created] (FLINK-12691) Make Queue Capacity and Timeout of AsyncWaitOperator changeable during runtime

2019-05-31 Thread Konstantin Knauf (JIRA)
Konstantin Knauf created FLINK-12691:


 Summary: Make Queue Capacity and Timeout of AsyncWaitOperator 
changeable during runtime
 Key: FLINK-12691
 URL: https://issues.apache.org/jira/browse/FLINK-12691
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Konstantin Knauf


A user that I have recently been working with has the requirement to change the 
capacity and possibly the timeout during the runtime of a job. 

Conceptually, this should be possible by just passing these parameters to the 
{{RichAsyncFunction}} via its RuntimeContext. The change of the timeout would 
only apply to future records and the change in the queue capacity would take 
effect immediately.
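
The intended semantics can be sketched without any Flink dependency. This is an illustration only: `AdjustableAsyncQueue` and all of its methods are hypothetical names, not part of Flink's AsyncWaitOperator or RichAsyncFunction, and the sketch assumes that each record's deadline is fixed at the moment the record is admitted. Under that assumption a new timeout applies only to later records, while a new capacity is checked on the very next admission.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of runtime-adjustable limits; not Flink code. */
class AdjustableAsyncQueue {

    /** An in-flight record whose deadline was fixed at admission time. */
    private static final class Entry {
        final String payload;
        final long deadlineMillis;

        Entry(String payload, long deadlineMillis) {
            this.payload = payload;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Deque<Entry> inFlight = new ArrayDeque<>();
    private int capacity;
    private long timeoutMillis;

    AdjustableAsyncQueue(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    /** Takes effect immediately: the next tryAdd checks the new value. */
    synchronized void setCapacity(int capacity) {
        this.capacity = capacity;
    }

    /** Affects only records admitted after this call. */
    synchronized void setTimeoutMillis(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Admits the record unless the queue is full under the current capacity. */
    synchronized boolean tryAdd(String payload, long nowMillis) {
        if (inFlight.size() >= capacity) {
            return false;
        }
        inFlight.addLast(new Entry(payload, nowMillis + timeoutMillis));
        return true;
    }

    /** Deadline of the oldest in-flight record; throws if the queue is empty. */
    synchronized long oldestDeadline() {
        return inFlight.getFirst().deadlineMillis;
    }

    /** Deadline of the newest in-flight record; throws if the queue is empty. */
    synchronized long newestDeadline() {
        return inFlight.getLast().deadlineMillis;
    }

    public static void main(String[] args) {
        AdjustableAsyncQueue queue = new AdjustableAsyncQueue(1, 100L);
        System.out.println(queue.tryAdd("a", 0L));  // admitted
        System.out.println(queue.tryAdd("b", 0L));  // rejected: capacity is 1
        queue.setCapacity(2);
        queue.setTimeoutMillis(500L);
        System.out.println(queue.tryAdd("b", 10L)); // admitted after the change
        System.out.println(queue.oldestDeadline() + " " + queue.newestDeadline());
    }
}
```

Lowering the capacity below the current number of in-flight records does not evict anything in this sketch; it only blocks further admissions until the queue drains. Whether the real operator should behave that way is left open by the issue.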

 





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement OutputFormat to write Hive tables

2019-05-31 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r289349514
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat is used to write data to Hive tables, both non-partitioned and partitioned.
+ */
+public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 5167529504848109023L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private transient RowTypeInfo rowTypeInfo;
+   private transient HiveTablePartition hiv

[jira] [Updated] (FLINK-12671) Summarizer: summary statistics for Table

2019-05-31 Thread Xu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Yang updated FLINK-12671:

External issue URL: https://github.com/apache/flink/pull/8586

> Summarizer: summary statistics for Table
> 
>
> Key: FLINK-12671
> URL: https://issues.apache.org/jira/browse/FLINK-12671
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xu Yang
>Assignee: Xu Yang
>Priority: Major
>
> We provide summary statistics for Table through Summarizer. Users can easily 
> get the total count and the basic column-wise metrics: max, min, mean, 
> variance, standardDeviation, normL1, normL2, the number of missing values and 
> the number of valid values.
> SparkML has the same function: 
> [http://spark.apache.org/docs/latest/ml-statistics.html#summarizer]
>
> Example:
>
> String[] colNames = new String[] {"id", "height", "weight"};
> Row[] data = new Row[] {
>     Row.of(1, 168, 48.1),
>     Row.of(2, 165, 45.8),
>     Row.of(3, 160, 45.3),
>     Row.of(4, 163, 41.9),
>     Row.of(5, 149, 40.5),
> };
> Table input = new MemSourceBatchOp(data, colNames).getTable();
> TableSummary summary = new Summarizer(input).collectResult();
> System.out.println(summary.mean("height")); // print the mean of the column named "height"
> System.out.println(summary);
>  
>  
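
To make some of the listed metrics concrete, here is a plain-Java sketch that computes four of them for the "height" column of the example data. It is not the Summarizer from the pull request: `ColumnSummarySketch` and its methods are hypothetical, and the choice of the unbiased sample variance (dividing by n - 1) is an assumption, since the issue does not say which variance definition Summarizer uses.

```java
/** Hypothetical helper; not the Summarizer proposed in FLINK-12671. */
class ColumnSummarySketch {

    /** Arithmetic mean; NaN for an empty column. */
    static double mean(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    /** Unbiased sample variance (divides by n - 1); NaN for fewer than two values. */
    static double sampleVariance(double[] values) {
        if (values.length < 2) {
            return Double.NaN;
        }
        double m = mean(values);
        double squaredDeviations = 0.0;
        for (double v : values) {
            squaredDeviations += (v - m) * (v - m);
        }
        return squaredDeviations / (values.length - 1);
    }

    /** Sum of absolute values. */
    static double normL1(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += Math.abs(v);
        }
        return sum;
    }

    /** Square root of the sum of squares. */
    static double normL2(double[] values) {
        double sumOfSquares = 0.0;
        for (double v : values) {
            sumOfSquares += v * v;
        }
        return Math.sqrt(sumOfSquares);
    }

    public static void main(String[] args) {
        // The "height" column from the example rows above.
        double[] height = {168, 165, 160, 163, 149};
        System.out.println("mean     = " + mean(height));
        System.out.println("variance = " + sampleVariance(height));
        System.out.println("normL1   = " + normL1(height));
        System.out.println("normL2   = " + normL2(height));
    }
}
```

For these five heights the mean is 161 and the sample variance is 53.5 (sum of squared deviations 214, divided by 4).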





[GitHub] [flink] wisgood commented on a change in pull request #8571: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-05-31 Thread GitBox
wisgood commented on a change in pull request #8571: [FLINK-12682][connectors] 
StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8571#discussion_r289337745
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -82,7 +87,7 @@ public void open(FileSystem fs, Path path) throws IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
-   outputStream.write('\n');
+   outputStream.write(rowDelimiter.getBytes(charset));
 
 Review comment:
   The test has passed.
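
The effect of the changed line can be shown in isolation: each element is encoded in the configured charset and followed by the encoded row delimiter instead of a hard-coded '\n'. The sketch below is not the connector's StringWriter; `RowDelimiterSketch` and `encodeRow` are hypothetical names, and it builds a byte array rather than writing to an FSDataOutputStream so that it runs with the JDK alone.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration of a configurable row delimiter; not Flink code. */
class RowDelimiterSketch {

    /** Returns the element's string form followed by the delimiter, both encoded in charset. */
    static byte[] encodeRow(Object element, String rowDelimiter, Charset charset) {
        byte[] body = element.toString().getBytes(charset);
        byte[] delimiter = rowDelimiter.getBytes(charset);
        // Copy the body into a larger array, then append the delimiter bytes.
        byte[] row = Arrays.copyOf(body, body.length + delimiter.length);
        System.arraycopy(delimiter, 0, row, body.length, delimiter.length);
        return row;
    }

    public static void main(String[] args) {
        byte[] row = encodeRow("a,b,c", "\r\n", StandardCharsets.UTF_8);
        // Make the delimiter visible when printing.
        System.out.println(new String(row, StandardCharsets.UTF_8).replace("\r\n", "<CRLF>"));
    }
}
```

Encoding the delimiter with the same charset as the element matters for multi-byte encodings, where writing a single '\n' byte would not be a valid line terminator.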




[jira] [Updated] (FLINK-12671) Summarizer: summary statistics for Table

2019-05-31 Thread Xu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Yang updated FLINK-12671:

Description: 
We provide summary statistics for Table through Summarizer. Users can easily get 
the total count and the basic column-wise metrics: max, min, mean, variance, 
standardDeviation, normL1, normL2, the number of missing values and the number 
of valid values.

SparkML has the same function: 
[http://spark.apache.org/docs/latest/ml-statistics.html#summarizer]

Example:

String[] colNames = new String[] {"id", "height", "weight"};
Row[] data = new Row[] {
    Row.of(1, 168, 48.1),
    Row.of(2, 165, 45.8),
    Row.of(3, 160, 45.3),
    Row.of(4, 163, 41.9),
    Row.of(5, 149, 40.5),
};
Table input = new MemSourceBatchOp(data, colNames).getTable();
TableSummary summary = new Summarizer(input).collectResult();
System.out.println(summary.mean("height")); // print the mean of the column named "height"
System.out.println(summary);
 

 

  was:
We provide summary statistics for Table through Summarizer. Users can easily get 
the total count and the basic column-wise metrics: max, min, mean, variance, 
standardDeviation, normL1, normL2, the number of missing values and the number 
of valid values.

SparkML has the same function: 
[http://spark.apache.org/docs/latest/ml-statistics.html#summarizer]

Example:

String[] colNames = new String[] {"id", "height", "weight"};
Row[] data = new Row[] {
 Row.of(1, 168, 48.1),
 Row.of(2, 165, 45.8),
 Row.of(3, 160, 45.3),
 Row.of(4, 163, 41.9),
 Row.of(5, 149, 40.5),
};
Table input = new MemSourceBatchOp(data, colNames).getTable();
TableSummary summary = new Summarizer(input).collectResult();
System.out.println(summary.mean("height")); // print the mean of the column named "height"
System.out.println(summary);

 

 







[jira] [Updated] (FLINK-12671) Summarizer: summary statistics for Table

2019-05-31 Thread Xu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Yang updated FLINK-12671:

Description: 
We provide summary statistics for Table through Summarizer. Users can easily get 
the total count and the basic column-wise metrics: max, min, mean, variance, 
standardDeviation, normL1, normL2, the number of missing values and the number 
of valid values.

SparkML has the same function: 
[http://spark.apache.org/docs/latest/ml-statistics.html#summarizer]

Example:

String[] colNames = new String[] {"id", "height", "weight"};
Row[] data = new Row[] {
 Row.of(1, 168, 48.1),
 Row.of(2, 165, 45.8),
 Row.of(3, 160, 45.3),
 Row.of(4, 163, 41.9),
 Row.of(5, 149, 40.5),
};
Table input = new MemSourceBatchOp(data, colNames).getTable();
TableSummary summary = new Summarizer(input).collectResult();
System.out.println(summary.mean("height")); // print the mean of the column named "height"
System.out.println(summary);

 

 

  was:
We provide summary statistics for Table through Summarizer. Users can easily get 
the total count and the basic column-wise metrics: max, min, mean, variance, 
standardDeviation, normL1, normL2, the number of missing values and the number 
of valid values.

SparkML has the same function: 
[http://spark.apache.org/docs/latest/ml-statistics.html#summarizer]

Example:

Table input = …
TableSummary summary = new Summarizer(input).collectResult();
System.out.println(summary.mean("age")); // print the mean of the column named "age"
System.out.println(summary);

 

 







[GitHub] [flink] flinkbot commented on issue #8586: Summarizer

2019-05-31 Thread GitBox
flinkbot commented on issue #8586: Summarizer
URL: https://github.com/apache/flink/pull/8586#issuecomment-497660035
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] xuyang1706 opened a new pull request #8586: Summarizer

2019-05-31 Thread GitBox
xuyang1706 opened a new pull request #8586: Summarizer
URL: https://github.com/apache/flink/pull/8586
 
 
   
   
   ## What is the purpose of the change
   
   *Provides a new algorithm, Summarizer (summary statistics for Table). Users can 
easily get the total count and the basic column-wise metrics: max, min, mean, 
variance, standardDeviation, normL1, normL2, the number of missing values and 
the number of valid values.*
   
   
   ## Brief change log
   
 - *Implementation of algorithm Summarizer.*
 - *Base definition for the algorithm params.*
 - *Definition and operations for sparse/dense matrix and vector.*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(no)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(no)*
   
   This change added tests and can be verified as follows:
   
 - *Added tests for the algorithm Summarizer*
 - *Added tests for DenseVector and SparseVector*
 - *Added tests for DenseMatrix*
 - *Added tests for summary computation process*
 - *Added tests for SummarizerBatchOp*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 * JavaDoc




[GitHub] [flink] flinkbot commented on issue #8585: [FLINK-12690][table-api] Introduce a Planner interface

2019-05-31 Thread GitBox
flinkbot commented on issue #8585:  [FLINK-12690][table-api] Introduce a 
Planner interface 
URL: https://github.com/apache/flink/pull/8585#issuecomment-497658455
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



