[GitHub] [flink] zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the 
constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#issuecomment-479745830
 
 
   @azagrebin, thanks for the review again. :)
   I pushed a commit that rebases onto master and addresses the above 
comments. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272013899
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* 

[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-03 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809495#comment-16809495
 ] 

Liya Fan commented on FLINK-3273:
-

Hi [~mxm], would you please help explain what you mean by "add a Scala suffix"? 
Thanks.

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12102) FlinkILoopTest fails on Java 9

2019-04-03 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809493#comment-16809493
 ] 

Liya Fan commented on FLINK-12102:
--

Does Java 11 also have this problem?

> FlinkILoopTest fails on Java 9
> --
>
> Key: FLINK-12102
> URL: https://issues.apache.org/jira/browse/FLINK-12102
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala Shell
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.9.0
>
>
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.FlinkILoopTest.testConfigurationForwarding(FlinkILoopTest.java:89)
> {code}





[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272012531
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ##
 @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
// subtract the Java memory used for network buffers (always 
off-heap)
-   final long networkBufMB =
-   calculateNetworkBufferMemory(
-   totalJavaMemorySizeMB << 20, // megabytes to 
bytes
-   config) >> 20; // bytes to megabytes
+   final long networkBufMB = 
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
 
 Review comment:
   Yes, it might be. Shall we look into it further in the future?
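
For reference, the shift-based unit conversion in the removed lines of the hunk above (`<< 20` for megabytes to bytes, `>> 20` for bytes to megabytes) behaves as in the minimal sketch below. `MemoryUnitSketch` and its method names are hypothetical helpers for illustration, not Flink classes.

```java
// Hypothetical helper for illustration; not part of Flink.
final class MemoryUnitSketch {

    // 1 MB = 2^20 bytes, so shifting left by 20 multiplies by 1,048,576.
    static long megabytesToBytes(long megabytes) {
        return megabytes << 20;
    }

    // Shifting right by 20 divides by 2^20 and discards any partial megabyte.
    static long bytesToMegabytes(long bytes) {
        return bytes >> 20;
    }
}
```

Note that the round trip loses precision: a byte count that is not a multiple of 2^20 comes back as the next lower whole megabyte.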




[jira] [Commented] (FLINK-10725) Support for Java 11 (LTS)

2019-04-03 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809487#comment-16809487
 ] 

Liya Fan commented on FLINK-10725:
--

Is anyone looking at this issue?

> Support for Java 11 (LTS)
> -
>
> Key: FLINK-10725
> URL: https://issues.apache.org/jira/browse/FLINK-10725
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.8.0, 2.0.0
>Reporter: Sina Madani
>Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Java 8 is over 5 years old and will be end of life in 2019/2020. Java 11, the 
> latest long-term support release, became GA in September 2018. Given that 
> FLINK-8033 still hasn't been resolved and that Java 9 has been end of life 
> (discontinued / no longer publicly available or supported) since March 
> 2018, it doesn't make sense to continue trying to add Java 9 support when 
> both Java 9 and Java 10 are end-of-life.





[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272008282
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 ##
 @@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MathUtils;
+
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.flink.util.MathUtils.checkedDownCast;
+
+/**
+ * Utility class to extract related parameters from {@link Configuration} and 
to
+ * sanity check them.
+ */
+public class ConfigurationParserUtils {
+
+   /**
+* Parses the configuration to get the managed memory size and 
validates the value.
+*
+* @param configuration configuration object
+* @return managed memory size (in megabytes)
+*/
+   public static long getManagedMemorySize(Configuration configuration) {
+   long managedMemorySize;
+   String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   if 
(!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
+   try {
+   managedMemorySize = MemorySize.parse(
+   
configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), 
MEGA_BYTES).getMebiBytes();
+   } catch (IllegalArgumentException e) {
+   throw new IllegalConfigurationException("Could 
not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
+   }
+   } else {
+   managedMemorySize = 
Long.valueOf(managedMemorySizeDefaultVal);
+   }
+
+   checkConfigParameter(configuration.getString(
+   
TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())
 || managedMemorySize > 0,
+   managedMemorySize, 
TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+   "MemoryManager needs at least one MB of memory. " +
+   "If you leave this config parameter empty, the 
system automatically pick a fraction of the available memory.");
+
+   return managedMemorySize;
+   }
+
+   /**
+* Parses the configuration to get the fraction of managed memory and 
validates the value.
+*
+* @param configuration configuration object
+* @return fraction of managed memory
+*/
+   public static float getManagedMemoryFraction(Configuration 
configuration) {
+   float managedMemoryFraction = 
configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+
+   checkConfigParameter(managedMemoryFraction > 0.0f && 
managedMemoryFraction < 1.0f, managedMemoryFraction,
+   TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+   "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0");
+
+   return managedMemoryFraction;
+   }
+
+   /**
+* Parses the configuration to get the type of memory.
+*
+* @param configuration configuration object
+* @return type of memory
+*/
+   public static MemoryType getMemoryType(Configuration configuration) {
+   // check whether we use heap or off-heap memory
+   final MemoryType memType;
+   if 
(configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
+   memType = MemoryType.OFF_HEAP;
+   } else {
+   

[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272008044
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ##
 @@ -85,4 +85,9 @@ protected MemorySegment nextSegment(MemorySegment current) 
throws EOFException {
protected int getLimitForSegment(MemorySegment segment) {
return this.currentSegmentIndex == this.segments.size() - 1 ? 
this.limitInLastSegment : this.segmentSize;
}
+
+   public void updateLimitInLastSegment(int value) {
+   this.limitInLastSegment = value;
+   updateCurrentSegmentLimit();
 
 Review comment:
   This is left over from a bad merge; I will delete it.




[jira] [Updated] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2019-04-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10195:
---
Labels: pull-request-available  (was: )

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to slower 
> than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior, however this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.
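
The unbounded growth described in this report can be illustrated with plain `java.util.concurrent` queues. The following is a minimal sketch under stated assumptions: it does not use the RabbitMQ client or its `QueueingConsumer`, and the class name `BoundedBufferSketch`, its methods, and the capacities are hypothetical. It shows that a queue without an explicit capacity keeps accepting deliveries when nothing consumes them, while a bounded queue stops accepting once it is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; not part of Flink or the RabbitMQ client.
final class BoundedBufferSketch {

    // Tries to enqueue `deliveries` elements while no consumer is running
    // and returns how many of them the queue accepted.
    static int offerAll(BlockingQueue<Integer> queue, int deliveries) {
        int accepted = 0;
        for (int i = 0; i < deliveries; i++) {
            // offer() returns false instead of blocking when the queue is full.
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // A LinkedBlockingQueue created without a capacity is bounded only by
    // Integer.MAX_VALUE, so in practice it grows until the heap is exhausted.
    static int acceptedByUnbounded(int deliveries) {
        return offerAll(new LinkedBlockingQueue<>(), deliveries);
    }

    // An ArrayBlockingQueue has a fixed capacity and rejects further offers.
    static int acceptedByBounded(int capacity, int deliveries) {
        return offerAll(new ArrayBlockingQueue<>(capacity), deliveries);
    }
}
```

With a blocking `put()` in place of `offer()`, the producer thread would wait rather than drop the element, which is the usual way a bounded queue propagates backpressure.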





[GitHub] [flink] flinkbot commented on issue #8111: [FLINK-10195] [RabbitMQ] - RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2019-04-03 Thread GitBox
flinkbot commented on issue #8111: [FLINK-10195] [RabbitMQ] - RabbitMQ Source 
With Checkpointing Doesn't Backpressure Correctly
URL: https://github.com/apache/flink/pull/8111#issuecomment-479738490
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] ljurukov opened a new pull request #8111: [FLINK-10195] [RabbitMQ] - RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2019-04-03 Thread GitBox
ljurukov opened a new pull request #8111: [FLINK-10195] [RabbitMQ] - RabbitMQ 
Source With Checkpointing Doesn't Backpressure Correctly
URL: https://github.com/apache/flink/pull/8111
 
 
   ## What Does This Change 
   Currently the RabbitMQ connection doesn't backpressure when no limit is set 
on unacked messages. Data is stored in an in-memory buffer until the heap is 
exhausted. This change resolves the issue by allowing the user to specify a 
buffer size, limiting the number of data points stored in memory. By default 
the behavior stays the same.
   
   ## Brief change log
 - Added optional configuration parameter for specifying buffer size
 - Created mechanism for turning on and off the connection as the buffer 
gets too full or empty
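
One way to picture the on/off mechanism named in the change log is a buffer with a high and a low watermark. This is a hedged sketch only: `FlowControlledBuffer`, its fields, and the thresholds are hypothetical and are not taken from the pull request, which may implement the mechanism differently.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch; names and thresholds are assumptions, not the PR's code.
final class FlowControlledBuffer<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final int lowWatermark;  // resume delivery at or below this size
    private final int highWatermark; // pause delivery at or above this size
    private boolean paused;

    FlowControlledBuffer(int lowWatermark, int highWatermark) {
        if (lowWatermark < 0 || lowWatermark >= highWatermark) {
            throw new IllegalArgumentException("need 0 <= low < high");
        }
        this.lowWatermark = lowWatermark;
        this.highWatermark = highWatermark;
    }

    // Called by the receiving side; returns false while delivery is paused.
    boolean add(T element) {
        if (paused) {
            return false;
        }
        buffer.add(element);
        if (buffer.size() >= highWatermark) {
            paused = true; // a real connector would stop consuming here
        }
        return true;
    }

    // Called by the processing side; returns null when the buffer is empty.
    T poll() {
        T element = buffer.poll();
        if (paused && buffer.size() <= lowWatermark) {
            paused = false; // a real connector would resume consuming here
        }
        return element;
    }

    boolean isPaused() {
        return paused;
    }

    int size() {
        return buffer.size();
    }
}
```

Using two thresholds instead of one avoids toggling the connection on every single element when the buffer hovers around its limit.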
   
   
   ## Verifying this change
   
 - Manually verified the change by running a RabbitMQ instance with 18 
million records (~37 GB) of data and a task manager with a couple of GB of 
RAM writing slowly to a Sink
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes) - Updated 
RabbitMQ to use latest
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (JavaDocs)
   




[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272007149
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
 
 Review comment:
   
https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html
   A window frame is a standard SQL concept; I'll add some comments.
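
As a rough illustration of the SQL window frame concept mentioned here, the sketch below evaluates the equivalent of `SUM(x) OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)` over one partition held in an array. It does not use the `OverWindowFrame` interface from the pull request; `FrameSumSketch` and its method are hypothetical and written only for this example.

```java
// Hypothetical illustration of a ROWS frame; unrelated to the PR's classes.
final class FrameSumSketch {

    // For each row i, sums partition[i - preceding .. i + following],
    // with the frame clipped to the partition bounds.
    static long[] slidingSum(long[] partition, int preceding, int following) {
        long[] result = new long[partition.length];
        for (int i = 0; i < partition.length; i++) {
            int start = Math.max(0, i - preceding);
            int end = Math.min(partition.length - 1, i + following);
            long sum = 0;
            for (int j = start; j <= end; j++) {
                sum += partition[j];
            }
            result[i] = sum;
        }
        return result;
    }
}
```

For the partition `{1, 2, 3, 4}` with one preceding and one following row, the frames are `{1,2}`, `{1,2,3}`, `{2,3,4}`, and `{3,4}`, so the result is `{3, 6, 9, 7}`.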




[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272006302
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   This is actually for the sake of tests. 
   We have the `NetworkConfigurationBuilder`, which makes it easy to set the 
`isCreditBased` variable in tests; otherwise a test would need to construct the 
`NettyConfig` with an internal `Configuration` to set this variable. I once 
considered removing the corresponding getter from `NettyConfig` to avoid 
duplication.
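
The testing argument can be sketched as follows. The classes below are simplified stand-ins, not Flink's actual `NetworkEnvironmentConfiguration` or its builder; every name and default value here is an assumption made for illustration.

```java
// Simplified stand-in; not Flink's NetworkEnvironmentConfiguration.
final class NetworkConfigSketch {
    final int numNetworkBuffers;
    final int pageSize;
    final boolean isCreditBased;

    private NetworkConfigSketch(int numNetworkBuffers, int pageSize, boolean isCreditBased) {
        this.numNetworkBuffers = numNetworkBuffers;
        this.pageSize = pageSize;
        this.isCreditBased = isCreditBased;
    }

    static Builder builder() {
        return new Builder();
    }

    // Hypothetical test builder: a test overrides only the fields it cares about.
    static final class Builder {
        // Defaults are arbitrary values chosen for this sketch.
        private int numNetworkBuffers = 1024;
        private int pageSize = 32 * 1024;
        private boolean isCreditBased = true;

        Builder setNumNetworkBuffers(int numNetworkBuffers) {
            this.numNetworkBuffers = numNetworkBuffers;
            return this;
        }

        Builder setIsCreditBased(boolean isCreditBased) {
            this.isCreditBased = isCreditBased;
            return this;
        }

        NetworkConfigSketch build() {
            return new NetworkConfigSketch(numNetworkBuffers, pageSize, isCreditBased);
        }
    }
}
```

A test can then write `NetworkConfigSketch.builder().setIsCreditBased(false).build()` without assembling a full key/value configuration first.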


[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272005278
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
+
+   void open(ExecutionContext ctx) throws Exception;
+
+   void resetBuffer(ResettableExternalBuffer rows) throws Exception;
+
+   //return the ACC of the window frame.
+   BaseRow write(int index, BaseRow current) throws Exception;
 
 Review comment:
   What is the `current` row used for? And what is the `current` row?




[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272003227
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
 
 Review comment:
   What does boundary mean?




[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272003199
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
+ */
+public interface BoundComparator extends Serializable {
+
+   /**
+* reset the comparator.
+*/
+   void reset();
 
 Review comment:
   Why does a comparator need to be reset?




[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272006270
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
 
 Review comment:
   I don't quite understand what exactly a window frame is. Can you add more 
explanation?
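   For context, in SQL an OVER window frame is the subset of rows of the current partition, positioned relative to the current row, over which the aggregate is evaluated. Below is a minimal sketch of that idea, assuming a `ROWS BETWEEN n PRECEDING AND CURRENT ROW` frame and plain `long` values instead of Flink's `BaseRow`. The class and method names are hypothetical and are not part of the PR or of the `OverWindowFrame` interface.

```java
import java.util.Arrays;

/** Illustration only: not part of the PR and not Flink's OverWindowFrame API. */
final class RowsFrameSketch {

    /**
     * Evaluates SUM(v) OVER (ROWS BETWEEN preceding PRECEDING AND CURRENT ROW)
     * for every row of one partition that is already in its ORDER BY order.
     */
    static long[] slidingSum(long[] partition, int preceding) {
        long[] result = new long[partition.length];
        long sum = 0;
        for (int current = 0; current < partition.length; current++) {
            sum += partition[current];              // the current row enters the frame
            int leaving = current - preceding - 1;  // the row that just fell out of the frame
            if (leaving >= 0) {
                sum -= partition[leaving];
            }
            result[current] = sum;
        }
        return result;
    }

    public static void main(String[] args) {
        // Frames are {1}, {1,2}, {2,3}, {3,4}.
        System.out.println(Arrays.toString(slidingSum(new long[] {1, 2, 3, 4}, 1)));
    }
}
```

   The sketch needs the whole partition before it can produce results, which matches the Javadoc statement that a frame must be prepared with all records of the current partition.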




[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272003306
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
+ */
+public interface BoundComparator extends Serializable {
+
+   /**
+* reset the comparator.
+*/
+   void reset();
+
+   /**
+* Compares its two row.  Returns a negative integer,
+* zero, or a positive integer as the first argument is less than, equal
+* to, or greater than the second.
+*
+* @param inputRow the first row to be compared.
+* @param inputIndex   the index for the first row.
+* @param currentRow   the second row to be compared.
+* @param currentIndex the index for the second row.
+* @return a negative integer, zero, or a positive integer as the
+* first argument is less than, equal to, or greater than the
+* second.
+*/
+   long compare(BaseRow inputRow, int inputIndex, BaseRow currentRow, int 
currentIndex);
 
 Review comment:
   What does "index" mean here?




[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272005225
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
+
+   void open(ExecutionContext ctx) throws Exception;
+
+   void resetBuffer(ResettableExternalBuffer rows) throws Exception;
 
 Review comment:
   Why should OverWindowFrame be responsible for resetting the row buffer?




[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272003166
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
+ */
+public interface BoundComparator extends Serializable {
 
 Review comment:
   I think more comments are needed for this interface. I really cannot tell 
what this interface is used for or what it means...




[GitHub] [flink] KurtYoung commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272001917
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ##
 @@ -85,4 +85,9 @@ protected MemorySegment nextSegment(MemorySegment current) 
throws EOFException {
protected int getLimitForSegment(MemorySegment segment) {
return this.currentSegmentIndex == this.segments.size() - 1 ? 
this.limitInLastSegment : this.segmentSize;
}
+
+   public void updateLimitInLastSegment(int value) {
+   this.limitInLastSegment = value;
+   updateCurrentSegmentLimit();
 
 Review comment:
   This behavior looks confusing, and it looks like no one is calling this method?
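   For reference, the limit rule in the quoted `getLimitForSegment` can be sketched in isolation. This is a minimal illustration with hypothetical names and plain integers instead of `MemorySegment`s. It only shows why changing `limitInLastSegment` changes how many bytes are readable in total; it does not model `updateCurrentSegmentLimit()`.

```java
/** Illustration only: hypothetical helper, not Flink's RandomAccessInputView. */
final class SegmentLimitSketch {

    /** Same rule as the quoted ternary: only the last segment has a shorter limit. */
    static int limitForSegment(int index, int segmentCount, int segmentSize, int limitInLastSegment) {
        return index == segmentCount - 1 ? limitInLastSegment : segmentSize;
    }

    /** Sums the per-segment limits to get the total number of readable bytes. */
    static long readableBytes(int segmentCount, int segmentSize, int limitInLastSegment) {
        long total = 0;
        for (int i = 0; i < segmentCount; i++) {
            total += limitForSegment(i, segmentCount, segmentSize, limitInLastSegment);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(readableBytes(3, 32, 10)); // 32 + 32 + 10 = 74
        System.out.println(readableBytes(3, 32, 20)); // 32 + 32 + 20 = 84
    }
}
```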




[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272005759
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
 
 Review comment:
   Because the deprecated option `TaskManagerOptions.NETWORK_NUM_BUFFERS` is 
used in this method.




[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-03 Thread GitBox
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] 
Convert TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#discussion_r271807543
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+/**
+ * The base class for batch and stream TableEnvironments.
+ *
+ * The TableEnvironment is a central concept of the Table API and SQL 
integration. It is
+ * responsible for:
+ *
+ * 
+ * Registering a Table in the internal catalog
+ * Registering an external catalog
+ * Executing SQL queries
+ * Registering a user-defined (scalar, table, or aggregation) 
function
+ * Converting a DataStream or DataSet into a Table
+ * Holding a reference to an ExecutionEnvironment or 
StreamExecutionEnvironment
+ * 
+ */
+@PublicEvolving
+public interface TableEnvironment {
+
+   /**
+* Creates a table from a table source.
+*
+* @param source table source used as table
+*/
+   Table fromTableSource(TableSource source);
+
+   /**
+* Registers an {@link ExternalCatalog} under a unique name in the 
TableEnvironment's schema.
+* All tables registered in the {@link ExternalCatalog} can be accessed.
+*
+* @param name The name under which the externalCatalog will be registered
+* @param externalCatalog The externalCatalog to register
+*/
+   void registerExternalCatalog(String name, ExternalCatalog 
externalCatalog);
+
+   /**
+* Gets a registered {@link ExternalCatalog} by name.
+*
+* @param name The name to look up the {@link ExternalCatalog}
+* @return The {@link ExternalCatalog}
+*/
+   ExternalCatalog getRegisteredExternalCatalog(String name);
+
+   /**
+* Registers a {@link ScalarFunction} under a unique name. Replaces 
already existing
+* user-defined functions under this name.
+*/
+   void registerFunction(String name, ScalarFunction function);
+
+   /**
+* Registers a {@link Table} under a unique name in the 
TableEnvironment's catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param name The name under which the table will be registered.
+* @param table The table to register.
+*/
+   void registerTable(String name, Table table);
+
+   /**
+* Registers an external {@link TableSource} in this {@link 
TableEnvironment}'s catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param name The name under which the {@link TableSource} is registered.
+* @param tableSource The {@link TableSource} to register.
+*/
+   void registerTableSource(String name, TableSource tableSource);
+
+   /**
+* Registers an external {@link TableSink} with given field names and 
types in this
+* {@link TableEnvironment}'s catalog.
+* Registered sink tables can be referenced in SQL DML statements.
+*
+* @param name The name under which the {@link TableSink} is registered.
+* @param fieldNames The field names to register with the {@link 
TableSink}.
+* @param fieldTypes The field types to register with the {@link 
TableSink}.
+* @param tableSink The {@link TableSink} to register.
+*/
+   void registerTableSink(String name, String[] fieldNames, 
TypeInformation[] fieldTypes, TableSink tableSink);
+
+   /**
+* Registers an external {@link TableSink} with already 

[jira] [Assigned] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2019-04-03 Thread Luka Jurukovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luka Jurukovski reassigned FLINK-10195:
---

Assignee: Luka Jurukovski

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>
> The connection between the RabbitMQ server and the client does not apply 
> back pressure appropriately when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles data processing to a rate 
> slower than RabbitMQ sends data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, it isn't obvious from the 
> documentation or any of the examples I have seen.
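
The buffering behavior described in the report can be illustrated with plain `java.util.concurrent` queues. This is a minimal sketch, not the connector's or the RabbitMQ client's code, and the class and method names are hypothetical. A `LinkedBlockingQueue` created without a capacity is bounded only by `Integer.MAX_VALUE`, so with no consumer draining it every delivery is accepted and kept on the heap. A queue with an explicit capacity starts refusing deliveries once it is full.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustration only: hypothetical names, not the RabbitMQ connector. */
final class BackpressureSketch {

    /** Offers `deliveries` messages while nothing drains the queue; returns how many were buffered. */
    static int bufferWithoutConsumer(BlockingQueue<Integer> queue, int deliveries) {
        int accepted = 0;
        for (int i = 0; i < deliveries; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No explicit capacity: all deliveries pile up in memory.
        System.out.println(bufferWithoutConsumer(new LinkedBlockingQueue<>(), 10_000));
        // Capacity 100: the backlog is capped and the producer has to slow down.
        System.out.println(bufferWithoutConsumer(new LinkedBlockingQueue<>(100), 10_000));
    }
}
```

With a bounded queue, a producer that calls `put` instead of `offer` would block until the consumer catches up, which is the back pressure the report finds missing. Whether the connector should bound its queue or limit unacknowledged deliveries on the channel instead is not decided in the report.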



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
JingsongLi commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272003098
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -112,7 +113,7 @@ private RexNode visitScalarFunc(FunctionDefinition def, 
List child) {
} else if (BuiltInFunctionDefinitions.EQUALS.equals(def)) {
return relBuilder.call(SqlStdOperatorTable.EQUALS, 
child);
 
 Review comment:
   Yes, it is better to use `FlinkSqlOperatorTable` everywhere, to avoid the next bug of this kind...




[GitHub] [flink] flinkbot edited a comment on issue #7835: [FLINK-11750][network] Replace IntermediateResultPartitionID with ResultPartitionID in ResultPartitionDeploymentDescriptor

2019-04-03 Thread GitBox
flinkbot edited a comment on issue #7835: [FLINK-11750][network] Replace 
IntermediateResultPartitionID with ResultPartitionID in 
ResultPartitionDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7835#issuecomment-467344372
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @dawidwys [committer]
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   
   The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   Bot commands
   
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] zhijiangW commented on issue #7835: [FLINK-11750][network] Replace IntermediateResultPartitionID with ResultPartitionID in ResultPartitionDeploymentDescriptor

2019-04-03 Thread GitBox
zhijiangW commented on issue #7835: [FLINK-11750][network] Replace 
IntermediateResultPartitionID with ResultPartitionID in 
ResultPartitionDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7835#issuecomment-479730951
 
 
   cc @azagrebin 




[GitHub] [flink] hequn8128 edited a comment on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-03 Thread GitBox
hequn8128 edited a comment on issue #8050: [FLINK-11067][table] Convert 
TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-479572670
 
 
   @dawidwys Thanks a lot for your review. You have given a lot of good suggestions!
   
   1. As for the unused APIs in the `TableEnvironment` interface, maybe we can deprecate them first and remove them later. This keeps the interface compatible.
   
   2. To decouple the `Descriptor` from `TableEnvironment`, we can change registerTableSource to getTableSource in `RegistrableDescriptor` (and also rename `RegistrableDescriptor`). Once we get the table source or sink, we can register it with the TableEnvironment. Although this makes it impossible for users to register a table source (or sink) directly, I am OK with it.
   
   3. For `TableFactoryUtil`, I am not sure whether we can port it into the api or common module (I will check later). However, I do find that we can also decouple it from TableEnvironment. In the current implementation, the table env seems to be used only to decide whether it is a batch or stream source (or sink). We can also use the descriptor to decide, i.e., a StreamTableDescriptor means a stream source (or sink).
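   
   The idea in point 3 can be sketched as follows. This is only an illustration under stated assumptions: the nested types are simplified stand-ins that borrow the descriptor names mentioned above, they are not the actual Flink classes, and `sourceKind` is a hypothetical helper.

```java
/** Illustration only: simplified stand-ins, not the Flink descriptor classes. */
final class DescriptorDispatchSketch {

    interface TableDescriptor {}

    static final class StreamTableDescriptor implements TableDescriptor {}

    static final class BatchTableDescriptor implements TableDescriptor {}

    /** Decides batch vs. stream from the descriptor type alone, without a TableEnvironment. */
    static String sourceKind(TableDescriptor descriptor) {
        if (descriptor instanceof StreamTableDescriptor) {
            return "stream";
        }
        if (descriptor instanceof BatchTableDescriptor) {
            return "batch";
        }
        throw new IllegalArgumentException("Unsupported descriptor: " + descriptor);
    }

    public static void main(String[] args) {
        System.out.println(sourceKind(new StreamTableDescriptor()));
        System.out.println(sourceKind(new BatchTableDescriptor()));
    }
}
```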
   
   For 2 and 3, I think the biggest obstacle is that we may need to stay backward compatible while converting TableEnvironment into an interface. @twalthr It would be great to also have your opinion here.
   
   Looking forward to your suggestions. Thank you.
   
   Best, Hequn




[GitHub] [flink] KurtYoung commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272000909
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
 ##
 @@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.agg.batch
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.runtime.util.SingleElementIterator
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import 
org.apache.flink.table.`type`.TypeConverters.{createInternalTypeFromTypeInfo, 
createInternalTypeInfoFromInternalType}
+import org.apache.flink.table.`type`.{ArrayType, InternalType, MapType, 
RowType, StringType}
+import org.apache.flink.table.api.TableConfig
+import 
org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForExternalType, 
genToExternal, genToInternal, newName, primitiveTypeTermForType}
+import org.apache.flink.table.codegen.OperatorCodeGenerator.STREAM_RECORD
+import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, 
ExprCodeGenerator, GenerateUtils, GeneratedExpression, OperatorCodeGenerator}
+import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
+import org.apache.flink.table.expressions.{CallExpression, Expression, 
ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, 
ResolvedAggLocalReference, RexNodeConverter, SymbolExpression, 
TypeLiteralExpression, UnresolvedFieldReferenceExpression, 
ValueLiteralExpression}
+import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction,
 getAggUserDefinedInputTypes, getResultTypeOfAggregateFunction}
+import org.apache.flink.table.functions.{AggregateFunction, 
DeclarativeAggregateFunction, UserDefinedFunction}
+import org.apache.flink.table.generated.{GeneratedAggsHandleFunction, 
GeneratedOperator}
+import org.apache.flink.table.runtime.context.ExecutionContextImpl
+
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConverters._
+
+/**
+  * Batch aggregate code generate helper.
+  */
+object AggCodeGenHelper {
+
+  def getAggBufferNames(
+  auxGrouping: Array[Int], aggregates: Seq[UserDefinedFunction]): 
Array[Array[String]] = {
+auxGrouping.zipWithIndex.map {
+  case (_, index) => Array(s"aux_group$index")
+} ++ aggregates.zipWithIndex.toArray.map {
+  case (a: DeclarativeAggregateFunction, index) =>
+val idx = auxGrouping.length + index
+a.aggBufferAttributes.map(attr => s"agg${idx}_${attr.getName}")
+  case (_: AggregateFunction[_, _], index) =>
+val idx = auxGrouping.length + index
+Array(s"agg$idx")
+}
+  }
+
+  def getAggBufferTypes(
+  inputType: RowType, auxGrouping: Array[Int], aggregates: 
Seq[UserDefinedFunction])
+: Array[Array[InternalType]] = {
+auxGrouping.map { index =>
+  Array(inputType.getFieldTypes()(index))
+} ++ aggregates.map {
+  case a: DeclarativeAggregateFunction => a.getAggBufferTypes
+  case a: AggregateFunction[_, _] =>
+
Array(createInternalTypeFromTypeInfo(getAccumulatorTypeOfAggregateFunction(a)))
+}.toArray[Array[InternalType]]
+  }
+
+  def getUdaggs(
+  aggregates: Seq[UserDefinedFunction]): Map[AggregateFunction[_, _], 
String] = {
+aggregates
+.filter(a => a.isInstanceOf[AggregateFunction[_, _]])
+.map(a => a -> CodeGenUtils.udfFieldName(a)).toMap
+.asInstanceOf[Map[AggregateFunction[_, _], String]]
+  }
+
+  def projectRowType(
+  rowType: RowType, mapping: Array[Int]): RowType = {
+new RowType(mapping.map(rowType.getTypeAt), 
mapping.map(rowType.getFieldNames()(_)))
+  }
+
+  /**
+* Add agg handler to class member and open it.
+*/
+  private[flink] def addAggsHandler(
+  aggsHandler: GeneratedAggsHandleFunction,
+  ctx: CodeGeneratorContext,
+  aggsHandlerCtx: CodeGeneratorContext): String = {
+   

[GitHub] [flink] KurtYoung commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272000967
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
 ##
 @@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.agg.batch
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.runtime.util.SingleElementIterator
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.`type`.TypeConverters.{createInternalTypeFromTypeInfo, createInternalTypeInfoFromInternalType}
+import org.apache.flink.table.`type`.{ArrayType, InternalType, MapType, RowType, StringType}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForExternalType, genToExternal, genToInternal, newName, primitiveTypeTermForType}
+import org.apache.flink.table.codegen.OperatorCodeGenerator.STREAM_RECORD
+import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, ExprCodeGenerator, GenerateUtils, GeneratedExpression, OperatorCodeGenerator}
+import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
+import org.apache.flink.table.expressions.{CallExpression, Expression, ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter, SymbolExpression, TypeLiteralExpression, UnresolvedFieldReferenceExpression, ValueLiteralExpression}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getAggUserDefinedInputTypes, getResultTypeOfAggregateFunction}
+import org.apache.flink.table.functions.{AggregateFunction, DeclarativeAggregateFunction, UserDefinedFunction}
+import org.apache.flink.table.generated.{GeneratedAggsHandleFunction, GeneratedOperator}
+import org.apache.flink.table.runtime.context.ExecutionContextImpl
+
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConverters._
+
+/**
+  * Batch aggregate code generate helper.
+  */
+object AggCodeGenHelper {
+
+  def getAggBufferNames(
+  auxGrouping: Array[Int], aggregates: Seq[UserDefinedFunction]): Array[Array[String]] = {
+auxGrouping.zipWithIndex.map {
+  case (_, index) => Array(s"aux_group$index")
+} ++ aggregates.zipWithIndex.toArray.map {
+  case (a: DeclarativeAggregateFunction, index) =>
+val idx = auxGrouping.length + index
+a.aggBufferAttributes.map(attr => s"agg${idx}_${attr.getName}")
+  case (_: AggregateFunction[_, _], index) =>
+val idx = auxGrouping.length + index
+Array(s"agg$idx")
+}
+  }
+
+  def getAggBufferTypes(
+  inputType: RowType, auxGrouping: Array[Int], aggregates: Seq[UserDefinedFunction])
+: Array[Array[InternalType]] = {
+auxGrouping.map { index =>
+  Array(inputType.getFieldTypes()(index))
+} ++ aggregates.map {
+  case a: DeclarativeAggregateFunction => a.getAggBufferTypes
+  case a: AggregateFunction[_, _] =>
+Array(createInternalTypeFromTypeInfo(getAccumulatorTypeOfAggregateFunction(a)))
+}.toArray[Array[InternalType]]
+  }
+
+  def getUdaggs(
+  aggregates: Seq[UserDefinedFunction]): Map[AggregateFunction[_, _], String] = {
+aggregates
+.filter(a => a.isInstanceOf[AggregateFunction[_, _]])
+.map(a => a -> CodeGenUtils.udfFieldName(a)).toMap
+.asInstanceOf[Map[AggregateFunction[_, _], String]]
+  }
+
+  def projectRowType(
+  rowType: RowType, mapping: Array[Int]): RowType = {
+new RowType(mapping.map(rowType.getTypeAt), mapping.map(rowType.getFieldNames()(_)))
+  }
+
+  /**
+* Add agg handler to class member and open it.
+*/
+  private[flink] def addAggsHandler(
+  aggsHandler: GeneratedAggsHandleFunction,
+  ctx: CodeGeneratorContext,
+  aggsHandlerCtx: CodeGeneratorContext): String = {
+   

[GitHub] [flink] KurtYoung commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272001068
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
 ##
 @@ -0,0 +1,864 @@
+package org.apache.flink.table.codegen.agg.batch
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.metrics.Gauge
+import org.apache.flink.table.`type`.{InternalType, RowType}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkPlannerImpl
+import org.apache.flink.table.codegen.CodeGenUtils.{binaryRowFieldSetAccess, binaryRowSetNull}
+import org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
+import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, ExprCodeGenerator, GenerateUtils, GeneratedExpression, OperatorCodeGenerator, SortCodeGenerator}
+import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow, JoinedRow}
+import org.apache.flink.table.expressions.{CallExpression, Expression, ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, RexNodeConverter, SymbolExpression, TypeLiteralExpression, UnresolvedFieldReferenceExpression, ValueLiteralExpression}
+import org.apache.flink.table.functions.{AggregateFunction, DeclarativeAggregateFunction, UserDefinedFunction}
+import org.apache.flink.table.generated.{NormalizedKeyComputer, RecordComparator}
+import org.apache.flink.table.runtime.aggregate.{BytesHashMap, BytesHashMapSpillMemorySegmentPool}
+import org.apache.flink.table.runtime.sort.BufferedKVExternalSorter
+import org.apache.flink.table.typeutils.BinaryRowSerializer
+
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConversions._
+
+object HashAggCodeGenHelper {
+
+  def prepareHashAggKVTypes(
+  ctx: CodeGeneratorContext,
+  aggMapKeyTypesTerm: String,
+  aggBufferTypesTerm: String,
+  aggMapKeyType: RowType,
+  aggBufferType: RowType): Unit = {
+ctx.addReusableObjectWithName(aggMapKeyType.getFieldTypes, aggMapKeyTypesTerm)
+ctx.addReusableObjectWithName(aggBufferType.getFieldTypes, aggBufferTypesTerm)
+  }
+
+  private[flink] def prepareHashAggMap(
+  ctx: CodeGeneratorContext,
+  config: TableConfig,
+  reservedManagedMemory: Long,
+  maxManagedMemory: Long,
+  groupKeyTypesTerm: String,
+  aggBufferTypesTerm: String,
+  aggregateMapTerm: String): Unit = {
+// allocate memory segments for aggregate map
+
+// create aggregate map
+val mapTypeTerm = classOf[BytesHashMap].getName
+ctx.addReusableMember(s"private transient $mapTypeTerm $aggregateMapTerm;")
+ctx.addReusableOpenStatement(s"$aggregateMapTerm " +
+s"= new $mapTypeTerm(" +
+s"this.getContainingTask()," +
+s"this.getContainingTask().getEnvironment().getMemoryManager()," +
+s"${reservedManagedMemory}L," +
+s" $groupKeyTypesTerm," +
+s" $aggBufferTypesTerm);")
+// close aggregate map and release memory segments
+ctx.addReusableCloseStatement(s"$aggregateMapTerm.free();")
+ctx.addReusableCloseStatement(s"")
+  }
+
+  private[flink] def prepareTermForAggMapIteration(
+  ctx: CodeGeneratorContext,
+  outputTerm: String,
+  outputType: RowType,
+  aggMapKeyType: RowType,
+  aggBufferType: RowType,
+  outputClass: Class[_ <: BaseRow]): (String, String, String) = {
+// prepare iteration var terms
+val reuseAggMapKeyTerm = CodeGenUtils.newName("reuseAggMapKey")
+val reuseAggBufferTerm = CodeGenUtils.newName("reuseAggBuffer")
+val reuseAggMapEntryTerm = CodeGenUtils.newName("reuseAggMapEntry")
+// gen code to prepare agg output using agg buffer and key from the aggregate map
+val binaryRow = classOf[BinaryRow].getName
+val mapEntryTypeTerm = classOf[BytesHashMap.Entry].getCanonicalName
+
+

[GitHub] [flink] KurtYoung commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272000977
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
 ##
 @@ -0,0 +1,732 @@

[GitHub] [flink] KurtYoung commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272001002
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggWithoutKeysCodeGenerator.scala
 ##
 @@ -0,0 +1,121 @@
+package org.apache.flink.table.codegen.agg.batch
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.`type`.RowType
+import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext}
+import org.apache.flink.table.codegen.OperatorCodeGenerator.generateCollect
+import org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.genSortAggCodes
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.generated.GeneratedOperator
+import org.apache.flink.table.plan.util.AggregateInfoList
+import org.apache.flink.table.runtime.TableStreamOperator
+
+import org.apache.calcite.tools.RelBuilder
+
+/**
+  * Generate a agg operator without keys, auxGrouping must be empty too.
+  */
+object AggWithoutKeysCodeGenerator {
+
+  def genWithoutKeys(
+  ctx: CodeGeneratorContext,
+  builder: RelBuilder,
+  aggInfoList: AggregateInfoList,
+  inputType: RowType,
+  outputType: RowType,
+  isMerge: Boolean,
+  isFinal: Boolean,
+  prefix: String): GeneratedOperator[OneInputStreamOperator[BaseRow, BaseRow]] = {
+
 
 Review comment:
   delete blank line


[GitHub] [flink] KurtYoung commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272000515
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##
 @@ -112,7 +113,7 @@ private RexNode visitScalarFunc(FunctionDefinition def, List child) {
} else if (BuiltInFunctionDefinitions.EQUALS.equals(def)) {
return relBuilder.call(SqlStdOperatorTable.EQUALS, child);
 
 Review comment:
   maybe we should change all SqlStdOperatorTable to FlinkSqlOperatorTable?


[GitHub] [flink] KurtYoung commented on a change in pull request #8099: [FLINK-12081][table-planner-blink] Introduce aggregation operator code generator to blink batch

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8099: 
[FLINK-12081][table-planner-blink] Introduce aggregation operator code 
generator to blink batch
URL: https://github.com/apache/flink/pull/8099#discussion_r272000862
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
 ##
 @@ -0,0 +1,732 @@

[jira] [Assigned] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-04-03 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-12070:
--

Assignee: BoWang

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Major
>
> In order to avoid writing produced results multiple times for multiple
> consumers and in order to speed up batch recoveries, we should make
> blocking result partitions consumable multiple times. At the moment a
> blocking result partition is released once its consumer has processed
> all data. Instead, the result partition should be released once the next
> blocking result has been produced and all consumers of the blocking result
> partition have terminated. Moreover, blocking results should not hold on to
> slot resources like network buffers or memory, as is currently the case with
> {{SpillableSubpartitions}}.
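
The release rule proposed in the description can be sketched roughly as follows. This is only an illustration under stated assumptions: `ReusablePartitionSketch` and all of its methods are hypothetical names, not part of Flink's network stack, and the sketch ignores where the data is stored.

```java
/** Hypothetical model of a reusable blocking result partition; not Flink's API. */
class ReusablePartitionSketch {
    private int activeConsumers;
    private boolean downstreamResultProduced;
    private boolean released;

    /** A consumer starts reading; the partition may be read by several consumers. */
    synchronized void registerConsumer() {
        if (released) {
            throw new IllegalStateException("partition already released");
        }
        activeConsumers++;
    }

    /** A consumer finished (or failed); this alone no longer releases the partition. */
    synchronized void consumerTerminated() {
        activeConsumers--;
        maybeRelease();
    }

    /** Called once the next blocking result downstream has been fully produced. */
    synchronized void nextBlockingResultProduced() {
        downstreamResultProduced = true;
        maybeRelease();
    }

    synchronized boolean isReleased() {
        return released;
    }

    // Release only when both conditions from the issue description hold.
    private void maybeRelease() {
        if (!released && downstreamResultProduced && activeConsumers == 0) {
            released = true;
        }
    }
}
```

Compared with releasing as soon as the first consumer has read all data, the partition stays available for additional consumers and for re-reading after a downstream failure, until both conditions are met.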



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271996542
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
 
 Review comment:
   You want to set `catalogName`, right? So we should change `db1` to `testCatalogName`.


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271991771
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,263 @@
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String defaultDatabaseName = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase());
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getDefaultDatabaseName() {
+   return defaultDatabaseName;
+   }
+
+   @Override
+   public void setDefaultDatabaseName(String databaseName) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   defaultDatabaseName = databaseName;
 
 Review comment:
   We should also change the name of the default database in `this.databases`.
Otherwise, we will get nothing back when calling `databases.get(defaultDatabaseName)`, right?
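
   To illustrate the concern, here is a minimal sketch. `DefaultDatabaseSketch` and its method names are hypothetical stand-ins, not the PR's `GenericInMemoryCatalog`, and plain `Object` values replace the catalog database type. It shows how a lookup by the new default name misses when only the field is updated, and one possible way to keep the map in sync:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the catalog under review; not the PR's actual class. */
class DefaultDatabaseSketch {
    static final String DEFAULT_DB = "default";

    private String defaultDatabaseName = DEFAULT_DB;
    // Values are plain Objects here; the real catalog stores database objects.
    private final Map<String, Object> databases = new LinkedHashMap<>();

    DefaultDatabaseSketch() {
        databases.put(DEFAULT_DB, new Object());
    }

    /** Mirrors the setter in the diff: only the field changes, the map is untouched. */
    void setDefaultDatabaseNameAsInDiff(String databaseName) {
        defaultDatabaseName = databaseName;
    }

    /** One possible fix (an assumption): move the existing entry to the new key too. */
    void setDefaultDatabaseNameWithRename(String databaseName) {
        Object db = databases.remove(defaultDatabaseName);
        if (db != null) {
            databases.put(databaseName, db);
        }
        defaultDatabaseName = databaseName;
    }

    /** Looks up the default database by its current name; null if there is no entry. */
    Object getDefaultDatabase() {
        return databases.get(defaultDatabaseName);
    }
}
```

   With the first setter, `getDefaultDatabase()` returns `null` after a rename because the entry is still stored under `"default"`. An alternative would be to reject names that do not refer to an existing database; which behavior is intended is for the PR author to decide.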


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271996701
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
+   }
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
+
+   @After
+   public void close() {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+   }
+
+   @AfterClass
+   public static void clean() throws IOException {
+   catalog.close();
+   }
+
+   // -- tables --
+
+   @Test
+   public void testCreateTable_Streaming() {
+   catalog.createDatabase(db1, createDb(), false);
+   GenericCatalogTable table = createStreamingTable();
+   catalog.createTable(path1, table, false);
+
+   CatalogTestUtil.compare(table, (GenericCatalogTable) 
catalog.getTable(path1));
+   }
+
+   @Test
+   public void testCreateTable_Batch() {
+   catalog.createDatabase(db1, createDb(), false);
+
+   GenericCatalogTable table = createTable();
+   catalog.createTable(path1, table, false);
+
+   CatalogTestUtil.compare(table, (GenericCatalogTable) 
catalog.getTable(path1));
+
+   List tables = 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990155
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
 
 Review comment:
   We also need to override `equals`, `hashCode` and `toString` (also for the 
other generic classes).
   What do you think?
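
   A minimal sketch of what such overrides could look like. `DatabaseSketch` is 
a hypothetical stand-in for `GenericCatalogDatabase`; the field names follow the 
diff, while the `Map<String, String>` type parameters are an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for GenericCatalogDatabase.
class DatabaseSketch {
    private final Map<String, String> properties;
    private final String comment; // may be null

    DatabaseSketch(Map<String, String> properties, String comment) {
        Objects.requireNonNull(properties, "properties cannot be null");
        this.properties = new HashMap<>(properties);
        this.comment = comment;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DatabaseSketch that = (DatabaseSketch) o;
        // Objects.equals handles a null comment on either side.
        return properties.equals(that.properties) && Objects.equals(comment, that.comment);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals to honor the equals/hashCode contract.
        return Objects.hash(properties, comment);
    }

    @Override
    public String toString() {
        return "DatabaseSketch{properties=" + properties + ", comment='" + comment + "'}";
    }
}
```

   With value-based equality in place, tests could compare catalog objects 
directly instead of going through a field-by-field helper.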




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271993087
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String defaultDatabaseName = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase());
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getDefaultDatabaseName() {
+   return defaultDatabaseName;
+   }
+
+   @Override
+   public void setDefaultDatabaseName(String databaseName) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   defaultDatabaseName = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String dbName, boolean ignoreIfNotExists) 
throws DatabaseNotExistException {
+   if (databases.containsKey(dbName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(dbName)) {
+   databases.remove(dbName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, dbName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
dbName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   return tables.keySet().stream().noneMatch(op -> 
op.getDatabaseName().equals(databaseName));
+   // TODO: also check function when function is added.
+   }
+
+   @Override
+   public void alterDatabase(String databaseName, CatalogDatabase 
newDatabase, boolean ignoreIfNotExists)
+   throws DatabaseNotExistException {
+   if (databaseExists(databaseName)) {
+   databases.put(databaseName, 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990956
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.Map;
+
+/**
+ * A generic catalog table implementation.
+ */
+public class GenericCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the table
+   private final Map properties;
+   // Statistics of the table
+   private TableStats tableStats = null;
+   // Comment of the table
+   private String comment = "This is a generic catalog table.";
+
+   public GenericCatalogTable(TableSchema tableSchema, TableStats 
tableStats, Map properties) {
+   this.tableSchema = tableSchema;
+   this.tableStats = tableStats;
+   this.properties = properties;
+   }
+
+   @Override
+   public TableStats getStatistics() {
+   return this.tableStats;
+   }
+
+   @Override
+   public Map getProperties() {
+   return properties;
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   return this.tableSchema;
+   }
+
+   @Override
+   public GenericCatalogTable copy() {
+   return new GenericCatalogTable(this.tableSchema, this.tableStats, this.properties);
 
 Review comment:
   I think we should perform a deep copy here, as described in the interface?
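
   To illustrate the difference, a minimal sketch under stated assumptions: 
`TableSketch` is a hypothetical stand-in for `GenericCatalogTable`, a `String` 
replaces `TableSchema`, and the properties map is assumed to be 
`Map<String, String>`. Because strings are immutable, copying the map container 
is sufficient here; the real class would also need to copy the schema and the 
statistics unless those types are immutable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for GenericCatalogTable.
class TableSketch {
    private final String schema; // placeholder for TableSchema
    private final Map<String, String> properties;

    TableSketch(String schema, Map<String, String> properties) {
        this.schema = schema;
        this.properties = properties;
    }

    Map<String, String> getProperties() {
        return properties;
    }

    TableSketch copy() {
        // A new HashMap decouples the copy from the original's property map.
        return new TableSketch(schema, new HashMap<>(properties));
    }
}
```

   With the shallow version quoted in the diff, a change made to the copy's 
properties would also be visible through the original.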




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271999138
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 Review comment:
   Do we need the `Generic` prefix? How about using `InMemoryCatalog` directly? 
Or do you already see a need for several kinds of `InMemoryCatalog`?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271991218
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * A generic catalog view implementation.
+ */
+public class GenericCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because the context such as current DB is
+   // lost after the session, in which view is defined, is gone.
+   // Expanded query text takes care of the this, as an example.
+   private final String expandedQuery;
+
+   private TableSchema schema;
 
 Review comment:
   Should this be `private final`?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271995584
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A database name and object (table/view/function) name combo in a catalog.
+ */
+public class ObjectPath {
+   private final String databaseName;
+   private final String objectName;
+
+   public ObjectPath(String databaseName, String objectName) {
+   checkNotNull(databaseName, "databaseName cannot be null");
+   checkNotNull(objectName, "objectName cannot be null");
 
 Review comment:
   What are the format restrictions for databaseName and objectName, such as 
special characters? For example '.', since we use fullName.split("\\."), right? 
It would be better to add some validation logic.
   I left this comment in the pre-review as well.
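
   A minimal sketch of such validation, not the PR's code: `ObjectPathSketch` 
is a hypothetical stand-in for `ObjectPath`, the `checkPart` helper and the 
error messages are illustrative, and passing `-1` as the limit to `split` is an 
addition here so that trailing empty segments are not silently dropped.

```java
// Hypothetical stand-in for ObjectPath with stricter name checks.
class ObjectPathSketch {
    private final String databaseName;
    private final String objectName;

    ObjectPathSketch(String databaseName, String objectName) {
        this.databaseName = checkPart(databaseName, "databaseName");
        this.objectName = checkPart(objectName, "objectName");
    }

    // Rejects null, blank, and dotted names; '.' is reserved as the separator.
    private static String checkPart(String value, String label) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(label + " cannot be null or empty");
        }
        if (value.indexOf('.') >= 0) {
            throw new IllegalArgumentException(label + " cannot contain '.'");
        }
        return value;
    }

    static ObjectPathSketch fromString(String fullName) {
        if (fullName == null) {
            throw new IllegalArgumentException("fullName cannot be null");
        }
        // Limit -1 keeps trailing empty strings, so "db.table." yields three parts.
        String[] parts = fullName.split("\\.", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 'database.object' but got: " + fullName);
        }
        return new ObjectPathSketch(parts[0], parts[1]);
    }

    String getFullName() {
        return databaseName + "." + objectName;
    }
}
```

   Checking in the constructor keeps `fromString(getFullName())` a reliable 
round trip, because no stored name can contain the separator.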




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271994232
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utility class for catalog testing.
+ */
+public class CatalogTestUtil {
+
+   public static GenericCatalogTable createTable() {
+   TableSchema tableSchema = 
TableSchema.fromTypeInfo(getRowTypeInfo());
+   return createTable(tableSchema, createTableStats(), new 
HashMap());
+   }
+
+   public static RowTypeInfo getRowTypeInfo() {
+   return new RowTypeInfo(
+   new TypeInformation[] {BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO},
+   new String[] {"a", "b"});
+   }
+
+   public static TableStats createTableStats() {
+   return new TableStats(2);
+   }
+
+   public static GenericCatalogTable createTable(TableSchema schema, 
Map tableProperties) {
+   return createTable(schema, new TableStats(0), tableProperties);
+   }
+
+   public static GenericCatalogTable createTable(TableSchema schema, 
TableStats stats,
+   Map tableProperties) {
+   return new GenericCatalogTable(schema, stats, tableProperties);
+   }
+
+   public static void compare(GenericCatalogTable t1, GenericCatalogTable 
t2) {
 
 Review comment:
   Maybe renaming `compare` to `assertEquals` would be more accurate, since 
`compare` generally returns an int. What do you think?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271991296
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * A generic catalog view implementation.
+ */
+public class GenericCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because the context such as current DB is
+   // lost after the session, in which view is defined, is gone.
+   // Expanded query text takes care of the this, as an example.
+   private final String expandedQuery;
+
+   private TableSchema schema;
+   private Map properties;
+   private String comment;
+
+   public GenericCatalogView(String originalQuery, String expandedQuery, 
TableSchema schema,
+   Map properties, String comment) {
+   this.originalQuery = originalQuery;
+   this.expandedQuery = expandedQuery;
+   this.schema = schema;
+   this.properties = properties;
+   this.comment = comment;
+   }
+
+   @Override
+   public String getOriginalQuery() {
+   return this.originalQuery;
+   }
+
+   @Override
+   public String getExpandedQuery() {
+   return this.expandedQuery;
+   }
+
+   @Override
+   public Map getProperties() {
+   return properties;
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   return schema;
+   }
+
+   @Override
+   public GenericCatalogView copy() {
+   return new GenericCatalogView(this.originalQuery, 
this.expandedQuery, schema,
 
 Review comment:
   I think we should perform a deep copy. What do you think?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990551
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
+   private final Map properties;
+   private String comment;
+
+   public GenericCatalogDatabase() {
+   this(new HashMap<>(), null);
 
 Review comment:
   Do we need to add a default comment, something like `This is a generic 
catalog database.`? That would make it consistent with the comment in 
GenericCatalogTable.




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271997186
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String defaultDatabaseName = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase());
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getDefaultDatabaseName() {
+   return defaultDatabaseName;
+   }
+
+   @Override
+   public void setDefaultDatabaseName(String databaseName) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   defaultDatabaseName = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String dbName, boolean ignoreIfNotExists) throws DatabaseNotExistException {
+   if (databases.containsKey(dbName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(dbName)) {
+   databases.remove(dbName);
+   } else {
+   throw new DatabaseNotEmptyException(catalogName, dbName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, dbName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   return tables.keySet().stream().noneMatch(op -> op.getDatabaseName().equals(databaseName));
+   // TODO: also check function when function is added.
+   }
+
+   @Override
+   public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+   throws DatabaseNotExistException {
+   if (databaseExists(databaseName)) {
+   databases.put(databaseName, 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271996645
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
+   }
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
+
+   @After
+   public void close() {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+   }
+
+   @AfterClass
 
 Review comment:
   Add @BeforeClass for  `open` logic?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990685
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.Map;
+
+/**
+ * A generic catalog table implementation.
+ */
+public class GenericCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the table
+   private final Map properties;
+   // Statistics of the table
+   private TableStats tableStats = null;
 
 Review comment:
   private final ?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271988554
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonTable.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * CommonTable is the common parent of table and view. It has a map of
+ * key-value pairs defining the properties of the table.
+ */
+public interface CommonTable {
+   /**
+* Get the properties of the table.
+* @return table property map
+*/
+   Map getProperties();
+
+   /**
+* Get the schema of the table.
+* @return schema of the table
+*/
+   TableSchema getSchema();
+
+
 
 Review comment:
   Remove empty line.




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271995866
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A database name and object (table/view/function) name combo in a catalog.
+ */
+public class ObjectPath {
+   private final String databaseName;
+   private final String objectName;
+
+   public ObjectPath(String databaseName, String objectName) {
+   checkNotNull(databaseName, "databaseName cannot be null");
+   checkNotNull(objectName, "objectName cannot be null");
+
+   this.databaseName = databaseName;
+   this.objectName = objectName;
+   }
+
+   public String getDatabaseName() {
+   return databaseName;
+   }
+
+   public String getObjectName() {
+   return objectName;
+   }
+
+   public String getFullName() {
+   return String.format("%s.%s", databaseName, objectName);
+   }
+
+   public static ObjectPath fromString(String fullName) {
+   String[] paths = fullName.split("\\.");
 
 Review comment:
   Please check the null case for `fullName`. 
   I already left this comment in the pre-review.
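
   One way the requested null check could look is sketched below. This is 
   only an illustration: `ObjectPathSketch` is a hypothetical stand-in for 
   `ObjectPath`, `Objects.requireNonNull` replaces Flink's `checkNotNull` so 
   the snippet compiles on its own, and the validation after `split` is an 
   assumption because the rest of `fromString` is not visible in this diff.

```java
import java.util.Objects;

/** Hypothetical stand-in for ObjectPath; not the code from the pull request. */
final class ObjectPathSketch {
    private final String databaseName;
    private final String objectName;

    ObjectPathSketch(String databaseName, String objectName) {
        this.databaseName = Objects.requireNonNull(databaseName, "databaseName cannot be null");
        this.objectName = Objects.requireNonNull(objectName, "objectName cannot be null");
    }

    String getFullName() {
        return databaseName + "." + objectName;
    }

    static ObjectPathSketch fromString(String fullName) {
        // Reject null up front so the caller sees a descriptive message
        // instead of a bare NullPointerException raised by split().
        Objects.requireNonNull(fullName, "fullName cannot be null");

        String[] paths = fullName.split("\\.");
        // Assumed rule: exactly one dot with a non-empty name on each side.
        if (paths.length != 2 || paths[0].isEmpty() || paths[1].isEmpty()) {
            throw new IllegalArgumentException(
                "Cannot get database and object name from '" + fullName + "'");
        }
        return new ObjectPathSketch(paths[0], paths[1]);
    }
}
```

   With this shape, `fromString(null)` fails fast with a clear message, and 
   inputs such as `"nodot"` or `"a.b.c"` are rejected instead of producing a 
   partially filled path.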




[jira] [Created] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-03 Thread Luka Jurukovski (JIRA)
Luka Jurukovski created FLINK-12107:
---

 Summary: Use Proxy For DataDog Validation
 Key: FLINK-12107
 URL: https://issues.apache.org/jira/browse/FLINK-12107
 Project: Flink
  Issue Type: Improvement
Reporter: Luka Jurukovski


Support for a DataDog metric proxy was added recently; however, validation of 
the API key ignores the proxy. In some deployments a proxy is used precisely 
because the DataDog service is not directly reachable, in which case the 
direct validation request cannot reach it either.

Currently the validation pings DataDog directly using the provided API key:

https://app.datadoghq.com/api/v1/validate?api_key=
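
A proxy-aware validation call could be sketched roughly as follows. This is 
not the reporter's actual code: it uses only JDK classes, and the class name 
`DatadogKeyValidationSketch`, the method names, and the timeout values are 
assumptions made for illustration. The point is simply that the validation 
request is opened through the same proxy that is used for sending metrics.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch; names and timeouts are illustrative assumptions. */
final class DatadogKeyValidationSketch {
    static final String VALIDATE_URL = "https://app.datadoghq.com/api/v1/validate?api_key=";

    /** Returns an HTTP proxy for the given host, or a direct connection when no host is set. */
    static Proxy proxyFor(String proxyHost, int proxyPort) {
        if (proxyHost == null || proxyHost.isEmpty()) {
            return Proxy.NO_PROXY;
        }
        // createUnresolved skips the DNS lookup at configuration time.
        return new Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved(proxyHost, proxyPort));
    }

    /** Issues the validation request through the given proxy; true on HTTP 200. */
    static boolean validateApiKey(String apiKey, Proxy proxy) throws IOException {
        URL url = new URL(VALIDATE_URL + URLEncoder.encode(apiKey, StandardCharsets.UTF_8.name()));
        HttpURLConnection connection = (HttpURLConnection) url.openConnection(proxy);
        try {
            connection.setRequestMethod("GET");
            connection.setConnectTimeout(3_000);
            connection.setReadTimeout(3_000);
            return connection.getResponseCode() == HttpURLConnection.HTTP_OK;
        } finally {
            connection.disconnect();
        }
    }
}
```

A caller would build the proxy once, for example 
`proxyFor("proxy.example.internal", 8080)` (a made-up host), and pass it to 
both the validation call and the metric-sending path so the two cannot drift 
apart.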





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung commented on a change in pull request #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8110: [FLINK-12098] 
[table-planner-blink] Add support for generating optimized logical plan for 
simple group aggregate on stream
URL: https://github.com/apache/flink/pull/8110#discussion_r271998434
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
 ##
 @@ -495,4 +497,65 @@ object AggregateUtil extends Enumeration {
 }
 (aggBufferNames ++ distinctBufferNames).toArray
   }
+
+  /**
+* Optimize max or min with retraction agg. MaxWithRetract can be optimized to Max if input is
+* update increasing.
+*/
+  def getNeedRetractions(
+  groupSize: Int,
+  needRetraction: Boolean,
+  aggs: Seq[AggregateCall]): Array[Boolean] = {
+
+val needRetractionArray = Array.fill(aggs.size)(needRetraction)
+// TODO supports RelModifiedMonotonicity
+
+needRetractionArray
+  }
+
+  /**
+* Derives output row type from local aggregate
+*/
+  def inferLocalAggRowType(
+  aggInfoList: AggregateInfoList,
+  inputRowType: RelDataType,
+  groupSet: Array[Int],
+  typeFactory: FlinkTypeFactory): RelDataType = {
+
+val accTypes = aggInfoList.getAccTypes
+val groupingTypes = groupSet
+  .map(inputRowType.getFieldList.get(_).getType)
+  .map(FlinkTypeFactory.toInternalType)
+val groupingNames = groupSet.map(inputRowType.getFieldNames.get(_))
+val accFieldNames = inferAggAccumulatorNames(aggInfoList)
+
+typeFactory.buildRelDataType(
+  groupingNames ++ accFieldNames,
+  groupingTypes ++ accTypes.map(TypeConverters.createInternalTypeFromTypeInfo))
+  }
+
+  /**
+* Derives accumulators names from aggregate
+*/
+  def inferAggAccumulatorNames(aggInfoList: AggregateInfoList): Array[String] = {
+
 
 Review comment:
   delete blank line




[GitHub] [flink] KurtYoung commented on a change in pull request #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8110: [FLINK-12098] 
[table-planner-blink] Add support for generating optimized logical plan for 
simple group aggregate on stream
URL: https://github.com/apache/flink/pull/8110#discussion_r271998411
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
 ##
 @@ -495,4 +497,65 @@ object AggregateUtil extends Enumeration {
 }
 (aggBufferNames ++ distinctBufferNames).toArray
   }
+
+  /**
+* Optimize max or min with retraction agg. MaxWithRetract can be optimized to Max if input is
+* update increasing.
+*/
+  def getNeedRetractions(
+  groupSize: Int,
+  needRetraction: Boolean,
+  aggs: Seq[AggregateCall]): Array[Boolean] = {
+
 
 Review comment:
   delete blank line




[GitHub] [flink] KurtYoung commented on a change in pull request #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8110: [FLINK-12098] 
[table-planner-blink] Add support for generating optimized logical plan for 
simple group aggregate on stream
URL: https://github.com/apache/flink/pull/8110#discussion_r271997758
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/runtime/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ##
 @@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions.aggfunctions
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataformat.Decimal
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.runtime.functions.aggfunctions.Ordering._
+import org.apache.flink.table.typeutils.DecimalTypeInfo
+
+import java.lang
+import java.lang.{Iterable => JIterable, Long => JLong}
+import java.sql.{Date, Time, Timestamp}
+
+/** The initial accumulator for Max with retraction aggregate function */
+class MaxWithRetractAccumulator[T] {
 
 Review comment:
   Can we make this Java?




[GitHub] [flink] KurtYoung commented on a change in pull request #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8110: [FLINK-12098] 
[table-planner-blink] Add support for generating optimized logical plan for 
simple group aggregate on stream
URL: https://github.com/apache/flink/pull/8110#discussion_r271998426
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
 ##
 @@ -495,4 +497,65 @@ object AggregateUtil extends Enumeration {
 }
 (aggBufferNames ++ distinctBufferNames).toArray
   }
+
+  /**
+* Optimize max or min with retraction agg. MaxWithRetract can be optimized to Max if input is
+* update increasing.
+*/
+  def getNeedRetractions(
+  groupSize: Int,
+  needRetraction: Boolean,
+  aggs: Seq[AggregateCall]): Array[Boolean] = {
+
+val needRetractionArray = Array.fill(aggs.size)(needRetraction)
+// TODO supports RelModifiedMonotonicity
+
+needRetractionArray
+  }
+
+  /**
+* Derives output row type from local aggregate
+*/
+  def inferLocalAggRowType(
+  aggInfoList: AggregateInfoList,
+  inputRowType: RelDataType,
+  groupSet: Array[Int],
+  typeFactory: FlinkTypeFactory): RelDataType = {
+
 
 Review comment:
   delete blank line
   




[GitHub] [flink] KurtYoung commented on a change in pull request #8110: [FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream

2019-04-03 Thread GitBox
KurtYoung commented on a change in pull request #8110: [FLINK-12098] 
[table-planner-blink] Add support for generating optimized logical plan for 
simple group aggregate on stream
URL: https://github.com/apache/flink/pull/8110#discussion_r271997795
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/runtime/functions/aggfunctions/MinAggFunctionWithRetract.scala
 ##
 @@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions.aggfunctions
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataformat.Decimal
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.runtime.functions.aggfunctions.Ordering._
+import org.apache.flink.table.typeutils.DecimalTypeInfo
+
+import java.lang
+import java.lang.{Iterable => JIterable, Long => JLong}
+import java.sql.{Date, Time, Timestamp}
+
+/** The initial accumulator for Min with retraction aggregate function */
+class MinWithRetractAccumulator[T] {
 
 Review comment:
   Can we make this Java?




[jira] [Comment Edited] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value

2019-04-03 Thread leesf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809428#comment-16809428
 ] 

leesf edited comment on FLINK-12105 at 4/4/19 1:54 AM:
---

[~vmehra], hi. I think this issue would be more appropriately filed with the 
Calcite community, since the exception is thrown when Calcite validates the 
SQL query.
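
For context, the error messages in the report quoted below mention a 
two-digit leading precision (`SECOND(2)`, `MINUTE(2)`, and so on), which is 
why 99 passes and 100 does not. The sketch below only mimics that rule; it 
is not Calcite's code, and `IntervalPrecisionSketch` and its methods are 
hypothetical names. The generated `SECOND(3)` form relies on standard SQL 
syntax for an explicit leading precision; whether Flink 1.7.2 accepts it was 
not verified in this thread.

```java
import java.math.BigInteger;

/** Hypothetical illustration of the leading-field precision rule; not Calcite code. */
final class IntervalPrecisionSketch {
    /** Leading precision named in the reported error messages, e.g. SECOND(2). */
    static final int DEFAULT_LEADING_PRECISION = 2;

    /** A value fits when its magnitude is below 10^precision. */
    static boolean fitsLeadingPrecision(long value, int precision) {
        BigInteger limit = BigInteger.TEN.pow(precision);
        return BigInteger.valueOf(value).abs().compareTo(limit) < 0;
    }

    /** Builds a TUMBLE call, adding an explicit precision only when the default is too small. */
    static String tumble(String timeAttribute, long value, String unit) {
        int digits = BigInteger.valueOf(value).abs().toString().length();
        String qualifier = digits > DEFAULT_LEADING_PRECISION ? unit + "(" + digits + ")" : unit;
        return "TUMBLE(" + timeAttribute + ", INTERVAL '" + value + "' " + qualifier + ")";
    }
}
```

Under these assumptions, `tumble("rideTime", 100, "SECOND")` yields 
`TUMBLE(rideTime, INTERVAL '100' SECOND(3))`, while values up to 99 keep the 
plain unit.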


was (Author: xleesf):
[~vmehra],hi. The issue could be more appropriate to be filed in the calcite 
community, since the exception is thrown when calcite validate query SQL than 
Flink.

> TUMBLE INTERVAL value errors out for 100 or more value
> --
>
> Key: FLINK-12105
> URL: https://issues.apache.org/jira/browse/FLINK-12105
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.7.2
> Environment: [https://github.com/ververica/sql-training]
>Reporter: Vinod Mehra
>Priority: Major
>
> I ran into this while experimenting with different values at Lyft eng. 
> However it is reproducible with [https://github.com/ververica/sql-training] 
> as well. I showed this issue to the training instructors during 
> flink-forward-19 and they asked me to file this bug.
> The INTERVAL values work fine until 99. Errors after that:
> *TUMBLE(rideTime, INTERVAL '100' SECOND)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of SECOND(2) field_
> *TUMBLE(rideTime, INTERVAL '100' MINUTE)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of MINUTE(2) field_
> *TUMBLE(rideTime, INTERVAL '100' HOUR)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of HOUR(2) field_
> *TUMBLE(rideTime, INTERVAL '100' DAY)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of DAY(2) field_
> (Note: MONTH AND YEAR also error out but for different reasons ("_Only 
> constant window intervals with millisecond resolution are supported_"). MONTH 
> and YEAR intervals are not supported at all currently. I was told that it is 
> hard to implement because of timezone differences. I will file that 
> separately.)





[jira] [Commented] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value

2019-04-03 Thread leesf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809428#comment-16809428
 ] 

leesf commented on FLINK-12105:
---

[~vmehra], hi. This issue would be more appropriately filed with the Calcite 
community, since the exception is thrown when Calcite, rather than Flink, 
validates the SQL query.






[jira] [Commented] (FLINK-12106) Jobmanager is killing FINISHED taskmanger containers, causing exception in still running Taskmanagers an

2019-04-03 Thread Guowei Ma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809421#comment-16809421
 ] 

Guowei Ma commented on FLINK-12106:
---

AFAIK, the community is working on it.  
[FLINK-10941|https://issues.apache.org/jira/browse/FLINK-10941] describes the 
same problem. 

This issue is related to the lifecycle control of shuffle resources. There 
are some related discussions and design documents [1][2].

[1] 
[https://docs.google.com/document/d/13vAJJxfRXAwI4MtO8dux8hHnNMw2Biu5XRrb_hvGehA/edit#heading=h.v7vhb7w01d61]

[2] 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager]

 

> Jobmanager is killing FINISHED taskmanger containers, causing exception in 
> still running Taskmanagers an
> 
>
> Key: FLINK-12106
> URL: https://issues.apache.org/jira/browse/FLINK-12106
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.7.2
> Environment: Hadoop:  hdp/2.5.6.0-40
> Flink: 2.7.2
>Reporter: John
>Priority: Major
>
> When running a single Flink job on YARN, some of the taskmanager containers 
> reach the FINISHED state before others.  It appears that, after receiving 
> final execution state FINISHED from a taskmanager, the jobmanager waits ~68 
> seconds and then frees the associated slot in the taskmanager.  After an 
> additional 60 seconds, the jobmanager stops the same taskmanager because 
> the TaskExecutor exceeded the idle timeout.
> Meanwhile, other taskmanagers are still working to complete the job.  Within 
> 10 seconds after the taskmanager container above is stopped, the remaining 
> taskmanagers receive an exception due to loss of connection to the stopped 
> taskmanager.  These exceptions result in job failure.
>  
> Relevant logs:
> 2019-04-03 13:49:00,013 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_38 
> (akka.tcp://flink@hadoop4:42745/user/taskmanager_0) at ResourceManager
> 2019-04-03 13:49:05,900 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_59 
> (akka.tcp://flink@hadoop9:55042/user/taskmanager_0) at ResourceManager
>  
>  
> 2019-04-03 13:48:51,132 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1553017480503_0158_01_77 - Remaining pending container 
> requests: 6
> 2019-04-03 13:48:52,862 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner    
>               -     
> -Dlog.file=/hadoop/yarn/log/application_1553017480503_0158/container_1553017480503_0158_01_77/taskmanager.log
> 2019-04-03 13:48:57,490 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
> initialization (took 202 ms). Listening on SocketAddress 
> /192.168.230.69:40140.
> 2019-04-03 13:49:12,575 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_77 
> (akka.tcp://flink@hadoop9:51525/user/taskmanager_0) at ResourceManager
> 2019-04-03 13:49:12,631 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated 
> slot for AllocationID\{42fed3e5a136240c23cc7b394e3249e9}.
> 2019-04-03 14:58:15,188 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - 
> Un-registering task and sending final execution state FINISHED to JobManager 
> for task DataSink 
> (com.anovadata.alexflinklib.sinks.bucketing.BucketingOutputFormat@26874f2c) 
> a4b5fb32830d4561147b2714828109e2.
> 2019-04-03 14:59:23,049 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> idle slot [AllocationID\{42fed3e5a136240c23cc7b394e3249e9}].
> 2019-04-03 14:59:23,058 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot 
> TaskSlot(index:0, state:ACTIVE, resource profile: 
> ResourceProfile\{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
> networkMemoryInMB=2147483647}, allocationId: 
> AllocationID\{42fed3e5a136240c23cc7b394e3249e9}, jobId: 
> a6c4e367698c15cdf168d19a89faff1d).
> 2019-04-03 15:00:02,641 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Stopping container container_1553017480503_0158_01_77.
> 2019-04-03 15:00:02,646 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Closing TaskExecutor connection 
> container_1553017480503_0158_01_77 because: TaskExecutor exceeded the 
> idle timeout.
>  
>  
> 2019-04-03 13:48:48,902 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner    
>   

[GitHub] [flink] leesf commented on issue #8079: [FLINK-12054][Tests] HBaseConnectorITCase fails on Java 9

2019-04-03 Thread GitBox
leesf commented on issue #8079: [FLINK-12054][Tests] HBaseConnectorITCase fails 
on Java 9
URL: https://github.com/apache/flink/pull/8079#issuecomment-479713439
 
 
   @zentol Hi Chesnay, sorry to bother you. Could you please tell me why you 
closed this PR?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271987200
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the default database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the default db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the default database
+*/
+   String getDefaultDatabaseName();
+
+   /**
+* Set the default database. A default database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setDefaultDatabaseName(String databaseName);
+
+   // -- databases --
+   /**
+* Get the names of all databases in this catalog.
+*
+* @return The list of the names of all databases
+*/
+   List<String> listDatabases();
+
+   /**
+* Get a database from this catalog.
+*
+* @param dbName Name of the database
+* @return The requested database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   CatalogDatabase getDatabase(String dbName) throws 
DatabaseNotExistException;
+
+   /**
+* Check if a database exists in this catalog.
+*
+* @param dbName Name of the database
+*/
+   boolean databaseExists(String dbName);
+
+   /**
+* Gets paths of all tables and views under this database. An empty 
list is returned if none exists.
+*
+* @return A list of the names of all tables and views in this database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listTables(String dbName) throws DatabaseNotExistException;
 
 Review comment:
   Yes, but would you mind telling me the real reason why we do not need to let 
users list all of the tables (not views) directly?




[jira] [Created] (FLINK-12106) Jobmanager is killing FINISHED taskmanger containers, causing exception in still running Taskmanagers an

2019-04-03 Thread John (JIRA)
John created FLINK-12106:


 Summary: Jobmanager is killing FINISHED taskmanger containers, 
causing exception in still running Taskmanagers an
 Key: FLINK-12106
 URL: https://issues.apache.org/jira/browse/FLINK-12106
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.7.2
 Environment: Hadoop:  hdp/2.5.6.0-40

Flink: 2.7.2
Reporter: John


When running a single Flink job on YARN, some of the taskmanager containers 
reach the FINISHED state before others.  It appears that, after receiving the 
final execution state FINISHED from a taskmanager, the jobmanager waits ~68 
seconds and then frees the associated slot in the taskmanager.  After an 
additional 60 seconds, the jobmanager stops the same taskmanager because the 
TaskExecutor exceeded the idle timeout.

Meanwhile, other taskmanagers are still working to complete the job.  Within 10 
seconds after the taskmanager container above is stopped, the remaining 
taskmanagers receive an exception due to loss of connection to the stopped 
taskmanager.  These exceptions result in job failure.

 

Relevant logs:

2019-04-03 13:49:00,013 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Registering TaskManager with ResourceID 
container_1553017480503_0158_01_38 
(akka.tcp://flink@hadoop4:42745/user/taskmanager_0) at ResourceManager

2019-04-03 13:49:05,900 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Registering TaskManager with ResourceID 
container_1553017480503_0158_01_59 
(akka.tcp://flink@hadoop9:55042/user/taskmanager_0) at ResourceManager

 

 

2019-04-03 13:48:51,132 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Received new container: container_1553017480503_0158_01_77 - 
Remaining pending container requests: 6

2019-04-03 13:48:52,862 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
            -     
-Dlog.file=/hadoop/yarn/log/application_1553017480503_0158/container_1553017480503_0158_01_77/taskmanager.log

2019-04-03 13:48:57,490 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
initialization (took 202 ms). Listening on SocketAddress /192.168.230.69:40140.

2019-04-03 13:49:12,575 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Registering TaskManager with ResourceID 
container_1553017480503_0158_01_77 
(akka.tcp://flink@hadoop9:51525/user/taskmanager_0) at ResourceManager

2019-04-03 13:49:12,631 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated slot 
for AllocationID\{42fed3e5a136240c23cc7b394e3249e9}.

2019-04-03 14:58:15,188 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering 
task and sending final execution state FINISHED to JobManager for task DataSink 
(com.anovadata.alexflinklib.sinks.bucketing.BucketingOutputFormat@26874f2c) 
a4b5fb32830d4561147b2714828109e2.

2019-04-03 14:59:23,049 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing idle 
slot [AllocationID\{42fed3e5a136240c23cc7b394e3249e9}].

2019-04-03 14:59:23,058 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile\{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
networkMemoryInMB=2147483647}, allocationId: 
AllocationID\{42fed3e5a136240c23cc7b394e3249e9}, jobId: 
a6c4e367698c15cdf168d19a89faff1d).

2019-04-03 15:00:02,641 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Stopping container container_1553017480503_0158_01_77.

2019-04-03 15:00:02,646 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Closing TaskExecutor connection 
container_1553017480503_0158_01_77 because: TaskExecutor exceeded the idle 
timeout.

 

 

2019-04-03 13:48:48,902 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
            -     
-Dlog.file=/data1/hadoop/yarn/log/application_1553017480503_0158/container_1553017480503_0158_01_59/taskmanager.log

2019-04-03 14:59:24,677 INFO  
org.apache.parquet.hadoop.InternalParquetRecordWriter         - Flushing mem 
columnStore to file. allocated memory: 109479981

2019-04-03 15:00:05,696 INFO  
org.apache.parquet.hadoop.InternalParquetRecordWriter         - mem size 
135014409 > 134217728: flushing 1930100 records to disk.

2019-04-03 15:00:05,696 INFO  
org.apache.parquet.hadoop.InternalParquetRecordWriter         - Flushing mem 
columnStore to file. allocated memory: 102677684

2019-04-03 15:00:08,671 ERROR org.apache.flink.runtime.operators.BatchTask      
            - Error in task code:  CHAIN Partition -> FlatMap 

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Lost connection to task manager 
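
The intervals quoted in the description can be checked against the log timestamps above. The following is a minimal sketch, assuming the timestamps use the `yyyy-MM-dd HH:mm:ss,SSS` layout shown in the logs; `LogGaps` and `millisBetween` are hypothetical names for illustration and are not part of Flink.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: measures the gap between two log timestamps.
final class LogGaps {
    // Timestamp layout used by the log lines quoted in this report.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Returns the elapsed milliseconds between two log timestamps.
    static long millisBetween(String from, String to) {
        LocalDateTime start = LocalDateTime.parse(from, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(to, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // FINISHED sent by the TaskExecutor -> idle slot released by the SlotPool
        System.out.println(millisBetween("2019-04-03 14:58:15,188", "2019-04-03 14:59:23,049"));
        // Slot freed in the TaskSlotTable -> container stopped by the YarnResourceManager
        System.out.println(millisBetween("2019-04-03 14:59:23,058", "2019-04-03 15:00:02,641"));
    }
}
```

On these log lines the first gap is about 67.9 seconds, matching the "~68 seconds" in the description, while the second gap comes out at about 39.6 seconds rather than 60.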

[jira] [Commented] (FLINK-12104) Flink Kafka fails with Incompatible KafkaProducer version / NoSuchFieldException sequenceNumbers

2019-04-03 Thread Tim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809220#comment-16809220
 ] 

Tim commented on FLINK-12104:
-

I am sure I am; not intentionally, but through other libs in the dependency 
chain.   I will try with flink-connector-kafka_2.11.

Side note:  I'm guessing we use reflection (Class.getDeclaredField) to access 
these fields because they are private in the Kafka codebase, and accessing them 
through the public API was not feasible given what FlinkKafkaProducer was 
trying to do?   I ask because, with the public API, the mismatch would have 
shown up as a compile failure instead of a runtime error.
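
To illustrate the point about reflection, here is a minimal sketch of how a lookup by field name compiles fine but fails at runtime once the field is gone. The two `TransactionManager` stand-ins and the `ReflectiveAccess` class are hypothetical and are not Kafka or Flink classes; only the `getValue` name and the error message are taken from this issue.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a client version that still has the field.
final class OldTransactionManager {
    private final String sequenceNumbers = "present";
}

// Hypothetical stand-in for a client version where the field was removed.
final class NewTransactionManager {
    private final String renamedState = "present";
}

final class ReflectiveAccess {
    // Reads a private field by name; a missing field is only detected at runtime.
    static Object getValue(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    public static void main(String[] args) {
        // Works against the class that declares the field.
        System.out.println(getValue(new OldTransactionManager(), "sequenceNumbers"));
        try {
            // Compiles, but throws because the field no longer exists.
            getValue(new NewTransactionManager(), "sequenceNumbers");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " caused by " + e.getCause());
        }
    }
}
```

Because the field name is just a string, the compiler cannot check it, so a client version mismatch on the classpath only shows up when the code runs.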

> Flink Kafka fails with Incompatible KafkaProducer version / 
> NoSuchFieldException sequenceNumbers
> 
>
> Key: FLINK-12104
> URL: https://issues.apache.org/jira/browse/FLINK-12104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Tim
>Priority: Major
>
> FlinkKafkaProducer (in flink-connector-kafka-0.11) tries to access a field 
> named `sequenceNumbers` from the KafkaProducer's TransactionManager.  You can 
> find this line on the master branch here: 
> https://github.com/apache/flink/blob/d6be68670e661091d94a3c65a2704d52fc0e827c/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java#L197
>  
> {code:java}
> Object transactionManager = getValue(kafkaProducer, "transactionManager");
> ...
> Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
> {code}
>  
> However, the Kafka TransactionManager no longer has a "sequenceNumbers" 
> field.  This was changed back on 9/14/2017 (KAFKA-5494) in an effort to 
> support multiple inflight requests while still guaranteeing idempotence.  See 
> the commit diff here: 
> https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
> Subsequently when Flink tries to "recoverAndCommit" (see 
> FlinkKafkaProducer011) it fails with a "NoSuchFieldException: 
> sequenceNumbers", followed by a "Incompatible KafkaProducer version".
> Given that the KafkaProducer used is so old (this change was made almost two 
> years ago) are there any plans of upgrading?   Or - are there some known 
> compatibility issues that prevent Flink/Kafka connector from doing so?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271887584
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
 
 Review comment:
   Actually I removed renameDatabase() from catalog APIs.




[GitHub] [flink] aloyszhang edited a comment on issue #8075: [FLINK-12043][core] Process unreadble NPE without any hint

2019-04-03 Thread GitBox
aloyszhang edited a comment on issue #8075: [FLINK-12043][core] Process 
unreadble NPE without any hint
URL: https://github.com/apache/flink/pull/8075#issuecomment-479601262
 
 
   I agree with "Even without them, when you get an NPE you can already infer 
that one of the incoming elements was likely null", but I still think the 
message is clearer and easier to understand with these hints.




[jira] [Commented] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector

2019-04-03 Thread Shuyi Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809002#comment-16809002
 ] 

Shuyi Chen commented on FLINK-11912:


Hi [~aitozi], the current approach does the following:
1) As the KafkaConsumer discovers a new partition, it adds the partition 
information to _manualRegisteredMetricSet_. 
2) In the consumer polling run loop, on every iteration/poll, it checks whether 
any partition is still pending in _manualRegisteredMetricSet_. If there 
are still partitions left, it checks whether the KafkaConsumer has already 
exposed the metric for those partitions, and registers them with Flink. 

In short, the current approach keeps trying to register the partition 
metric once a new partition is discovered, until the KafkaConsumer exposes it. 
Therefore, I don't think we will lose partition lag metrics unless there are 
bugs in the new partition discovery mechanism. What do you think?
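
The two steps above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the connector's code: `PendingMetricRegistrar`, its methods, and the use of plain maps in place of the KafkaConsumer metrics and Flink's metric group are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the retry-until-exposed idea; not the connector's code.
final class PendingMetricRegistrar {
    // Metric names discovered but not yet registered
    // (plays the role of manualRegisteredMetricSet).
    private final Set<String> pending = new HashSet<>();
    // Stand-in for the metrics registered with Flink.
    private final Map<String, Double> registered = new HashMap<>();

    // Step 1: remember the metric of a newly discovered partition.
    void onPartitionDiscovered(String metricName) {
        pending.add(metricName);
    }

    // Step 2: called once per poll iteration with whatever the consumer exposes so far.
    void tryRegister(Map<String, Double> exposedByConsumer) {
        Iterator<String> it = pending.iterator();
        while (it.hasNext()) {
            String name = it.next();
            Double value = exposedByConsumer.get(name);
            if (value != null) {
                registered.put(name, value);
                it.remove(); // registered, so stop retrying for this partition
            }
        }
    }

    Set<String> pending() {
        return pending;
    }

    Map<String, Double> registered() {
        return registered;
    }
}
```

In this model a metric stays in the pending set across polls until the consumer exposes it, so the per-poll cost shrinks to an empty-set check once every discovered partition is registered.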

> Expose per partition Kafka lag metric in Flink Kafka connector
> --
>
> Key: FLINK-11912
> URL: https://issues.apache.org/jira/browse/FLINK-11912
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.7.2
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> In production, it's important that we expose the Kafka lag by partition 
> metric in order for users to diagnose which Kafka partition is lagging. 
> However, although the Kafka lag by partition metrics are available in 
> KafkaConsumer after 0.10.2,  Flink was not able to properly register it 
> because the metrics are only available after the consumer start polling data 
> from partitions. I would suggest the following fix:
> 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet.
> 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add 
> MetricName for those partitions that we want to register into 
> manualRegisteredMetricSet. 
> 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, 
> try to search for the metrics available in KafkaConsumer, and if found, 
> register it and remove the entry from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and only incur when discovering 
> new partitions, and registration is done once the KafkaConsumer have the 
> metrics exposed.





[GitHub] [flink] aloyszhang commented on issue #8075: [FLINK-12043][core] Process unreadble NPE without any hint

2019-04-03 Thread GitBox
aloyszhang commented on issue #8075: [FLINK-12043][core] Process unreadble NPE 
without any hint
URL: https://github.com/apache/flink/pull/8075#issuecomment-479601262
 
 
   I agree with "Even without them, when you get an NPE you can already infer 
that one of the incoming elements was likely null", but I still think it's 
clearer and easier to understand.




[jira] [Commented] (FLINK-12104) Flink Kafka fails with Incompatible KafkaProducer version / NoSuchFieldException sequenceNumbers

2019-04-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16808986#comment-16808986
 ] 

Elias Levy commented on FLINK-12104:


The {{flink-connector-kafka-0.11}} depends on the Kafka 0.11.0.2 client, which 
does have that field. The field was only removed in 1.0.0. Are you overriding 
the Kafka client dependency? If so, that is your problem.


 If you want to use a newer Kafka client, use the universal Kafka connection 
({{flink-connector-kafka_2.11}}), which tracks the latest version of the Kafka 
client.

> Flink Kafka fails with Incompatible KafkaProducer version / 
> NoSuchFieldException sequenceNumbers
> 
>
> Key: FLINK-12104
> URL: https://issues.apache.org/jira/browse/FLINK-12104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Tim
>Priority: Major
>
> FlinkKafkaProducer (in flink-connector-kafka-0.11) tries to access a field 
> named `sequenceNumbers` from the KafkaProducer's TransactionManager.  You can 
> find this line on the master branch here: 
> https://github.com/apache/flink/blob/d6be68670e661091d94a3c65a2704d52fc0e827c/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java#L197
>  
> {code:java}
> Object transactionManager = getValue(kafkaProducer, "transactionManager");
> ...
> Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
> {code}
>  
> However, the Kafka TransactionManager no longer has a "sequenceNumbers" 
> field.  This was changed back on 9/14/2017 (KAFKA-5494) in an effort to 
> support multiple inflight requests while still guaranteeing idempotence.  See 
> the commit diff here: 
> https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
> Subsequently when Flink tries to "recoverAndCommit" (see 
> FlinkKafkaProducer011) it fails with a "NoSuchFieldException: 
> sequenceNumbers", followed by a "Incompatible KafkaProducer version".
> Given that the KafkaProducer used is so old (this change was made almost two 
> years ago) are there any plans of upgrading?   Or - are there some known 
> compatibility issues that prevent Flink/Kafka connector from doing so?
>  





[jira] [Created] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value

2019-04-03 Thread Vinod Mehra (JIRA)
Vinod Mehra created FLINK-12105:
---

 Summary: TUMBLE INTERVAL value errors out for 100 or more value
 Key: FLINK-12105
 URL: https://issues.apache.org/jira/browse/FLINK-12105
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.7.2
 Environment: [https://github.com/ververica/sql-training]
Reporter: Vinod Mehra


I ran into this while experimenting with different values at Lyft eng. However 
it is reproducible with [https://github.com/ververica/sql-training] as well. I 
showed this issue to the training instructors during flink-forward-19 and they 
asked me to file this bug.

The INTERVAL values work fine until 99. Errors after that:

*TUMBLE(rideTime, INTERVAL '100' SECOND)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of SECOND(2) field_

*TUMBLE(rideTime, INTERVAL '100' MINUTE)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of MINUTE(2) field_

*TUMBLE(rideTime, INTERVAL '100' HOUR)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of HOUR(2) field_

*TUMBLE(rideTime, INTERVAL '100' DAY)*

Interval field value 100 exceeds precision of DAY(2) field

(Note: MONTH AND YEAR also error out but for different reasons ("_Only constant 
window intervals with millisecond resolution are supported_"). MONTH and YEAR 
intervals are not supported at all currently. I was told that it is hard to 
implement because of timezone differences. I will file that separately.)
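
The error messages above mention `SECOND(2)`, `MINUTE(2)`, `HOUR(2)` and `DAY(2)`, which suggests the validator is applying a default leading-field precision of two digits. The following is a minimal sketch of such a digit-count rule, given only to show why 99 passes and 100 does not; `IntervalPrecisionCheck` and `fits` are hypothetical names and this is not Calcite's implementation.

```java
// Hypothetical illustration of a leading-field precision check; not Calcite's code.
final class IntervalPrecisionCheck {
    // Default leading precision implied by "SECOND(2)" in the error messages.
    static final int DEFAULT_LEADING_PRECISION = 2;

    // True if the value has at most `precision` decimal digits.
    static boolean fits(long value, int precision) {
        return String.valueOf(Math.abs(value)).length() <= precision;
    }

    public static void main(String[] args) {
        System.out.println(fits(99, DEFAULT_LEADING_PRECISION));  // two digits
        System.out.println(fits(100, DEFAULT_LEADING_PRECISION)); // three digits
        System.out.println(fits(100, 3));                         // explicit precision of 3
    }
}
```

If this reading is right, giving the qualifier an explicit precision, for example `INTERVAL '100' SECOND(3)`, may be worth trying; whether the TUMBLE window accepts it in this Flink version is not verified here.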





[jira] [Updated] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value

2019-04-03 Thread Vinod Mehra (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinod Mehra updated FLINK-12105:

Description: 
I ran into this while experimenting with different values at Lyft eng. However 
it is reproducible with [https://github.com/ververica/sql-training] as well. I 
showed this issue to the training instructors during flink-forward-19 and they 
asked me to file this bug.

The INTERVAL values work fine until 99. Errors after that:

*TUMBLE(rideTime, INTERVAL '100' SECOND)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of SECOND(2) field_

*TUMBLE(rideTime, INTERVAL '100' MINUTE)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of MINUTE(2) field_

*TUMBLE(rideTime, INTERVAL '100' HOUR)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of HOUR(2) field_

*TUMBLE(rideTime, INTERVAL '100' DAY)*

_org.apache.calcite.sql.validate.SqlValidatorException:_ Interval field value 
100 exceeds precision of DAY(2) field

(Note: MONTH AND YEAR also error out but for different reasons ("_Only constant 
window intervals with millisecond resolution are supported_"). MONTH and YEAR 
intervals are not supported at all currently. I was told that it is hard to 
implement because of timezone differences. I will file that separately.)

  was:
I ran into this while experimenting with different values at Lyft eng. However 
it is reproducible with [https://github.com/ververica/sql-training] as well. I 
showed this issue to the training instructors during flink-forward-19 and they 
asked me to file this bug.

The INTERVAL values work fine until 99. Errors after that:

*TUMBLE(rideTime, INTERVAL '100' SECOND)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of SECOND(2) field_

*TUMBLE(rideTime, INTERVAL '100' MINUTE)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of MINUTE(2) field_

*TUMBLE(rideTime, INTERVAL '100' HOUR)*

_org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
100 exceeds precision of HOUR(2) field_

*TUMBLE(rideTime, INTERVAL '100' DAY)*

Interval field value 100 exceeds precision of DAY(2) field

(Note: MONTH AND YEAR also error out but for different reasons ("_Only constant 
window intervals with millisecond resolution are supported_"). MONTH and YEAR 
intervals are not supported at all currently. I was told that it is hard to 
implement because of timezone differences. I will file that separately.)_
_


> TUMBLE INTERVAL value errors out for 100 or more value
> --
>
> Key: FLINK-12105
> URL: https://issues.apache.org/jira/browse/FLINK-12105
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.7.2
> Environment: [https://github.com/ververica/sql-training]
>Reporter: Vinod Mehra
>Priority: Major
>
> I ran into this while experimenting with different values at Lyft eng. 
> However it is reproducible with [https://github.com/ververica/sql-training] 
> as well. I showed this issue to the training instructors during 
> flink-forward-19 and they asked me to file this bug.
> The INTERVAL values work fine until 99. Errors after that:
> *TUMBLE(rideTime, INTERVAL '100' SECOND)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of SECOND(2) field_
> *TUMBLE(rideTime, INTERVAL '100' MINUTE)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of MINUTE(2) field_
> *TUMBLE(rideTime, INTERVAL '100' HOUR)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of HOUR(2) field_
> *TUMBLE(rideTime, INTERVAL '100' DAY)*
> _org.apache.calcite.sql.validate.SqlValidatorException:_ Interval field value 
> 100 exceeds precision of DAY(2) field
> (Note: MONTH AND YEAR also error out but for different reasons ("_Only 
> constant window intervals with millisecond resolution are supported_"). MONTH 
> and YEAR intervals are not supported at all currently. I was told that it is 
> hard to implement because of timezone differences. I will file that 
> separately.)





[GitHub] [flink] hequn8128 edited a comment on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-03 Thread GitBox
hequn8128 edited a comment on issue #8050: [FLINK-11067][table] Convert 
TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-479572670
 
 
   @dawidwys  Thanks a lot for your review. I think you have given a lot of 
good suggestions!
   
   1. As for the useless APIs in the `TableEnvironment` interface, maybe we can 
deprecate them first and remove them later. This keeps our interface compatible. 
   
   2. To decouple the `Descriptor` from `TableEnvironment`, we can change 
registerTableSource to getTableSource in `RegistrableDescriptor` (and also 
rename `RegistrableDescriptor`). Once we get the table source or sink, we 
can register it with the TableEnvironment. Although this makes it impossible 
for users to register a table source (or sink) directly, I am OK with 
it.
   
   3. For the `TableFactoryUtil`, I am not sure if we can port it into the api or 
common module (I will check later). However, I do find that we can also decouple 
it from TableEnvironment here. In the current implementation, it seems the table 
env is only used to decide whether it is a batch or stream source (or sink). We 
can also use the descriptor to decide, i.e., a StreamTableDescriptor means a 
stream source (or sink). 
   
   For 2 and 3, I think the biggest obstacle is that we may need to stay backward 
compatible while converting TableEnvironment into an interface. @twalthr It would 
be great to also have your opinion here.
   
   Best, Hequn




[jira] [Created] (FLINK-12104) Flink Kafka fails with Incompatible KafkaProducer version / NoSuchFieldException sequenceNumbers

2019-04-03 Thread Tim (JIRA)
Tim created FLINK-12104:
---

 Summary: Flink Kafka fails with Incompatible KafkaProducer version 
/ NoSuchFieldException sequenceNumbers
 Key: FLINK-12104
 URL: https://issues.apache.org/jira/browse/FLINK-12104
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.7.2
Reporter: Tim


FlinkKafkaProducer (in flink-connector-kafka-0.11) tries to access a field 
named `sequenceNumbers` from the KafkaProducer's TransactionManager.  You can 
find this line on the master branch here: 
https://github.com/apache/flink/blob/d6be68670e661091d94a3c65a2704d52fc0e827c/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java#L197

 
{code:java}
Object transactionManager = getValue(kafkaProducer, "transactionManager");
...
Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
{code}
 

However, the Kafka TransactionManager no longer has a "sequenceNumbers" field.  
This was changed on 9/14/2017 (KAFKA-5494) in an effort to support 
multiple inflight requests while still guaranteeing idempotence.  See the commit 
diff here: 
https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630

Subsequently, when Flink tries to "recoverAndCommit" (see FlinkKafkaProducer011), 
it fails with a "NoSuchFieldException: sequenceNumbers", followed by an 
"Incompatible KafkaProducer version" error.

Given that the KafkaProducer used is so old (this change was made almost two 
years ago), are there any plans to upgrade? Or are there known 
compatibility issues that prevent the Flink/Kafka connector from doing so?
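
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, not Flink's or Kafka's actual code: `OldTransactionManager`, `NewTransactionManager` and this `getValue` helper are hypothetical stand-ins that only illustrate why a reflective lookup by field name breaks once the field is removed from a newer library version.

```java
import java.lang.reflect.Field;

public class ReflectiveFieldLookup {

    // Hypothetical stand-in for a library class that still has the field.
    static class OldTransactionManager {
        private final Object sequenceNumbers = new Object();
    }

    // Hypothetical stand-in for a newer version where the field was removed.
    static class NewTransactionManager {
        private final Object partitionState = new Object();
    }

    // Reads a private field by name and wraps reflection failures in a RuntimeException.
    static Object getValue(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Incompatible version: cannot read field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        // The lookup works while the field exists.
        System.out.println(getValue(new OldTransactionManager(), "sequenceNumbers") != null);

        // The same lookup fails once the field is gone.
        try {
            getValue(new NewTransactionManager(), "sequenceNumbers");
        } catch (RuntimeException e) {
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

Because the field name is only a string, the compiler cannot catch the mismatch; it surfaces at runtime, which matches the behaviour reported in this issue.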

 





[GitHub] [flink] hequn8128 edited a comment on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-03 Thread GitBox
hequn8128 edited a comment on issue #8050: [FLINK-11067][table] Convert 
TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-479573907
 
 
   > Hi @hequn8128 Would you mind rebasing this commit on current master?
   
   Sure :-)
   I will do it ASAP while addressing the comments that I am sure about.




[GitHub] [flink] hequn8128 commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-03 Thread GitBox
hequn8128 commented on issue #8050: [FLINK-11067][table] Convert 
TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-479573907
 
 
   > Hi @hequn8128 Would you mind rebasing this commit on current master?
   
   Sure :-)
   I will do it ASAP while addressing the comments that I am sure about.




[GitHub] [flink] hequn8128 commented on issue #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-03 Thread GitBox
hequn8128 commented on issue #8050: [FLINK-11067][table] Convert 
TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#issuecomment-479572670
 
 
   @dawidwys  Thanks a lot for your review. I think you have given a lot of 
good suggestions!
   
   1. As for the useless APIs in the `TableEnvironment` interface, maybe we can 
deprecate them first and remove them later. This keeps our interface compatible.
   
   2. To decouple the `Descriptor` from `TableEnvironment`, we can change 
registerTableSource to getTableSource in `RegistrableDescriptor` (we should also 
rename `RegistrableDescriptor`). Once we get the table source or sink, we can 
register it with TableEnvironment. Although this makes it impossible for users 
to register a table source (or sink) directly, I am ok with it.
   
   3. For the `TableFactoryUtil`, I am not sure if we can port it into the api or 
common module (I will check later). However, I do find that we can also decouple 
TableEnvironment here. In the current implementation, it seems the table env is 
only used to decide whether it is a batch or stream source (or sink). We can 
also use the descriptor to decide, i.e., a StreamTableDescriptor means a stream 
source (or sink).
   
   For 2 and 3, I think the biggest obstacle is that we may need to fix the 
problem in a backwards compatible way. @twalthr It would be great to also have 
your opinions here.
   
   Best, Hequn




[jira] [Created] (FLINK-12103) Extend builtin operator metrics with user defined scope and variables

2019-04-03 Thread Ben Marini (JIRA)
Ben Marini created FLINK-12103:
--

 Summary: Extend builtin operator metrics with user defined scope 
and variables
 Key: FLINK-12103
 URL: https://issues.apache.org/jira/browse/FLINK-12103
 Project: Flink
  Issue Type: Wish
  Components: Runtime / Metrics
Affects Versions: 1.7.2
Reporter: Ben Marini


I'm looking for a way to extend the builtin throughput and latency metrics for 
operators with my own metric variables.

My specific use case:

I have a job that defines a list of independent source -> sink streams. I would 
like to add my own metric variables to each of these independent streams. For 
example, something like this:

{code:scala}
class MyFilter extends RichFilterFunction {
  override def open(parameters: Configuration): Unit = {
    val mg = getRuntimeContext.getMetricGroup // Includes "streamName" -> "A|B"

    // Init some user defined metrics here...
  }
}

// Stream A
// Native operator metrics and user defined metrics in rich operators include "streamName" -> "A"
streamA = env.withMetricGroup((mg) => mg.addGroup("streamName", "A")).addSource(...).filter(new MyFilter).addSink(...)

// Stream B
// Native operator metrics and user defined metrics in rich operators include "streamName" -> "B"
streamB = env.withMetricGroup((mg) => mg.addGroup("streamName", "B")).addSource(...).filter(new MyFilter).addSink(...)
{code}
 
Is this possible? Would a new hook into StreamTransformation have to be added?






[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271823199
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ##
 @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration
         Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
         // subtract the Java memory used for network buffers (always off-heap)
-        final long networkBufMB =
-            calculateNetworkBufferMemory(
-                totalJavaMemorySizeMB << 20, // megabytes to bytes
-                config) >> 20; // bytes to megabytes
+        final long networkBufMB = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
 
 Review comment:
   It looks like a future API point for ShuffleService to estimate the memory 
it needs in the container.




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271834864
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 ##
 @@ -1045,7 +1045,7 @@ public void testTaskManagerServicesConfiguration() throws Exception {
         config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
 
         TaskManagerServicesConfiguration tmConfig =
-            TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLoopbackAddress(), true);
+            TaskManagerServicesConfiguration.fromConfiguration(config, 128L * 1024 * 1024, InetAddress.getLoopbackAddress(), true);
 
 Review comment:
   `private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;`?
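
   A minimal sketch of what the suggested constant could look like, assuming the name `MEM_SIZE_PARAM` from the comment above. The class name and the two conversion helpers are hypothetical and only mirror the `<< 20` / `>> 20` shifts used elsewhere in this pull request for megabytes-to-bytes conversion.

```java
public class MemorySizeConstants {

    // Name taken from the review suggestion: 128 MiB expressed in bytes.
    // The L suffix forces long arithmetic so larger sizes cannot overflow int.
    static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;

    // Hypothetical helper: shifting left by 20 bits multiplies by 1024 * 1024.
    static long megabytesToBytes(long megabytes) {
        return megabytes << 20;
    }

    // Hypothetical helper: shifting right by 20 bits divides by 1024 * 1024.
    static long bytesToMegabytes(long bytes) {
        return bytes >> 20;
    }

    public static void main(String[] args) {
        System.out.println(MEM_SIZE_PARAM);                   // 134217728
        System.out.println(bytesToMegabytes(MEM_SIZE_PARAM)); // 128
    }
}
```

   A named constant documents the unit (bytes) at the call site, which the bare literal `128L * 1024 * 1024` does not.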




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271829700
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
         return nettyConfig;
     }
 
+    public boolean isCreditBased() {
+        return isCreditBased;
+    }
+
+    //
+
+    /**
+     * Utility method to extract network related parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration configuration object
+     * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+     * @param localTaskManagerCommunication true, to skip initializing the network stack
+     * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+     * @return NetworkEnvironmentConfiguration
+     */
+    @Deprecated
+    public static NetworkEnvironmentConfiguration fromConfiguration(
+        Configuration configuration,
+        long maxJvmHeapMemory,
+        boolean localTaskManagerCommunication,
+        InetAddress taskManagerAddress) {
+
+        // > hosts / ports for communication and data exchange
+
+        final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
 
 Review comment:
   Could we break down this method a bit more?
   I mean adding functions:
   - getDataport()
   - calculateNumberOfNetworkBuffers
   - createNettyConfig




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271453248
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -56,64 +61,25 @@
 
private final TaskEventDispatcher taskEventDispatcher;
 
-   private final int partitionRequestInitialBackoff;
-
-   private final int partitionRequestMaxBackoff;
-
-   /** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */
 
 Review comment:
   let's keep these 2 doc comments for `networkBuffersPerChannel` and 
`extraNetworkBuffersPerGate` in `NetworkEnvironmentConfiguration`.




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271833815
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
 ##
 @@ -39,78 +33,66 @@
  public class NetworkBufferCalculationTest extends TestLogger {
 
     /**
-     * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration, long)}
-     * using the same (manual) test cases as in {@link TaskManagerServicesTest#calculateHeapSizeMB()}.
+     * Test for {@link NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory(Configuration, long)}
+     * using the same (manual) test cases as in {@link NetworkEnvironmentConfigurationTest#calculateHeapSizeMB()}.
      */
     @Test
-    public void calculateNetworkBufFromHeapSize() throws Exception {
-        TaskManagerServicesConfiguration tmConfig;
+    public void calculateNetworkBufFromHeapSize() {
+        Configuration config;
 
-        tmConfig = getTmConfig(Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
+        config = getConfig(
+            Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
             TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
-            0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
+            0.1f, 60L << 20, 1L << 30, false);
 
 Review comment:
   `MemoryType.HEAP`/`OFF_HEAP` looked a bit more readable.
   Could we use `memType == MemoryType.OFF_HEAP` in `getConfig(.., memType)`?
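
   A minimal sketch of the idea, under stated assumptions: the `MemoryType` enum below is a local stand-in for Flink's enum, the configuration is modelled as a plain map, and the keys are hypothetical (the real test would set `TaskManagerOptions.MEMORY_OFF_HEAP` on a `Configuration`).

```java
import java.util.HashMap;
import java.util.Map;

public class MemoryTypeConfigExample {

    // Local stand-in for Flink's MemoryType enum.
    enum MemoryType { HEAP, OFF_HEAP }

    // Hypothetical key, used only for this illustration.
    static final String OFF_HEAP_KEY = "example.memory.off-heap";

    // The enum keeps call sites readable; the boolean is derived in one place.
    static Map<String, String> getConfig(long networkBufMin, long networkBufMax, MemoryType memType) {
        Map<String, String> config = new HashMap<>();
        config.put("example.network.min", Long.toString(networkBufMin));
        config.put("example.network.max", Long.toString(networkBufMax));
        config.put(OFF_HEAP_KEY, Boolean.toString(memType == MemoryType.OFF_HEAP));
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = getConfig(60L << 20, 1L << 30, MemoryType.HEAP);
        System.out.println(config.get(OFF_HEAP_KEY)); // false
    }
}
```

   With this shape, a call such as `getConfig(.., MemoryType.HEAP)` states its intent, whereas a trailing `false` requires looking up the parameter name.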




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271457542
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
         return nettyConfig;
     }
 
+    public boolean isCreditBased() {
+        return isCreditBased;
+    }
+
+    //
+
+    /**
+     * Utility method to extract network related parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration configuration object
+     * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+     * @param localTaskManagerCommunication true, to skip initializing the network stack
+     * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+     * @return NetworkEnvironmentConfiguration
+     */
+    @Deprecated
+    public static NetworkEnvironmentConfiguration fromConfiguration(
 
 Review comment:
   why is it `@Deprecated`?




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271820791
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271831002
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -115,6 +412,7 @@ public int hashCode() {
result = 31 * result + partitionRequestMaxBackoff;
result = 31 * result + networkBuffersPerChannel;
result = 31 * result + floatingNetworkBuffersPerGate;
+   result = 31 * result + (isCreditBased ? 1 : 0);
 
 Review comment:
   also `numNetworkBuffers`?
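
   To illustrate why every field compared in `equals` should usually also feed into `hashCode`, here is a minimal sketch. `NetworkConfigKey` is a hypothetical, reduced class with three of the fields named in this review; it is not the actual `NetworkEnvironmentConfiguration`.

```java
public class NetworkConfigKey {

    private final int numNetworkBuffers;
    private final int networkBuffersPerChannel;
    private final boolean isCreditBased;

    NetworkConfigKey(int numNetworkBuffers, int networkBuffersPerChannel, boolean isCreditBased) {
        this.numNetworkBuffers = numNetworkBuffers;
        this.networkBuffersPerChannel = networkBuffersPerChannel;
        this.isCreditBased = isCreditBased;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        NetworkConfigKey that = (NetworkConfigKey) obj;
        return numNetworkBuffers == that.numNetworkBuffers
            && networkBuffersPerChannel == that.networkBuffersPerChannel
            && isCreditBased == that.isCreditBased;
    }

    @Override
    public int hashCode() {
        // Same 31-multiplier pattern as in the diff, covering every field used in equals.
        int result = numNetworkBuffers;
        result = 31 * result + networkBuffersPerChannel;
        result = 31 * result + (isCreditBased ? 1 : 0);
        return result;
    }

    public static void main(String[] args) {
        NetworkConfigKey a = new NetworkConfigKey(1024, 2, true);
        NetworkConfigKey b = new NetworkConfigKey(1024, 2, true);
        NetworkConfigKey c = new NetworkConfigKey(2048, 2, true);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
        System.out.println(a.equals(c));                                 // false
    }
}
```

   Leaving a field out of `hashCode` does not break the contract, but it causes avoidable collisions between objects that differ only in that field.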




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271800591
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   do we need `isCreditBased` 2 times, if it is available in 
`nettyConfig.isCreditBasedEnabled` as well?



[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271795236
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 ##
 @@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MathUtils;
+
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.flink.util.MathUtils.checkedDownCast;
+
+/**
+ * Utility class to extract related parameters from {@link Configuration} and 
to
+ * sanity check them.
+ */
+public class ConfigurationParserUtils {
+
+   /**
+* Parses the configuration to get the managed memory size and 
validates the value.
+*
+* @param configuration configuration object
+* @return managed memory size (in megabytes)
+*/
+   public static long getManagedMemorySize(Configuration configuration) {
+   long managedMemorySize;
+   String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   if 
(!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
+   try {
+   managedMemorySize = MemorySize.parse(
+   
configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), 
MEGA_BYTES).getMebiBytes();
+   } catch (IllegalArgumentException e) {
+   throw new IllegalConfigurationException("Could 
not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
+   }
+   } else {
+   managedMemorySize = 
Long.valueOf(managedMemorySizeDefaultVal);
+   }
+
+   checkConfigParameter(configuration.getString(
+   
TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())
 || managedMemorySize > 0,
+   managedMemorySize, 
TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+   "MemoryManager needs at least one MB of memory. " +
+   "If you leave this config parameter empty, the 
system automatically pick a fraction of the available memory.");
+
+   return managedMemorySize;
+   }
+
+   /**
+* Parses the configuration to get the fraction of managed memory and 
validates the value.
+*
+* @param configuration configuration object
+* @return fraction of managed memory
+*/
+   public static float getManagedMemoryFraction(Configuration 
configuration) {
+   float managedMemoryFraction = 
configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+
+   checkConfigParameter(managedMemoryFraction > 0.0f && 
managedMemoryFraction < 1.0f, managedMemoryFraction,
+   TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+   "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0");
+
+   return managedMemoryFraction;
+   }
+
+   /**
+* Parses the configuration to get the type of memory.
+*
+* @param configuration configuration object
+* @return type of memory
+*/
+   public static MemoryType getMemoryType(Configuration configuration) {
+   // check whether we use heap or off-heap memory
+   final MemoryType memType;
+   if 
(configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
+   memType = MemoryType.OFF_HEAP;
+   } else {
+   

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271827710
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ##
 @@ -132,27 +118,27 @@ public TaskManagerServicesConfiguration(
//  Getter/Setter
// 

 
-   public InetAddress getTaskManagerAddress() {
+   InetAddress getTaskManagerAddress() {
return taskManagerAddress;
}
 
public String[] getTmpDirPaths() {
return tmpDirPaths;
}
 
-   public String[] getLocalRecoveryStateRootDirectories() {
+   String[] getLocalRecoveryStateRootDirectories() {
return localRecoveryStateRootDirectories;
}
 
-   public boolean isLocalRecoveryEnabled() {
+   boolean isLocalRecoveryEnabled() {
return localRecoveryEnabled;
}
 
public NetworkEnvironmentConfiguration getNetworkConfig() {
return networkConfig;
}
 
-   public QueryableStateConfiguration getQueryableStateConfig() {
+   QueryableStateConfiguration getQueryableStateConfig() {
 
 Review comment:
   Let's also annotate this getter with `@Nullable`, like the private field `queryableStateConfig`.




[jira] [Updated] (FLINK-12082) Bump up the jython-standalone version

2019-04-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12082:
-
Component/s: Build System
 API / Python

> Bump up the jython-standalone version
> -
>
> Key: FLINK-12082
> URL: https://issues.apache.org/jira/browse/FLINK-12082
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Build System
>Affects Versions: 1.8.0
>Reporter: aloyszhang
>Assignee: aloyszhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Jython's CVE:
> h1. [CVE-2016-4000|https://www.cvedetails.com/cve/CVE-2016-4000/]
>  
> we can update to 2.7.1b3



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12082) Bump up the jython-standalone version

2019-04-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12082.

Resolution: Invalid

Not only was this CVE already addressed in 2.7.1, moving to 2.7.1.b3 would mean 
downgrading the dependency.

> Bump up the jython-standalone version
> -
>
> Key: FLINK-12082
> URL: https://issues.apache.org/jira/browse/FLINK-12082
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: aloyszhang
>Assignee: aloyszhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Jython's CVE:
> h1. [CVE-2016-4000|https://www.cvedetails.com/cve/CVE-2016-4000/]
>  
> we can update to 2.7.1b3





[GitHub] [flink] zentol commented on issue #8100: [FLINK-12082] Bump up the jython-standalone version

2019-04-03 Thread GitBox
zentol commented on issue #8100: [FLINK-12082] Bump up the jython-standalone 
version
URL: https://github.com/apache/flink/pull/8100#issuecomment-479562002
 
 
   Not only was this CVE already addressed in `2.7.1`, moving to `2.7.1.b3` 
would mean _downgrading_ the dependency.
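
   The ordering behind this remark can be sketched as follows. This is only an illustration, assuming the `b3` suffix marks a beta pre-release of `2.7.1` (as the comment implies); `PreReleaseVersion` is a hypothetical helper, not part of Flink, Jython, or Maven, and real build tools apply their own, richer ordering rules.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: orders "x.y.z" and "x.y.zbN" / "x.y.z.bN" version strings. */
public class PreReleaseVersion implements Comparable<PreReleaseVersion> {

    // Three numeric parts, optionally followed by a beta marker such as "b3" or ".b3".
    private static final Pattern FORMAT =
            Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)(?:\\.?b(\\d+))?");

    private final int[] numbers = new int[3];

    // Integer.MAX_VALUE marks a final release, so it sorts after every beta of the same version.
    private final int beta;

    public PreReleaseVersion(String text) {
        Matcher m = FORMAT.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported version: " + text);
        }
        for (int i = 0; i < 3; i++) {
            numbers[i] = Integer.parseInt(m.group(i + 1));
        }
        beta = m.group(4) == null ? Integer.MAX_VALUE : Integer.parseInt(m.group(4));
    }

    @Override
    public int compareTo(PreReleaseVersion other) {
        // Compare the numeric parts first; the beta marker only breaks ties.
        for (int i = 0; i < 3; i++) {
            int c = Integer.compare(numbers[i], other.numbers[i]);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(beta, other.beta);
    }
}
```

   Under that assumption, `new PreReleaseVersion("2.7.1b3").compareTo(new PreReleaseVersion("2.7.1"))` is negative, i.e. the beta sorts before the final release.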




[GitHub] [flink] zentol closed pull request #8100: [FLINK-12082] Bump up the jython-standalone version

2019-04-03 Thread GitBox
zentol closed pull request #8100: [FLINK-12082] Bump up the jython-standalone 
version
URL: https://github.com/apache/flink/pull/8100
 
 
   




[GitHub] [flink] zentol closed pull request #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing

2019-04-03 Thread GitBox
zentol closed pull request #7861: [FLINK-11747] [ Connectors / ElasticSearch] 
Expose RestHighLevelClient to allow for custom sniffing
URL: https://github.com/apache/flink/pull/7861
 
 
   




[GitHub] [flink] zentol commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing

2019-04-03 Thread GitBox
zentol commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] 
Expose RestHighLevelClient to allow for custom sniffing
URL: https://github.com/apache/flink/pull/7861#issuecomment-479554960
 
 
   Further design discussions should be moved to the JIRA.




[GitHub] [flink] zentol commented on issue #8075: [FLINK-12043][core] Process unreadble NPE without any hint

2019-04-03 Thread GitBox
zentol commented on issue #8075: [FLINK-12043][core] Process unreadble NPE 
without any hint
URL: https://github.com/apache/flink/pull/8075#issuecomment-479552080
 
 
   I'm not convinced that these exceptions provide any value. Even without 
them, when you get an NPE you can already infer that one of the incoming 
elements was likely null. The added exceptions don't help in this regard.




[GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-03 Thread GitBox
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] 
Convert TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#discussion_r271810319
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+/**
+ * The base class for batch and stream TableEnvironments.
+ *
+ * The TableEnvironment is a central concept of the Table API and SQL integration. It is
+ * responsible for:
+ *
+ * 
+ * Registering a Table in the internal catalog
+ * Registering an external catalog
+ * Executing SQL queries
+ * Registering a user-defined (scalar, table, or aggregation) function
+ * Converting a DataStream or DataSet into a Table
+ * Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
+ * 
+ */
+@PublicEvolving
+public interface TableEnvironment {
+
+   /**
+* Creates a table from a table source.
+*
+* @param source table source used as table
+*/
+   Table fromTableSource(TableSource source);
+
+   /**
+* Registers an {@link ExternalCatalog} under a unique name in the TableEnvironment's schema.
+* All tables registered in the {@link ExternalCatalog} can be accessed.
+*
+* @param name            The name under which the externalCatalog will be registered
+* @param externalCatalog The externalCatalog to register
+*/
+   void registerExternalCatalog(String name, ExternalCatalog externalCatalog);
+
+   /**
+* Gets a registered {@link ExternalCatalog} by name.
+*
+* @param name The name to look up the {@link ExternalCatalog}
+* @return The {@link ExternalCatalog}
+*/
+   ExternalCatalog getRegisteredExternalCatalog(String name);
+
+   /**
+* Registers a {@link ScalarFunction} under a unique name. Replaces already existing
+* user-defined functions under this name.
+*/
+   void registerFunction(String name, ScalarFunction function);
+
+   /**
+* Registers a {@link Table} under a unique name in the TableEnvironment's catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param name The name under which the table will be registered.
+* @param table The table to register.
+*/
+   void registerTable(String name, Table table);
+
+   /**
+* Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param name        The name under which the {@link TableSource} is registered.
+* @param tableSource The {@link TableSource} to register.
+*/
+   void registerTableSource(String name, TableSource tableSource);
+
+   /**
+* Registers an external {@link TableSink} with given field names and types in this
+* {@link TableEnvironment}'s catalog.
+* Registered sink tables can be referenced in SQL DML statements.
+*
+* @param name The name under which the {@link TableSink} is registered.
+* @param fieldNames The field names to register with the {@link TableSink}.
+* @param fieldTypes The field types to register with the {@link TableSink}.
+* @param tableSink The {@link TableSink} to register.
+*/
+   void registerTableSink(String name, String[] fieldNames, TypeInformation[] fieldTypes, TableSink tableSink);
+
+   /**
+* Registers an external {@link TableSink} with already 

[jira] [Created] (FLINK-12102) FlinkILoopTest fails on Java 9

2019-04-03 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-12102:


 Summary: FlinkILoopTest fails on Java 9
 Key: FLINK-12102
 URL: https://issues.apache.org/jira/browse/FLINK-12102
 Project: Flink
  Issue Type: Sub-task
  Components: Scala Shell
Reporter: Chesnay Schepler
 Fix For: 1.9.0


{code}
java.lang.NullPointerException
at org.apache.flink.api.java.FlinkILoopTest.testConfigurationForwarding(FlinkILoopTest.java:89)
{code}





[jira] [Updated] (FLINK-12100) Kafka 0.10/0.11 tests fail on Java 9

2019-04-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12100:
-
Summary: Kafka 0.10/0.11 tests fail on Java 9  (was: Kafka 0.10 tests fail on Java 9)

> Kafka 0.10/0.11 tests fail on Java 9
> 
>
> Key: FLINK-12100
> URL: https://issues.apache.org/jira/browse/FLINK-12100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.9.0
>
>
> {code}
> java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter
>   at kafka.utils.CoreUtils$.urlSafeBase64EncodeNoPadding(CoreUtils.scala:294)
>   at kafka.utils.CoreUtils$.generateUuidAsBase64(CoreUtils.scala:282)
>   at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:335)
>   at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:335)
>   at scala.Option.getOrElse(Option.scala:121)
>   at kafka.server.KafkaServer.getOrGenerateClusterId(KafkaServer.scala:335)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:190)
>   at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:430)
>   at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:256)
>   at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:137)
>   at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:100)
>   at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:92)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:564)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>   at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>   at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>   at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>   at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>   at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> Caused by: java.lang.ClassNotFoundException: javax.xml.bind.DatatypeConverter
>   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
>   at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:185)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:496)
>   ... 33 more
> {code}
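
For context, a minimal sketch of why the trace above ends in a `ClassNotFoundException`: on Java 9 and 10 the `java.xml.bind` module (which holds `javax.xml.bind.DatatypeConverter`) is not resolved by default, and it was removed in Java 11. The class `JaxbProbe` and its method names are hypothetical and only for illustration; the Base64 method shows a JDK-only way to produce URL-safe, unpadded output and is not claimed to be what Kafka or Flink ended up using.

```java
import java.util.Base64;

/** Hypothetical probe, not part of Flink or Kafka. */
public class JaxbProbe {

    /** Returns true if the named class can be found by this class's class loader. */
    static boolean isClassPresent(String className) {
        try {
            // initialize=false: only check that the class is visible, do not run static initializers.
            Class.forName(className, false, JaxbProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** URL-safe Base64 without padding, using only java.util.Base64 (available since Java 8). */
    static String urlSafeBase64NoPadding(byte[] data) {
        return Base64.getUrlEncoder().withoutPadding().encodeToString(data);
    }

    public static void main(String[] args) {
        // Expected to report false on a default Java 9+ runtime without JAXB on the class path.
        System.out.println("JAXB DatatypeConverter present: "
                + isClassPresent("javax.xml.bind.DatatypeConverter"));
        System.out.println(urlSafeBase64NoPadding(new byte[] {(byte) 0xfb, (byte) 0xff}));
    }
}
```

On Java 9 and 10, running the JVM with `--add-modules java.xml.bind` makes the class resolvable again; from Java 11 on, JAXB has to be supplied as a regular dependency.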




