[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544494#comment-16544494
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6323


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there, but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to a target record type.
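For illustration, a minimal sketch of what service-loader-based format discovery could look like, assuming a hypothetical {{FormatFactory}} interface (names and signatures are illustrative, not the final API):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical factory interface; names are illustrative, not the final API.
interface FormatFactory {
    boolean supportsFormat(String formatType); // e.g. "json", "avro"
    Object deserialize(byte[] bytes);          // converts byte[] to the target record
}

class FormatDiscovery {
    // Discovers all FormatFactory implementations on the classpath through
    // META-INF/services entries, the same mechanism used for file systems.
    static List<FormatFactory> discover(String formatType) {
        List<FormatFactory> matches = new ArrayList<>();
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
            if (factory.supportsFormat(formatType)) {
                matches.add(factory);
            }
        }
        return matches;
    }
}
{code}

A connector could then look up a format purely by name, without a compile-time dependency on any specific format module.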



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6323


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6201


---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544478#comment-16544478
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202536081
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

I added some unit tests.


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of the SQL Client's embedded mode doesn't support long-running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> SQL statement if we could submit SQL statements stored in files as long-running 
> queries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

2018-07-15 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8866.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in 1.6.0:

9597248a41b34e126aac6a807651b1d376dc6de1 
[FLINK-8866] [table] Create unified interfaces to configure and instantiate 
TableSinks

abbb89059f2a83705f41e405da14073800fb1870
[FLINK-8866] [table] Merge table source/sink/format factories

09fbfdfa76b068fcc8de249fe7cdcd01fd1f350e
[FLINK-8866] [table] Move table type out of descriptors

0e5ac4d791a8e35b3e65836bd08c8f96fd900e0b
[FLINK-8866] [table] Make source/sink factories environment-dependent

> Create unified interfaces to configure and instantiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, so that 
> the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes we have in mind: 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.
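For illustration, a hypothetical shape of the factory proposed in point 1), by analogy with the existing TableSourceFactory; this is a sketch under assumed names, not the actual Flink API:

{code:java}
import java.util.Map;

import org.apache.flink.table.sinks.TableSink;

// Hypothetical sketch by analogy with TableSourceFactory; the name and
// exact signature are assumptions, not the final API.
interface TableSinkFactory<T> {
    // The properties stem from descriptors or the SQL client environment
    // file, e.g. the proposed "type" -> "sink" entry from point 2).
    TableSink<T> createTableSink(Map<String, String> properties);
}
{code}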



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544495#comment-16544495
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6201


> Create unified interfaces to configure and instantiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, so that 
> the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes we have in mind: 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-15 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-9850:
--

 Summary: Add a string to the print method to identify output for 
DataStream
 Key: FLINK-9850
 URL: https://issues.apache.org/jira/browse/FLINK-9850
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Reporter: Hequn Cheng


The output of the print method of {{DataSet}} allows the user to supply a 
String to identify the output (see 
[FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
{{DataStream}} doesn't support this yet. It would be valuable to add this 
feature for {{DataStream}} as well.
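For illustration, the existing {{DataSet}} behavior next to the proposed {{DataStream}} counterpart (the {{DataStream}} call is hypothetical at this point):

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PrintIdentifierExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> data = env.fromElements(1, 2, 3);

        // Existing DataSet behavior (FLINK-1486): printed lines carry the
        // supplied identifier as a prefix, which makes it easy to tell
        // several print sinks apart.
        data.print("my-sink");
        env.execute();

        // The proposed DataStream counterpart would look analogous
        // (hypothetical at the time of this issue):
        // stream.print("my-sink");
    }
}
{code}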



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544452#comment-16544452
 ] 

ASF GitHub Bot commented on FLINK-9675:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
Thanks @yanghua I will push the rest of the code then and fix the travis 
error.


> Avoid FileInputStream/FileOutputStream
> --
>
> Key: FLINK-9675
> URL: https://issues.apache.org/jira/browse/FLINK-9675
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Minor
>  Labels: filesystem, pull-request-available
>
> They rely on finalizers (before Java 11), which create unnecessary GC load.
> The alternatives, such as Files.newInputStream, are as easy to use and don't 
> have this issue.
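For illustration, a minimal sketch of the replacement pattern the ticket proposes (file names are placeholders):

{code:java}
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class NioStreamsExample {
    // Copies a file using the channel-backed NIO stream factories instead of
    // new FileInputStream(...) / new FileOutputStream(...), which rely on
    // finalizers before Java 11.
    public static void copy(Path in, Path out) throws Exception {
        try (InputStream is = Files.newInputStream(in);
             OutputStream os = Files.newOutputStream(out)) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = is.read(buffer)) != -1) {
                os.write(buffer, 0, read);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        copy(Paths.get("in.bin"), Paths.get("out.bin"));
    }
}
{code}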



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-15 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
Thanks @yanghua I will push the rest of the code then and fix the travis 
error.


---


[jira] [Created] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-15 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9852:
---

 Summary: Expose descriptor-based sink creation in table 
environments
 Key: FLINK-9852
 URL: https://issues.apache.org/jira/browse/FLINK-9852
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Currently, only a table source can be created using the unified table 
descriptors with {{tableEnv.from(...)}}. A similar approach should be supported 
for defining sinks or even both types at the same time.

I suggest the following syntax:
{code}
tableEnv.connect(Kafka(...)).registerSource("name")
tableEnv.connect(Kafka(...)).registerSink("name")
tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
{code}

A table could then access the registered source/sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6336: [FLINK-9630] [connector] Kafka09PartitionDiscovere...

2018-07-15 Thread ubyyj
GitHub user ubyyj opened a pull request:

https://github.com/apache/flink/pull/6336

[FLINK-9630] [connector] Kafka09PartitionDiscoverer causes connection …

…leak on TopicAuthorizationException


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change
Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak 
if getAllPartitionsForTopics() gets a TopicAuthorizationException.

## Brief change log
Catch TopicAuthorizationException and close the kafkaConsumer in 
getAllPartitionsForTopics().
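For illustration, a minimal sketch of the shape of this fix; the method and field names follow the PR description and are not verified against the actual Flink sources:

{code:java}
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;

class PartitionDiscoveryFixSketch {
    // Close the consumer before surfacing the authorization error, so the
    // TCP connection to the broker is not leaked on every restart attempt.
    static List<PartitionInfo> partitionsFor(KafkaConsumer<?, ?> consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (TopicAuthorizationException e) {
            consumer.close();
            throw e;
        }
    }
}
{code}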

## Verifying this change
This change can be verified as follows:
 - *Manually verified the change by running a job which consumes from a 
non-existent Kafka topic, and verified that the number of open TCP connections 
and open file handles of the task manager process did not increase. The fix has 
been running in our production for weeks now without problems.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ubyyj/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6336.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6336


commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738
Author: yuanyoujun 
Date:   2018-07-15T13:07:49Z

[FLINK-9630] [connector] Kafka09PartitionDiscoverer causes connection leak 
on TopicAuthorizationException




---


[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer causes connection leak on TopicAuthorizationException

2018-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9630:
--
Labels: pull-request-available  (was: )

> Kafka09PartitionDiscoverer causes connection leak on 
> TopicAuthorizationException
> ---
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>Reporter: Youjun Yuan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> When the Kafka topic gets deleted during the task starting process, 
> Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
> getAllPartitionsForTopics(), and it gets no chance to close the 
> kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>  
> *This issue can bring down the whole Flink cluster*, because, in a default 
> setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly 
> schedule the job to any TaskManager that has a free slot, and each attempt will 
> cause the TaskManager to leak a TCP connection. Eventually almost every 
> TaskManager will run out of file handles, so no TaskManager can make a 
> snapshot or accept a new job. This effectively stops the whole cluster.
>  
> The leak happens when StreamTask.invoke() calls openAllOperators(), and then 
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); 
> when kafkaConsumer.partitionsFor(topic) in 
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
> *TopicAuthorizationException*, no one catches it.
> Though StreamTask.open catches the exception and invokes the dispose() method 
> of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), it 
> does not close the kafkaConsumer in partitionDiscoverer, and does not even 
> invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.
>  
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> {code:java}
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
> 
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to
>             // concurrent access; only wakeup the discoverer, the discovery
>             // loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
> 
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
> 
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544603#comment-16544603
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6302
  
Thanks for this contribution, that's a valuable fix. I have a few thoughts 
and suggestions on how we might improve the feature a bit still:

  - Can we get rid of the `commons-text` dependency? The fewer dependencies, 
the fewer possible problems for users due to dependency clashes. It seems a bit 
heavy to add a new library for just one random string generation (see the 
JDK-only sketch after this list).

  - The feature is configured through additional constructor parameters. I 
am wondering if we may want to move this to the `Configuration`. That would 
allow the "ops side of things" to configure this for a setup (setting entropy 
key and checkpoints directory) without needing everyone that writes a Flink 
program to be aware of this.

  - If I read the code correctly, the code logs warnings for every file in 
case the feature is not activated. That will probably confuse a lot of users 
and make them dig into whether they have a wrong setup, when they simply don't 
use this new feature.
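Regarding the first point, a JDK-only sketch of how a short random key could be generated without `commons-text` (illustrative, not the PR's actual code):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

class EntropyKey {
    // Generates a short random alphanumeric key using only the JDK,
    // avoiding an extra library dependency for one string.
    static String randomAlphaNumeric(int length) {
        final String alphabet = "0123456789abcdefghijklmnopqrstuvwxyz";
        StringBuilder sb = new StringBuilder(length);
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = 0; i < length; i++) {
            sb.append(alphabet.charAt(rnd.nextInt(alphabet.length())));
        }
        return sb.toString();
    }
}
{code}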



> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
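For illustration, a minimal sketch of the rewrite described above, assuming an MD5 hash and an 8-character prefix (both arbitrary choices here):

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

class EntropyPathSketch {
    // Injects a hash of the original path after the bucket, so checkpoint
    // keys are spread evenly across S3 partitions.
    static String rewrite(String bucket, String path) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(path.getBytes(StandardCharsets.UTF_8));
        StringBuilder hash = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            hash.append(String.format("%02x", digest[i]));
        }
        // e.g. rewrite("bucket", "flink/checkpoints")
        //   -> "s3://bucket/<8 hex chars>/flink/checkpoints"
        return "s3://" + bucket + "/" + hash + "/" + path;
    }
}
{code}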



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6302: [FLINK-9061][checkpointing] add entropy to s3 path for be...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6302
  
Thanks for this contribution, that's a valuable fix. I have a few thoughts 
and suggestions on how we might improve the feature a bit still:

  - Can we get rid of the `commons-text` dependency? The fewer dependencies, 
the fewer possible problems for users due to dependency clashes. It seems a bit 
heavy to add a new library for just one random string generation.

  - The feature is configured through additional constructor parameters. I 
am wondering if we may want to move this to the `Configuration`. That would 
allow the "ops side of things" to configure this for a setup (setting entropy 
key and checkpoints directory) without needing everyone that writes a Flink 
program to be aware of this.

  - If I read the code correctly, the code logs warnings for every file in 
case the feature is not activated. That will probably confuse a lot of users 
and make them dig into whether they have a wrong setup, when they simply don't 
use this new feature.



---


[GitHub] flink pull request #5516: [FLINK-8544] [Kafka Connector] Handle null message...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5516


---


[GitHub] flink pull request #6109: [FLINK-9483] 'Building Flink' doc doesn't highligh...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6109


---


[jira] [Commented] (FLINK-9483) "Building Flink" doc doesn't highlight quick build command

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544630#comment-16544630
 ] 

ASF GitHub Bot commented on FLINK-9483:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6109


> "Building Flink" doc doesn't highlight quick build command
> --
>
> Key: FLINK-9483
> URL: https://issues.apache.org/jira/browse/FLINK-9483
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.6.0
> Environment: see difference between red and blue parts
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: Screen Shot 2018-05-31 at 4.12.32 PM.png
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6260: [FLINK-9758] Fix ContinuousFileProcessingTest fail...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6260


---


[GitHub] flink issue #6329: [FLINK-9841] Web UI only shows partial taskmanager log

2018-07-15 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6329
  
@dawidwys  can you review this PR?


---


[jira] [Commented] (FLINK-7251) Merge the flink-java8 project into flink-core

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544663#comment-16544663
 ] 

ASF GitHub Bot commented on FLINK-7251:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6120#discussion_r202553410
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac
// number of parameters the SAM of implemented 
interface has; the parameter indexing applies to this range
final int baseParametersLen = 
sam.getParameterTypes().length;
 
-   // executable references "this" implicitly
-   if (paramLen <= 0) {
-   // executable declaring class can also 
be a super class of the input type
-   // we only validate if the executable 
exists in input type
-   validateInputContainsExecutable(exec, 
inType);
-   }
-   else {
-   final Type input = 
TypeExtractionUtils.extractTypeFromLambda(
-   exec,
-   lambdaInputTypeArgumentIndices,
--- End diff --

Good point. I will remove it.


> Merge the flink-java8 project into flink-core
> -
>
> Key: FLINK-7251
> URL: https://issues.apache.org/jira/browse/FLINK-7251
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544671#comment-16544671
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553358
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
 ---
@@ -85,10 +85,10 @@ public int compare(T o1, T o2) {
return ((Comparable) o1).compareTo(o2);
}
 
-   // we catch this case before moving to more expensive tie 
breaks.
-   if (o1.equals(o2)) {
-   return 0;
-   }
+// // we catch this case before moving to more expensive tie 
breaks.
--- End diff --

I think this is some commented-out code which should be removed.


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553358
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
 ---
@@ -85,10 +85,10 @@ public int compare(T o1, T o2) {
return ((Comparable) o1).compareTo(o2);
}
 
-   // we catch this case before moving to more expensive tie 
breaks.
-   if (o1.equals(o2)) {
-   return 0;
-   }
+// // we catch this case before moving to more expensive tie 
breaks.
--- End diff --

I think this is some commented-out code which should be removed.


---


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544673#comment-16544673
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552616
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ *
--- End diff --

JavaDocs missing


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544675#comment-16544675
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553765
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -342,16 +379,20 @@ private void 
restorePartitionedState(Collection state) throws
for (StateMetaInfoSnapshot restoredMetaInfo : 
restoredMetaInfos) {

restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
-   StateTable stateTable = 
stateTables.get(restoredMetaInfo.getName());
+   StateSnapshotRestore snapshotRestore = 
registeredStates.get(restoredMetaInfo.getName());
 
//important: only create a new table we 
did not already create it previously
-   if (null == stateTable) {
+   if (null == snapshotRestore) {
 
-   
RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo =
-   new 
RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+   if 
(restoredMetaInfo.getBackendStateType() == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE) {
+   
RegisteredKeyValueStateBackendMetaInfo 
registeredKeyedBackendStateMetaInfo =
+   new 
RegisteredKeyValueStateBackendMetaInfo<>(restoredMetaInfo);
 
-   stateTable = 
snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
-   
stateTables.put(restoredMetaInfo.getName(), stateTable);
+   snapshotRestore = 
snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
+   
registeredStates.put(restoredMetaInfo.getName(), snapshotRestore);
+   } else {
--- End diff --

Maybe check that `(restoredMetaInfo.getBackendStateType() == PRIORITY_QUEUE`


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544669#comment-16544669
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552563
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
 ---
@@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull 
DataOutputView dov, int keyGroupId)
}
}
 
+   public static  StateSnapshotKeyGroupReader 
createKeyGroupPartitionReader(
+   @Nonnull ElementReaderFunction readerFunction,
+   @Nonnull KeyGroupElementsConsumer elementConsumer) {
--- End diff --

Indenting these parameters one more level would help to distinguish the body 
from the parameter list.


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552563
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
 ---
@@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull 
DataOutputView dov, int keyGroupId)
}
}
 
+   public static  StateSnapshotKeyGroupReader 
createKeyGroupPartitionReader(
+   @Nonnull ElementReaderFunction readerFunction,
+   @Nonnull KeyGroupElementsConsumer elementConsumer) {
--- End diff --

Indenting these parameters one more level would help to distinguish the body 
from the parameter list.


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -446,8 +485,10 @@ public String toString() {
@Override
public int numStateEntries() {
int sum = 0;
-   for (StateTable stateTable : stateTables.values()) {
-   sum += stateTable.size();
+   for (StateSnapshotRestore stateTable : 
registeredStates.values()) {
+   if (stateTable instanceof StateTable) {
+   sum += ((StateTable) 
stateTable).size();
+   }
--- End diff --

Why don't the timers count toward the total number of state entries?


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552705
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 ---
@@ -24,37 +24,46 @@
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-public class RegisteredBroadcastBackendStateMetaInfo extends 
RegisteredStateMetaInfoBase {
+public class RegisteredBroadcastStateBackendMetaInfo extends 
RegisteredStateMetaInfoBase {
 
/** The mode how elements in this state are assigned to tasks during 
restore. */
+   @Nonnull
private final OperatorStateHandle.Mode assignmentMode;
 
/** The type serializer for the keys in the map state. */
+   @Nonnull
private final TypeSerializer keySerializer;
 
/** The type serializer for the values in the map state. */
+   @Nonnull
private final TypeSerializer valueSerializer;
 
-   public RegisteredBroadcastBackendStateMetaInfo(
-   final String name,
-   final OperatorStateHandle.Mode assignmentMode,
-   final TypeSerializer keySerializer,
-   final TypeSerializer valueSerializer) {
+   /** The precomputed immutable snapshot of this state */
+   @Nullable
+   private StateMetaInfoSnapshot precomputedSnapshot;
--- End diff --

nit: Maybe rename to `precomputedStateMetaInfoSnapshot`


---


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544672#comment-16544672
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
 ---
@@ -28,9 +28,22 @@
 @FunctionalInterface
 public interface KeyExtractorFunction {
 
+   KeyExtractorFunction> FOR_KEYED_OBJECTS = new 
KeyExtractorFunction>() {
+   @Nonnull
+   @Override
+   public Object extractKeyFromElement(@Nonnull Keyed element) {
+   return element.getKey();
+   }
+   };
--- End diff --

Could we move this extractor into its own `KeyedKeyExtractorFunction` 
singleton?


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 ---
@@ -63,54 +72,46 @@ public 
RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe
}
 
@SuppressWarnings("unchecked")
-   public RegisteredBroadcastBackendStateMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
+   public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
this(
snapshot.getName(),
OperatorStateHandle.Mode.valueOf(

snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-   (TypeSerializer) 
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-   (TypeSerializer) 
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+   (TypeSerializer) Preconditions.checkNotNull(
+   
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+   (TypeSerializer) Preconditions.checkNotNull(
+   
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));

Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == 
snapshot.getBackendStateType());
}
 
/**
 * Creates a deep copy of the itself.
 */
-   public RegisteredBroadcastBackendStateMetaInfo deepCopy() {
-   return new RegisteredBroadcastBackendStateMetaInfo<>(this);
+   @Nonnull
+   public RegisteredBroadcastStateBackendMetaInfo deepCopy() {
+   return new RegisteredBroadcastStateBackendMetaInfo<>(this);
}
 
@Nonnull
@Override
public StateMetaInfoSnapshot snapshot() {
-   Map optionsMap = Collections.singletonMap(
-   
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
-   assignmentMode.toString());
-   Map> serializerMap = new HashMap<>(2);
-   Map 
serializerConfigSnapshotsMap = new HashMap<>(2);
-   String keySerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
-   String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-   serializerMap.put(keySerializerKey, keySerializer.duplicate());
-   serializerConfigSnapshotsMap.put(keySerializerKey, 
keySerializer.snapshotConfiguration());
-   serializerMap.put(valueSerializerKey, 
valueSerializer.duplicate());
-   serializerConfigSnapshotsMap.put(valueSerializerKey, 
valueSerializer.snapshotConfiguration());
-
-   return new StateMetaInfoSnapshot(
-   name,
-   StateMetaInfoSnapshot.BackendStateType.BROADCAST,
-   optionsMap,
-   serializerConfigSnapshotsMap,
-   serializerMap);
+   if (precomputedSnapshot == null) {
+   precomputedSnapshot = precomputeSnapshot();
+   }
+   return precomputedSnapshot;
--- End diff --

As an easy fix, we could remove the `precomputedSnapshot` field and keep it 
as it was before, where the snapshot was computed with every `snapshot` call.


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553524
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
@@ -80,13 +82,57 @@ public E peek() {
 
@Override
public void bulkPoll(@Nonnull Predicate canConsume, @Nonnull 
Consumer consumer) {
+   if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+   bulkPollRelaxedOrder(canConsume, consumer);
+   } else {
+   bulkPollStrictOrder(canConsume, consumer);
+   }
+   }
+
+   private void bulkPollRelaxedOrder(@Nonnull Predicate canConsume, 
@Nonnull Consumer consumer) {
+   if (orderedCache.isEmpty()) {
+   bulkPollStore(canConsume, consumer);
+   } else {
+   while (!orderedCache.isEmpty() && 
canConsume.test(orderedCache.peekFirst())) {
+   final E next = orderedCache.removeFirst();
+   orderedStore.remove(next);
+   consumer.accept(next);
+   }
+
+   if (orderedCache.isEmpty()) {
+   bulkPollStore(canConsume, consumer);
+   }
+   }
+   }
+
+   private void bulkPollStrictOrder(@Nonnull Predicate canConsume, 
@Nonnull Consumer consumer) {
E element;
while ((element = peek()) != null && canConsume.test(element)) {
poll();
consumer.accept(element);
}
}
 
+   private void bulkPollStore(@Nonnull Predicate canConsume, @Nonnull 
Consumer consumer) {
+   try (CloseableIterator iterator = 
orderedStore.orderedIterator()) {
+   while (iterator.hasNext()) {
+   final E next = iterator.next();
+   if (canConsume.test(next)) {
+   orderedStore.remove(next);
+   consumer.accept(next);
+   } else {
+   orderedCache.add(next);
+   while (iterator.hasNext() && 
!orderedCache.isFull()) {
+   
orderedCache.add(iterator.next());
+   }
+   break;
+   }
+   }
+   } catch (Exception e) {
+   throw new FlinkRuntimeException("Exception while bulk 
polling store.", e);
--- End diff --

I would prefer throwing a checked exception here.


---


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544667#comment-16544667
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552705
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 ---
@@ -24,37 +24,46 @@
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-public class RegisteredBroadcastBackendStateMetaInfo extends 
RegisteredStateMetaInfoBase {
+public class RegisteredBroadcastStateBackendMetaInfo extends 
RegisteredStateMetaInfoBase {
 
/** The mode how elements in this state are assigned to tasks during 
restore. */
+   @Nonnull
private final OperatorStateHandle.Mode assignmentMode;
 
/** The type serializer for the keys in the map state. */
+   @Nonnull
private final TypeSerializer keySerializer;
 
/** The type serializer for the values in the map state. */
+   @Nonnull
private final TypeSerializer valueSerializer;
 
-   public RegisteredBroadcastBackendStateMetaInfo(
-   final String name,
-   final OperatorStateHandle.Mode assignmentMode,
-   final TypeSerializer keySerializer,
-   final TypeSerializer valueSerializer) {
+   /** The precomputed immutable snapshot of this state */
+   @Nullable
+   private StateMetaInfoSnapshot precomputedSnapshot;
--- End diff --

nit: Maybe rename to `precomputedStateMetaInfoSnapshot`


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
 ---
@@ -60,36 +61,35 @@
 
/** Result of partitioning the snapshot by key-group. */
@Nullable
-   private KeyGroupPartitionedSnapshot partitionedSnapshot;
+   private StateKeyGroupWriter partitionedSnapshot;
--- End diff --

nit: rename field


---


[jira] [Commented] (FLINK-9853) add hex support in table api and sql

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544542#comment-16544542
 ] 

ASF GitHub Bot commented on FLINK-9853:
---

GitHub user xueyumusic opened a pull request:

https://github.com/apache/flink/pull/6337

[FLINK-9853][Table API & SQL] add HEX support 

## What is the purpose of the change
This PR proposes to add HEX to the Table API and SQL, with MySQL-like syntax, 
which can take int or string arguments. For an integer argument N, it returns 
a hexadecimal string representation of the value of N. For a string argument 
str, it returns a hexadecimal string representation of str where each byte of 
each character in str is converted to two hexadecimal digits. 

Syntax:

HEX(100) = 64

HEX('This is a test String.') = 
'546869732069732061207465737420537472696e672e'

## Brief change log
  - *The expressionDsl, scalarSqlFunctions and mathExpressions to add hex*
  - *The FunctionGenerator to support hex generator*

## Verifying this change

This change added tests and can be verified as follows:

*(example:)*
  - *Added ScalaFunctionTests tests for table api and sql expressions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xueyumusic/flink hex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6337


commit 8d3bef913bee1d4913ef3ae056e3d15d4cda2cec
Author: xueyu <278006819@...>
Date:   2018-07-15T12:01:15Z

hex support




> add hex support in table api and sql
> 
>
> Key: FLINK-9853
> URL: https://issues.apache.org/jira/browse/FLINK-9853
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> Like in MySQL, HEX can take int or string arguments. For an integer argument 
> N, it returns a hexadecimal string representation of the value of N. For a 
> string argument str, it returns a hexadecimal string representation of str 
> where each byte of each character in str is converted to two hexadecimal 
> digits.
> Syntax:
> HEX(100) = 64
> HEX('This is a test String.') = '546869732069732061207465737420537472696e672e'
> See more: [link 
> MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex]
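For reference, the semantics described above sketched in plain Java (illustrative only; the actual change targets the Table API / SQL code generation):

{code:java}
import java.nio.charset.StandardCharsets;

class HexSketch {
    // HEX of an integer argument: hex(100) -> "64"
    static String hex(long n) {
        return Long.toHexString(n);
    }

    // HEX of a string argument: every byte of every character becomes two
    // hex digits, e.g. hex("This is a test String.") starts with "5468".
    static String hex(String s) {
        StringBuilder sb = new StringBuilder();
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
{code}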



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6337: [FLINK-9853][Table API & SQL] add HEX support

2018-07-15 Thread xueyumusic
GitHub user xueyumusic opened a pull request:

https://github.com/apache/flink/pull/6337

[FLINK-9853][Table API & SQL] add HEX support 

## What is the purpose of the change
This PR proposes to add HEX to the Table API and SQL, with MySQL-like syntax, 
which can take int or string arguments. For an integer argument N, it returns 
a hexadecimal string representation of the value of N. For a string argument 
str, it returns a hexadecimal string representation of str where each byte of 
each character in str is converted to two hexadecimal digits. 

Syntax:

HEX(100) = 64

HEX('This is a test String.') = 
'546869732069732061207465737420537472696e672e'

## Brief change log
  - *The expressionDsl, scalarSqlFunctions and mathExpressions to add hex*
  - *The FunctionGenerator to support hex generator*

## Verifying this change

This change added tests and can be verified as follows:

*(example:)*
  - *Added ScalaFunctionTests tests for table api and sql expressions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xueyumusic/flink hex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6337


commit 8d3bef913bee1d4913ef3ae056e3d15d4cda2cec
Author: xueyu <278006819@...>
Date:   2018-07-15T12:01:15Z

hex support




---


[jira] [Updated] (FLINK-9853) add hex support in table api and sql

2018-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9853:
--
Labels: pull-request-available  (was: )

> add hex support in table api and sql
> 
>
> Key: FLINK-9853
> URL: https://issues.apache.org/jira/browse/FLINK-9853
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> Like in MySQL, HEX can take int or string arguments. For an integer argument 
> N, it returns a hexadecimal string representation of the value of N. For a 
> string argument str, it returns a hexadecimal string representation of str 
> where each byte of each character in str is converted to two hexadecimal 
> digits.
> Syntax:
> HEX(100) = 64
> HEX('This is a test String.') = '546869732069732061207465737420537472696e672e'
> See more: [link 
> MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer causes connection leak on TopicAuthorizationException

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544541#comment-16544541
 ] 

ASF GitHub Bot commented on FLINK-9630:
---

GitHub user ubyyj opened a pull request:

https://github.com/apache/flink/pull/6336

[FLINK-9630] [connector] Kafka09PartitionDiscoverer causes connection …

…leak on TopicAuthorizationException


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change
Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak 
if getAllPartitionsForTopics() gets a TopicAuthorizationException.

## Brief change log
Catch TopicAuthorizationException and close the kafkaConsumer in 
getAllPartitionsForTopics().

## Verifying this change
This change can be verified as follows:
 - *Manually verified the change by running a job which consumes from a 
non-existent Kafka topic, and verified that the number of open TCP connections 
and open file handles of the task manager process did not increase. The fix has 
been running in our production for weeks now without problems.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ubyyj/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6336.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6336


commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738
Author: yuanyoujun 
Date:   2018-07-15T13:07:49Z

[FLINK-9630] [connector] Kafka09PartitionDiscoverer causes connection leak 
on TopicAuthorizationException




> Kafka09PartitionDiscoverer causes connection leak on 
> TopicAuthorizationException
> ---
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>Reporter: Youjun Yuan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> When the Kafka topic gets deleted during the task starting process, 
> Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
> getAllPartitionsForTopics(), and it gets no chance to close the 
> kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>  
> *This issue can bring down the whole Flink cluster*, because, in a default 
> setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly 
> schedule the job to any TaskManager that has a free slot, and each attempt will 
> cause the TaskManager to leak a TCP connection. Eventually almost every 
> TaskManager will run out of file handles, so no TaskManager can make a 
> snapshot or accept a new job. This effectively stops the whole cluster.
>  
> The leak happens when StreamTask.invoke() calls openAllOperators(), and then 
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); 
> when kafkaConsumer.partitionsFor(topic) in 
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
> *TopicAuthorizationException*, no one catches it.
> Though StreamTask.open catches the exception and invokes the dispose() method 
> of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), it 
> does not close the kafkaConsumer in partitionDiscoverer, and does not even 
> invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.
>  
> below is the code of FlinkKakfaConsumerBase.cancel() for your convenience
> public void cancel() {
>      // set ourselves as not running;
>      // this would let the main discovery loop escape as soon as possible
>      running = false;
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null)
> {             // we cannot close the discoverer here, as it is error-prone to 
> concurrent access;             // only wakeup the discoverer, the discovery 

[jira] [Commented] (FLINK-9841) Web UI only show partial taskmanager log

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544549#comment-16544549
 ] 

ASF GitHub Bot commented on FLINK-9841:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6329
  
@dawidwys  can you review this PR?


> Web UI only show partial taskmanager log 
> -
>
> Key: FLINK-9841
> URL: https://issues.apache.org/jira/browse/FLINK-9841
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
> Environment: env : Flink on YARN
> version : 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
>  
> In the web UI, we select a task manager and click the "log" tab, but the UI 
> only shows a partial log (the first part) and never updates even if we click 
> the "refresh" button.
> However, the job manager log is always OK.
> The reason is that the resource is closed twice.
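
For illustration, the generic shape of the double-close anti-pattern referred to above (hypothetical code, not the actual log handler): the explicit close() inside try-with-resources means the stream is closed a second time when the block exits, and an early close can end the transfer after the first part of the file.

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class DoubleCloseSketch {
    static void copyLog(InputStream log, OutputStream out) throws IOException {
        try (InputStream in = log; OutputStream o = out) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                o.write(buf, 0, n);
            }
            o.close(); // BUG: try-with-resources closes o again on exit
        }
    }
}
{code}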



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null

2018-07-15 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-8544.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

> JSONKeyValueDeserializationSchema throws NPE when message key is null
> -
>
> Key: FLINK-8544
> URL: https://issues.apache.org/jira/browse/FLINK-8544
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Bill Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key 
> without validation.
>  If a message with key == null is read, Flink throws an NPE.
> {code:java}
>   @Override
>   public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
> topic, int partition, long offset) throws IOException {
>   if (mapper == null) {
>   mapper = new ObjectMapper();
>   }
>   ObjectNode node = mapper.createObjectNode();
>   node.set("key", mapper.readValue(messageKey, JsonNode.class)); 
> // messageKey is not validated against null.
>   node.set("value", mapper.readValue(message, JsonNode.class));
> {code}
> The fix is very straightforward.
> {code:java}
>   if (messageKey == null) {
>   node.set("key", null);
>   } else {
>   node.set("key", mapper.readValue(messageKey, 
> JsonNode.class));
>   }
> {code}
> If it is appreciated, I would send a pull request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9483) "Building Flink" doc doesn't highlight quick build command

2018-07-15 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-9483.
-
Resolution: Fixed

> "Building Flink" doc doesn't highlight quick build command
> --
>
> Key: FLINK-9483
> URL: https://issues.apache.org/jira/browse/FLINK-9483
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.6.0
> Environment: see difference between red and blue parts
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: Screen Shot 2018-05-31 at 4.12.32 PM.png
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544654#comment-16544654
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6332


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9854) Allow passing multi-line input to SQL Client CLI

2018-07-15 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9854:
---

 Summary: Allow passing multi-line input to SQL Client CLI
 Key: FLINK-9854
 URL: https://issues.apache.org/jira/browse/FLINK-9854
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Reporter: Timo Walther


We should support {{flink-cli < query01.sql}} or {{echo "INSERT INTO bar SELECT 
* FROM foo" | flink-cli}} for convenience. I'm not sure how well we support 
multi-line input and EOF right now. Currently, with the experimental {{-u}} flag the 
user also gets the correct error code after the submission; with {{flink-cli < 
query01.sql}} the CLI would either stay in interactive mode or always return 
success.

We should also discuss which statements are allowed. Actually, only DDL and 
{{INSERT INTO}} statements make sense so far.
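
A hedged sketch of the non-interactive intake this would need (assumed behavior, not the actual SQL Client code): read stdin until EOF and split the content into statements, so that {{flink-cli < query01.sql}} can carry multi-line statements.

{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class StdinStatementsSketch {
    public static void main(String[] args) throws IOException {
        StringBuilder all = new StringBuilder();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = in.readLine()) != null) {
                all.append(line).append('\n'); // EOF ends the loop; multi-line is fine
            }
        }
        for (String statement : all.toString().split(";")) {
            if (!statement.trim().isEmpty()) {
                System.out.println("would submit: " + statement.trim());
            }
        }
    }
}
{code}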



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553765
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -342,16 +379,20 @@ private void 
restorePartitionedState(Collection state) throws
for (StateMetaInfoSnapshot restoredMetaInfo : 
restoredMetaInfos) {

restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
-   StateTable stateTable = 
stateTables.get(restoredMetaInfo.getName());
+   StateSnapshotRestore snapshotRestore = 
registeredStates.get(restoredMetaInfo.getName());
 
//important: only create a new table we 
did not already create it previously
-   if (null == stateTable) {
+   if (null == snapshotRestore) {
 
-   
RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo =
-   new 
RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+   if 
(restoredMetaInfo.getBackendStateType() == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE) {
+   
RegisteredKeyValueStateBackendMetaInfo 
registeredKeyedBackendStateMetaInfo =
+   new 
RegisteredKeyValueStateBackendMetaInfo<>(restoredMetaInfo);
 
-   stateTable = 
snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
-   
stateTables.put(restoredMetaInfo.getName(), stateTable);
+   snapshotRestore = 
snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
+   
registeredStates.put(restoredMetaInfo.getName(), snapshotRestore);
+   } else {
--- End diff --

Maybe check that `restoredMetaInfo.getBackendStateType() == PRIORITY_QUEUE`


---


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544668#comment-16544668
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 ---
@@ -63,54 +72,46 @@ public 
RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe
}
 
@SuppressWarnings("unchecked")
-   public RegisteredBroadcastBackendStateMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
+   public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
this(
snapshot.getName(),
OperatorStateHandle.Mode.valueOf(

snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-   (TypeSerializer) 
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-   (TypeSerializer) 
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+   (TypeSerializer) Preconditions.checkNotNull(
+   
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+   (TypeSerializer) Preconditions.checkNotNull(
+   
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));

Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == 
snapshot.getBackendStateType());
}
 
/**
 * Creates a deep copy of the itself.
 */
-   public RegisteredBroadcastBackendStateMetaInfo deepCopy() {
-   return new RegisteredBroadcastBackendStateMetaInfo<>(this);
+   @Nonnull
+   public RegisteredBroadcastStateBackendMetaInfo deepCopy() {
+   return new RegisteredBroadcastStateBackendMetaInfo<>(this);
}
 
@Nonnull
@Override
public StateMetaInfoSnapshot snapshot() {
-   Map optionsMap = Collections.singletonMap(
-   
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
-   assignmentMode.toString());
-   Map> serializerMap = new HashMap<>(2);
-   Map 
serializerConfigSnapshotsMap = new HashMap<>(2);
-   String keySerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
-   String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-   serializerMap.put(keySerializerKey, keySerializer.duplicate());
-   serializerConfigSnapshotsMap.put(keySerializerKey, 
keySerializer.snapshotConfiguration());
-   serializerMap.put(valueSerializerKey, 
valueSerializer.duplicate());
-   serializerConfigSnapshotsMap.put(valueSerializerKey, 
valueSerializer.snapshotConfiguration());
-
-   return new StateMetaInfoSnapshot(
-   name,
-   StateMetaInfoSnapshot.BackendStateType.BROADCAST,
-   optionsMap,
-   serializerConfigSnapshotsMap,
-   serializerMap);
+   if (precomputedSnapshot == null) {
+   precomputedSnapshot = precomputeSnapshot();
+   }
+   return precomputedSnapshot;
--- End diff --

As an easy fix, we could remove the `precomputedSnapshot` field and keep it 
as it was before, where the snapshot was computed on every `snapshot` call.


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state an might expose it to user functions in the 
> future.



--
This message was sent 

[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552616
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ *
--- End diff --

JavaDocs missing


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202552483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
 ---
@@ -28,9 +28,22 @@
 @FunctionalInterface
 public interface KeyExtractorFunction {
 
+   KeyExtractorFunction> FOR_KEYED_OBJECTS = new 
KeyExtractorFunction>() {
+   @Nonnull
+   @Override
+   public Object extractKeyFromElement(@Nonnull Keyed element) {
+   return element.getKey();
+   }
+   };
--- End diff --

Could we move this extractor into its own `KeyedKeyExtractorFunction` 
singleton?


---


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544676#comment-16544676
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553524
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
@@ -80,13 +82,57 @@ public E peek() {
 
@Override
public void bulkPoll(@Nonnull Predicate canConsume, @Nonnull 
Consumer consumer) {
+   if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+   bulkPollRelaxedOrder(canConsume, consumer);
+   } else {
+   bulkPollStrictOrder(canConsume, consumer);
+   }
+   }
+
+   private void bulkPollRelaxedOrder(@Nonnull Predicate canConsume, 
@Nonnull Consumer consumer) {
+   if (orderedCache.isEmpty()) {
+   bulkPollStore(canConsume, consumer);
+   } else {
+   while (!orderedCache.isEmpty() && 
canConsume.test(orderedCache.peekFirst())) {
+   final E next = orderedCache.removeFirst();
+   orderedStore.remove(next);
+   consumer.accept(next);
+   }
+
+   if (orderedCache.isEmpty()) {
+   bulkPollStore(canConsume, consumer);
+   }
+   }
+   }
+
+   private void bulkPollStrictOrder(@Nonnull Predicate canConsume, 
@Nonnull Consumer consumer) {
E element;
while ((element = peek()) != null && canConsume.test(element)) {
poll();
consumer.accept(element);
}
}
 
+   private void bulkPollStore(@Nonnull Predicate canConsume, @Nonnull 
Consumer consumer) {
+   try (CloseableIterator iterator = 
orderedStore.orderedIterator()) {
+   while (iterator.hasNext()) {
+   final E next = iterator.next();
+   if (canConsume.test(next)) {
+   orderedStore.remove(next);
+   consumer.accept(next);
+   } else {
+   orderedCache.add(next);
+   while (iterator.hasNext() && 
!orderedCache.isFull()) {
+   
orderedCache.add(iterator.next());
+   }
+   break;
+   }
+   }
+   } catch (Exception e) {
+   throw new FlinkRuntimeException("Exception while bulk 
polling store.", e);
--- End diff --

I would prefer throwing a checked exception here.


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state an might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544670#comment-16544670
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
 ---
@@ -60,36 +61,35 @@
 
/** Result of partitioning the snapshot by key-group. */
@Nullable
-   private KeyGroupPartitionedSnapshot partitionedSnapshot;
+   private StateKeyGroupWriter partitionedSnapshot;
--- End diff --

nit: rename field


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state an might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544674#comment-16544674
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202553887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -446,8 +485,10 @@ public String toString() {
@Override
public int numStateEntries() {
int sum = 0;
-   for (StateTable stateTable : stateTables.values()) {
-   sum += stateTable.size();
+   for (StateSnapshotRestore stateTable : 
registeredStates.values()) {
+   if (stateTable instanceof StateTable) {
+   sum += ((StateTable) 
stateTable).size();
+   }
--- End diff --

Why don't the timers count toward the total number of state entries?


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state an might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-15 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9850:
---

Assignee: vinoyang

> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this 
> feature for {{DataStream}}.
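
For illustration, how the requested API might be used on {{DataStream}} (the {{print(String)}} overload below is the proposal, i.e. hypothetical at the time of writing; {{DataSet}} already has it):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintIdentifierSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Proposed overload: a sink identifier prefixes every printed line,
        // e.g. "left> 1" and "right> a", so interleaved stdout from several
        // print sinks can be told apart.
        env.fromElements(1, 2, 3).print("left");
        env.fromElements("a", "b").print("right");
        env.execute("print-identifier-sketch");
    }
}
{code}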



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6260: [FLINK-9758] Fix ContinuousFileProcessingTest failure due...

2018-07-15 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6260
  
+1, merging


---


[jira] [Commented] (FLINK-9758) ContinuousFileProcessingTest failed due to not setting runtimeContext

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544537#comment-16544537
 ] 

ASF GitHub Bot commented on FLINK-9758:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6260
  
+1, merging


> ContinuousFileProcessingTest failed due to not setting runtimeContext
> -
>
> Key: FLINK-9758
> URL: https://issues.apache.org/jira/browse/FLINK-9758
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Currently, ContinuousFileMonitoringFunction#open() method will print runtime 
> context if the log-level is Debug:
> {code:java}
> if (LOG.isDebugEnabled()) {
>LOG.debug("Opened {} (taskIdx= {}) for path: {}",
>   getClass().getSimpleName(), 
> getRuntimeContext().getIndexOfThisSubtask(), path);
> }
> {code}
> However, ContinuousFileProcessingTest did not set runtime context for 
> monitoringFunction, which will result in UT failure due to 
> IllegalStateException if log level is set to DEBUG
> {code:java}
> IllegalStateException("The runtime context has not been initialized."); 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8731) TwoInputStreamTaskTest flaky on travis

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544632#comment-16544632
 ] 

ASF GitHub Bot commented on FLINK-8731:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6338

[FLINK-8731] Replaced mockito with custom mock in TestInputChannel

Replaced questionable usage of Mockito with a custom-written mock. 
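
Roughly the shape of such a replacement (hypothetical interface and class names, not the actual TestInputChannel): a hand-written test double answers from plain Java code, so it cannot produce Mockito's `WrongTypeOfReturnValue` under concurrent access.

```java
// Hypothetical sketch of swapping a Mockito stub for a hand-written double.
interface ChannelView {
    int getChannelIndex();
}

final class FixedIndexChannel implements ChannelView {
    private final int channelIndex;

    FixedIndexChannel(int channelIndex) {
        this.channelIndex = channelIndex;
    }

    @Override
    public int getChannelIndex() {
        return channelIndex; // plain field read: deterministic and thread-safe
    }
}
```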

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink test-mock-lock

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6338.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6338


commit a2c78e6193c57bb69fc6f7d7b629ef0ec47171bb
Author: Dawid Wysakowicz 
Date:   2018-07-12T14:17:48Z

[FLINK-8731] Replaced mockito with custom mock in TestInputChannel




> TwoInputStreamTaskTest flaky on travis
> --
>
> Key: FLINK-8731
> URL: https://issues.apache.org/jira/browse/FLINK-8731
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.5.2, 1.6.0
>
>
> https://travis-ci.org/zentol/flink/builds/344307861
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.479 sec <<< 
> FAILURE! - in org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest
> testOpenCloseAndTimestamps(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest)
>   Time elapsed: 0.05 sec  <<< ERROR!
> java.lang.Exception: error in task
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testOpenCloseAndTimestamps(TwoInputStreamTaskTest.java:99)
> Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: 
> Boolean cannot be returned by getChannelIndex()
> getChannelIndex() should return int
> ***
> If you're unsure why you're getting above error read on.
> Due to the nature of the syntax above problem might occur because:
> 1. This exception *might* occur in wrongly written multi-threaded tests.
>Please refer to Mockito FAQ on limitations of concurrency testing.
> 2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub 
> spies - 
>- with doReturn|Throw() family of methods. More in javadocs for 
> Mockito.spy() method.
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:212)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:158)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:164)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9758) ContinuousFileProcessingTest failed due to not setting runtimeContext

2018-07-15 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-9758.
-
   Resolution: Fixed
Fix Version/s: (was: 1.5.2)
   1.6.0

> ContinuousFileProcessingTest failed due to not setting runtimeContext
> -
>
> Key: FLINK-9758
> URL: https://issues.apache.org/jira/browse/FLINK-9758
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently, ContinuousFileMonitoringFunction#open() method will print runtime 
> context if the log-level is Debug:
> {code:java}
> if (LOG.isDebugEnabled()) {
>LOG.debug("Opened {} (taskIdx= {}) for path: {}",
>   getClass().getSimpleName(), 
> getRuntimeContext().getIndexOfThisSubtask(), path);
> }
> {code}
> However, ContinuousFileProcessingTest did not set runtime context for 
> monitoringFunction, which will result in UT failure due to 
> IllegalStateException if log level is set to DEBUG
> {code:java}
> IllegalStateException("The runtime context has not been initialized."); 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...

2018-07-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6120#discussion_r202553410
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac
// number of parameters the SAM of implemented 
interface has; the parameter indexing applies to this range
final int baseParametersLen = 
sam.getParameterTypes().length;
 
-   // executable references "this" implicitly
-   if (paramLen <= 0) {
-   // executable declaring class can also 
be a super class of the input type
-   // we only validate if the executable 
exists in input type
-   validateInputContainsExecutable(exec, 
inType);
-   }
-   else {
-   final Type input = 
TypeExtractionUtils.extractTypeFromLambda(
-   exec,
-   lambdaInputTypeArgumentIndices,
--- End diff --

Good point. I will remove it.


---


[jira] [Created] (FLINK-9853) add hex support in table api and sql

2018-07-15 Thread xueyu (JIRA)
xueyu created FLINK-9853:


 Summary: add hex support in table api and sql
 Key: FLINK-9853
 URL: https://issues.apache.org/jira/browse/FLINK-9853
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: xueyu


Like in MySQL, HEX could take an integer or a string argument. For an integer 
argument N, it returns a hexadecimal string representation of the value of N. For a 
string argument str, it returns a hexadecimal string representation of str 
where each byte of each character in str is converted to two hexadecimal 
digits. 

Syntax:

HEX(100) = 64

HEX('This is a test String.') = '546869732069732061207465737420537472696e672e'

See more: [link 
MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex]
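
A sketch of the proposed semantics in plain Java (illustrative only, not Flink's implementation):

{code:java}
import java.nio.charset.StandardCharsets;

public class HexSketch {

    // HEX of an integer: hexadecimal representation of the value.
    static String hex(long n) {
        return Long.toHexString(n);
    }

    // HEX of a string: two hex digits per byte of the UTF-8 encoding.
    static String hex(String s) {
        StringBuilder sb = new StringBuilder();
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(100));                      // 64
        System.out.println(hex("This is a test String.")); // 546869732069732061207465737420537472696e672e
    }
}
{code}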



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-15 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
@tedyu Could you please take a look on this ? Thank you very much.


---


[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544544#comment-16544544
 ] 

ASF GitHub Bot commented on FLINK-9675:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
@tedyu Could you please take a look on this ? Thank you very much.


> Avoid FileInputStream/FileOutputStream
> --
>
> Key: FLINK-9675
> URL: https://issues.apache.org/jira/browse/FLINK-9675
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Minor
>  Labels: filesystem, pull-request-available
>
> They rely on finalizers (before Java 11), which create unnecessary GC load.
> The alternatives, Files.newInputStream, are as easy to use and don't have 
> this issue.
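
For illustration, the substitution the issue asks for (a minimal sketch; the path is made up):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class StreamFactorySketch {
    public static void main(String[] args) throws IOException {
        Path path = Paths.get("example.bin"); // illustrative path

        // Instead of `new FileOutputStream(path.toFile())`:
        try (OutputStream out = Files.newOutputStream(path)) {
            out.write(42);
        }
        // Instead of `new FileInputStream(path.toFile())`:
        try (InputStream in = Files.newInputStream(path)) {
            System.out.println(in.read()); // prints 42
        }
    }
}
{code}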



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544554#comment-16544554
 ] 

ASF GitHub Bot commented on FLINK-9794:
---

Github user jrthe42 commented on the issue:

https://github.com/apache/flink/pull/6301
  
Hi @hequn8128, I agree with you that a connection pool uses connection 
resources more effectively. I didn't choose a connection pool because that would 
introduce new dependencies, and I'm not sure if that tradeoff is acceptable. I 
will check ```MiniConnectionPoolManager``` to see if it's a better way.


> JDBCOutputFormat does not consider idle connection and multithreads 
> synchronization
> ---
>
> Key: FLINK-9794
> URL: https://issues.apache.org/jira/browse/FLINK-9794
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0, 1.5.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation of JDBCOutputFormat has two potential problems: 
> 1. The Connection is established when JDBCOutputFormat is opened and is used 
> the whole time. But if this connection lies idle for a long time, the 
> database will force-close the connection, and errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it 
> is also called while snapshotting state. So two threads may modify upload and 
> batchCount without synchronization.
> We need to fix these two problems to make JDBCOutputFormat more reliable.
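
A minimal sketch of both fixes (illustrative, assuming a JDBC `url` and a placeholder statement; not the actual patch, and it glosses over re-adding a batch lost on reconnect):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class ResilientJdbcWriterSketch {
    private final Object lock = new Object();
    private final String url; // assumed JDBC URL
    private Connection connection;
    private PreparedStatement upload;
    private int batchCount;

    ResilientJdbcWriterSketch(String url) throws SQLException {
        this.url = url;
        reconnect();
    }

    // Called by the record-writing thread.
    void writeRecord(String value) throws SQLException {
        synchronized (lock) {
            upload.setString(1, value);
            upload.addBatch();
            if (++batchCount >= 100) {
                flush();
            }
        }
    }

    // Also called by the checkpointing thread while snapshotting state,
    // hence the shared lock guarding upload and batchCount.
    void flush() throws SQLException {
        synchronized (lock) {
            if (!connection.isValid(5)) {
                reconnect(); // the DB may have force-closed an idle connection
            }
            upload.executeBatch();
            batchCount = 0;
        }
    }

    private void reconnect() throws SQLException {
        connection = DriverManager.getConnection(url);
        upload = connection.prepareStatement("INSERT INTO t VALUES (?)"); // placeholder SQL
    }
}
{code}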



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

2018-07-15 Thread jrthe42
Github user jrthe42 commented on the issue:

https://github.com/apache/flink/pull/6301
  
Hi @hequn8128, I agree with you that a connection pool is more effective 
using connection resource. I didn't choose connection pool because that will 
introduce new dependencies, and I'm not sure if that tradeoff is acceptable. I 
will check ```MiniConnectionPoolManager ``` to see if it's a better way.


---


[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544599#comment-16544599
 ] 

ASF GitHub Bot commented on FLINK-9829:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6321
  
Is the issue addressed here a bug?
If not, and if it seems that the original authors of the code intended to 
write the code as it is now, I would suggest leaving it as it is.


> The wrapper classes be compared by symbol of '==' directly in 
> BigDecSerializer.java
> ---
>
> Key: FLINK-9829
> URL: https://issues.apache.org/jira/browse/FLINK-9829
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> The wrapper classes should be compared with the equals method rather than 
> with '==' directly in BigDecSerializer.java.
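
For illustration, why identity comparison on boxed values is fragile (whether the original identity checks were intentional is debated above):

{code:java}
public class WrapperEqualitySketch {
    public static void main(String[] args) {
        Long a = 127L, b = 127L;   // inside the Long autobox cache
        Long c = 128L, d = 128L;   // outside the cache: distinct objects
        System.out.println(a == b);      // true
        System.out.println(c == d);      // false
        System.out.println(c.equals(d)); // true
    }
}
{code}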



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6321
  
Is the issue addressed here a bug?
If not, and if it seems that the original authors of the code had an 
intention of writing the code as it is now, I would suggest to leave it as it 
is.


---


[jira] [Commented] (FLINK-9121) Remove Flip-6 prefixes from code base

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544602#comment-16544602
 ] 

ASF GitHub Bot commented on FLINK-9121:
---

Github user Matrix42 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5801#discussion_r202548776
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---
@@ -49,30 +54,45 @@ public YarnClusterDescriptor(
 
@Override
protected String getYarnSessionClusterEntrypoint() {
-   return YarnApplicationMasterRunner.class.getName();
+   return YarnSessionClusterEntrypoint.class.getName();
}
 
@Override
protected String getYarnJobClusterEntrypoint() {
-   throw new UnsupportedOperationException("The old Yarn 
descriptor does not support proper per-job mode.");
+   return YarnJobClusterEntrypoint.class.getName();
}
 
@Override
-   public YarnClusterClient deployJobCluster(
-   ClusterSpecification clusterSpecification,
-   JobGraph jobGraph,
-   boolean detached) {
-   throw new UnsupportedOperationException("Cannot deploy a 
per-job yarn cluster yet.");
+   public ClusterClient deployJobCluster(
+   ClusterSpecification clusterSpecification,
+   JobGraph jobGraph,
+   boolean detached) throws ClusterDeploymentException {
+
+   // this is required because the slots are allocated lazily
+   jobGraph.setAllowQueuedScheduling(true);
+
+   try {
+   return deployInternal(
+   clusterSpecification,
+   "Flink per-job cluster",
+   getYarnJobClusterEntrypoint(),
+   jobGraph,
+   detached);
+   } catch (Exception e) {
+   throw new ClusterDeploymentException("Could not deploy 
Yarn job cluster.", e);
+   }
}
 
@Override
-   protected ClusterClient 
createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int 
numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, 
Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
-   return new YarnClusterClient(
-   descriptor,
-   numberTaskManagers,
-   slotsPerTaskManager,
-   report,
+   protected ClusterClient createYarnClusterClient(
+   AbstractYarnClusterDescriptor descriptor,
+   int numberTaskManagers,
+   int slotsPerTaskManager,
+   ApplicationReport report,
+   Configuration flinkConfiguration,
+   boolean perJobCluster) throws Exception {
+   return new RestClusterClient<>(
--- End diff --

why not return a YarnClusterClient here?


> Remove Flip-6 prefixes from code base
> -
>
> Key: FLINK-9121
> URL: https://issues.apache.org/jira/browse/FLINK-9121
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should remove all Flip-6 prefixes and other references from the code base 
> since it is not a special case but the new default architecture. Instead we 
> should prefix old code with legacy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...

2018-07-15 Thread Matrix42
Github user Matrix42 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5801#discussion_r202548776
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---
@@ -49,30 +54,45 @@ public YarnClusterDescriptor(
 
@Override
protected String getYarnSessionClusterEntrypoint() {
-   return YarnApplicationMasterRunner.class.getName();
+   return YarnSessionClusterEntrypoint.class.getName();
}
 
@Override
protected String getYarnJobClusterEntrypoint() {
-   throw new UnsupportedOperationException("The old Yarn 
descriptor does not support proper per-job mode.");
+   return YarnJobClusterEntrypoint.class.getName();
}
 
@Override
-   public YarnClusterClient deployJobCluster(
-   ClusterSpecification clusterSpecification,
-   JobGraph jobGraph,
-   boolean detached) {
-   throw new UnsupportedOperationException("Cannot deploy a 
per-job yarn cluster yet.");
+   public ClusterClient deployJobCluster(
+   ClusterSpecification clusterSpecification,
+   JobGraph jobGraph,
+   boolean detached) throws ClusterDeploymentException {
+
+   // this is required because the slots are allocated lazily
+   jobGraph.setAllowQueuedScheduling(true);
+
+   try {
+   return deployInternal(
+   clusterSpecification,
+   "Flink per-job cluster",
+   getYarnJobClusterEntrypoint(),
+   jobGraph,
+   detached);
+   } catch (Exception e) {
+   throw new ClusterDeploymentException("Could not deploy 
Yarn job cluster.", e);
+   }
}
 
@Override
-   protected ClusterClient 
createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int 
numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, 
Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
-   return new YarnClusterClient(
-   descriptor,
-   numberTaskManagers,
-   slotsPerTaskManager,
-   report,
+   protected ClusterClient createYarnClusterClient(
+   AbstractYarnClusterDescriptor descriptor,
+   int numberTaskManagers,
+   int slotsPerTaskManager,
+   ApplicationReport report,
+   Configuration flinkConfiguration,
+   boolean perJobCluster) throws Exception {
+   return new RestClusterClient<>(
--- End diff --

why not return a YarnClusterClient here?


---


[jira] [Commented] (FLINK-9404) Adapter viewfs in BucketingSink when verify that truncate actually works

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544611#comment-16544611
 ] 

ASF GitHub Bot commented on FLINK-9404:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6050


> Adapter viewfs in BucketingSink when verify that truncate actually works
> 
>
> Key: FLINK-9404
> URL: https://issues.apache.org/jira/browse/FLINK-9404
> Project: Flink
>  Issue Type: Bug
>Reporter: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
>
> Hi:
>  
> When viewfs is enabled, the following operation will report 
> errors:
> "new Path(UUID.randomUUID().toString())"
> Hence, I think that we can add the basePath, like this:
> "new Path(basePath, UUID.randomUUID().toString())"
>  
> thanks,
> Ma Qingxiang



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6326: Mutual authentication for internal communication

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6326
  
Pushed another commit that rebuilds the generated config docs


---


[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...

2018-07-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6120#discussion_r202552984
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac
// number of parameters the SAM of implemented 
interface has; the parameter indexing applies to this range
final int baseParametersLen = 
sam.getParameterTypes().length;
 
-   // executable references "this" implicitly
-   if (paramLen <= 0) {
--- End diff --

The input validation caused more errors than it prevented, especially with 
generic types. For lambdas, this validation is limited anyway by the JDK compiler.


---


[jira] [Commented] (FLINK-7251) Merge the flink-java8 project into flink-core

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544657#comment-16544657
 ] 

ASF GitHub Bot commented on FLINK-7251:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6120#discussion_r202552984
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac
// number of parameters the SAM of implemented 
interface has; the parameter indexing applies to this range
final int baseParametersLen = 
sam.getParameterTypes().length;
 
-   // executable references "this" implicitly
-   if (paramLen <= 0) {
--- End diff --

The input validation caused more errors than it prevented, especially with 
generic types. For lambdas, this validation is limited anyway by the JDK compiler.


> Merge the flink-java8 project into flink-core
> -
>
> Key: FLINK-7251
> URL: https://issues.apache.org/jira/browse/FLINK-7251
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544598#comment-16544598
 ] 

ASF GitHub Bot commented on FLINK-9829:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6321
  
This is clearly not a hotfix. As per the pull request template, 
contributors should use hotfixes mainly for typos and JavaDoc updates.


> The wrapper classes be compared by symbol of '==' directly in 
> BigDecSerializer.java
> ---
>
> Key: FLINK-9829
> URL: https://issues.apache.org/jira/browse/FLINK-9829
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> The wrapper classes should be compared with the equals method rather than 
> with '==' directly in BigDecSerializer.java.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6321
  
This is clearly not a hotfix. As per the pull request template, 
contributors should use hotfixes mainly for typos and JavaDoc updates.


---


[GitHub] flink pull request #6050: [FLINK-9404][flink-connector-filesystem] Adapter v...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6050


---


[jira] [Commented] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544629#comment-16544629
 ] 

ASF GitHub Bot commented on FLINK-8544:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5516


> JSONKeyValueDeserializationSchema throws NPE when message key is null
> -
>
> Key: FLINK-8544
> URL: https://issues.apache.org/jira/browse/FLINK-8544
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Bill Lee
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key 
> without validation.
>  If a message with key == null is read, Flink throws an NPE.
> {code:java}
>   @Override
>   public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
> topic, int partition, long offset) throws IOException {
>   if (mapper == null) {
>   mapper = new ObjectMapper();
>   }
>   ObjectNode node = mapper.createObjectNode();
>   node.set("key", mapper.readValue(messageKey, JsonNode.class)); 
> // messageKey is not validated against null.
>   node.set("value", mapper.readValue(message, JsonNode.class));
> {code}
> The fix is very straightforward.
> {code:java}
>   if (messageKey == null) {
>   node.set("key", null);
>   } else {
>   node.set("key", mapper.readValue(messageKey, 
> JsonNode.class));
>   }
> {code}
> If it is appreciated, I would send a pull request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9758) ContinuousFileProcessingTest failed due to not setting runtimeContext

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544631#comment-16544631
 ] 

ASF GitHub Bot commented on FLINK-9758:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6260


> ContinuousFileProcessingTest failed due to not setting runtimeContext
> -
>
> Key: FLINK-9758
> URL: https://issues.apache.org/jira/browse/FLINK-9758
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Currently, ContinuousFileMonitoringFunction#open() method will print runtime 
> context if the log-level is Debug:
> {code:java}
> if (LOG.isDebugEnabled()) {
>LOG.debug("Opened {} (taskIdx= {}) for path: {}",
>   getClass().getSimpleName(), 
> getRuntimeContext().getIndexOfThisSubtask(), path);
> }
> {code}
> However, ContinuousFileProcessingTest did not set runtime context for 
> monitoringFunction, which will result in UT failure due to 
> IllegalStateException if log level is set to DEBUG
> {code:java}
> IllegalStateException("The runtime context has not been initialized."); 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8731) TwoInputStreamTaskTest flaky on travis

2018-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8731:
--
Labels: pull-request-available test-stability  (was: test-stability)

> TwoInputStreamTaskTest flaky on travis
> --
>
> Key: FLINK-8731
> URL: https://issues.apache.org/jira/browse/FLINK-8731
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.5.2, 1.6.0
>
>
> https://travis-ci.org/zentol/flink/builds/344307861
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.479 sec <<< 
> FAILURE! - in org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest
> testOpenCloseAndTimestamps(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest)
>   Time elapsed: 0.05 sec  <<< ERROR!
> java.lang.Exception: error in task
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testOpenCloseAndTimestamps(TwoInputStreamTaskTest.java:99)
> Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: 
> Boolean cannot be returned by getChannelIndex()
> getChannelIndex() should return int
> ***
> If you're unsure why you're getting above error read on.
> Due to the nature of the syntax above problem might occur because:
> 1. This exception *might* occur in wrongly written multi-threaded tests.
>Please refer to Mockito FAQ on limitations of concurrency testing.
> 2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub 
> spies - 
>- with doReturn|Throw() family of methods. More in javadocs for 
> Mockito.spy() method.
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:212)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:158)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:164)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6338: [FLINK-8731] Replaced mockito with custom mock in ...

2018-07-15 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6338

[FLINK-8731] Replaced mockito with custom mock in TestInputChannel

Replaced questionable usage of Mockito with a custom-written mock. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink test-mock-lock

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6338.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6338


commit a2c78e6193c57bb69fc6f7d7b629ef0ec47171bb
Author: Dawid Wysakowicz 
Date:   2018-07-12T14:17:48Z

[FLINK-8731] Replaced mockito with custom mock in TestInputChannel




---


[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544617#comment-16544617
 ] 

ASF GitHub Bot commented on FLINK-9675:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/6335
  
From https://travis-ci.org/apache/flink/jobs/404127448 :
```
Failed tests: 
  BlobServerPutTest.testPutBufferFailsIncomingForJob 
Expected: (an instance of java.io.IOException and exception with message a 
string containing " (Permission denied)")
 but: exception with message a string containing " (Permission denied)" 
message was 
"/tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001"
Stacktrace was: java.nio.file.AccessDeniedException: 
/tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at 
java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
at java.nio.file.Files.newOutputStream(Files.java:216)
at 
org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:594)
at 
org.apache.flink.runtime.blob.BlobServer.putTransient(BlobServer.java:542)
at 
org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:799)
at 
org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncoming(BlobServerPutTest.java:559)
at 
org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncomingForJob(BlobServerPutTest.java:516)
```
Please check the test failure.

Thanks


> Avoid FileInputStream/FileOutputStream
> --
>
> Key: FLINK-9675
> URL: https://issues.apache.org/jira/browse/FLINK-9675
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Minor
>  Labels: filesystem, pull-request-available
>
> They rely on finalizers (before Java 11), which create unnecessary GC load.
> The alternatives, Files.newInputStream, are as easy to use and don't have 
> this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-15 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/6335
  
From https://travis-ci.org/apache/flink/jobs/404127448 :
```
Failed tests: 
  BlobServerPutTest.testPutBufferFailsIncomingForJob 
Expected: (an instance of java.io.IOException and exception with message a 
string containing " (Permission denied)")
 but: exception with message a string containing " (Permission denied)" 
message was 
"/tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001"
Stacktrace was: java.nio.file.AccessDeniedException: 
/tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at 
java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
at java.nio.file.Files.newOutputStream(Files.java:216)
at 
org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:594)
at 
org.apache.flink.runtime.blob.BlobServer.putTransient(BlobServer.java:542)
at 
org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:799)
at 
org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncoming(BlobServerPutTest.java:559)
at 
org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncomingForJob(BlobServerPutTest.java:516)
```
Please check the test failure.

Thanks


---


[jira] [Resolved] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-15 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8858.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in 1.6.0: 695bc56a9e20b9d86eea14a02899b400d324a7ea

> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6332


---


[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...

2018-07-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6323
  
Thanks @pnowojski. Merging this...


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544464#comment-16544464
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202535180
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],

[jira] [Created] (FLINK-9851) Add documentation for unified table sources/sinks

2018-07-15 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9851:
---

 Summary: Add documentation for unified table sources/sinks
 Key: FLINK-9851
 URL: https://issues.apache.org/jira/browse/FLINK-9851
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Table API  SQL
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-8558 and FLINK-8866 reworked a lot of the existing table sources/sinks 
and the way they are discovered. We should rework the documentation about:

- Built-in sinks/source/formats and their properties for Table API and SQL 
Client
- How to write custom sinks/sources/formats
- Limitations such as {{property-version}}, {{rowtime.timestamps.from-source}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202535180
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],
+  propertyMap: JMap[String, String],
+  classLoader: ClassLoader)
+: T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544465#comment-16544465
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6323
  
Thanks @pnowojski. Merging this...


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.
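
As the description says, discovery can be built on java.util.ServiceLoader. A 
minimal sketch under assumed names (the {{Format}} and {{FormatFactory}} 
interfaces below are hypothetical stand-ins following this description; the 
real Flink interfaces differ in detail):

```java
import java.util.ServiceLoader;

// Hypothetical interfaces following the issue description above.
interface Format<T> {
    T deserialize(byte[] bytes); // converts raw bytes to the target record type
}

interface FormatFactory {
    boolean supportsType(String formatType); // e.g. "json", "avro"
    Format<?> createFormat();
}

public final class FormatDiscovery {
    // Returns the first factory registered under
    // META-INF/services/<fully qualified FormatFactory name> on the classpath.
    public static FormatFactory find(String formatType) {
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
            if (factory.supportsType(formatType)) {
                return factory;
            }
        }
        throw new IllegalStateException("No format factory for: " + formatType);
    }
}
```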



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202536081
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

I added some unit tests.


---


[jira] [Resolved] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-15 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8558.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in 1.6.0: ee40335ffa40fb32a692fa6be70946d9a70301b2

> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...

2018-07-15 Thread ubyyj
Github user ubyyj commented on the issue:

https://github.com/apache/flink/pull/6336
  
Unrelated build failure; closing and reopening to trigger the Travis build 
again.


---


[GitHub] flink pull request #6336: [FLINK-9630] [connector] Kafka09PartitionDiscovere...

2018-07-15 Thread ubyyj
Github user ubyyj closed the pull request at:

https://github.com/apache/flink/pull/6336


---


[GitHub] flink pull request #6336: [FLINK-9630] [connector] Kafka09PartitionDiscovere...

2018-07-15 Thread ubyyj
GitHub user ubyyj reopened a pull request:

https://github.com/apache/flink/pull/6336

[FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …

…leak on TopicAuthorizationException


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change
Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak 
if getAllPartitionsForTopics() gets a TopicAuthorizationException.

## Brief change log
Catch TopicAuthorizationException and close the kafkaConsumer in 
getAllPartitionsForTopics().
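
For illustration, a minimal sketch of the described change (class, field, and 
return types are assumed from the issue discussion, not copied from the 
actual PR):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;

class PartitionDiscovererSketch {
    private KafkaConsumer<byte[], byte[]> kafkaConsumer; // created during open()

    List<PartitionInfo> getAllPartitionsForTopics(List<String> topics) {
        List<PartitionInfo> partitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                List<PartitionInfo> infos = kafkaConsumer.partitionsFor(topic);
                if (infos != null) {
                    partitions.addAll(infos);
                }
            }
        } catch (TopicAuthorizationException e) {
            // without this, nothing on the failure path closes the consumer,
            // so the TCP connection to the broker leaks
            kafkaConsumer.close();
            throw e;
        }
        return partitions;
    }
}
```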

## Verifying this change
This change added tests and can be verified as follows:
 - *Manually verified the change by running a job which consumes from a 
non-existent Kafka topic, and verified that the number of open TCP connections 
and open file handles of the task manager process did not increase. The fix 
has been running in our production for weeks now without problems.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ubyyj/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6336.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6336


commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738
Author: yuanyoujun 
Date:   2018-07-15T13:07:49Z

[FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak 
on TopicAuthorizationException




---


[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544720#comment-16544720
 ] 

ASF GitHub Bot commented on FLINK-9630:
---

Github user ubyyj closed the pull request at:

https://github.com/apache/flink/pull/6336


> Kafka09PartitionDiscoverer cause connection leak on 
> TopicAuthorizationException
> ---
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>Reporter: Youjun Yuan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> When the Kafka topic gets deleted, then during the task starting process 
> Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
> getAllPartitionsForTopics(), and it gets no chance to close the 
> kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>  
> *This issue can bring down the whole Flink cluster*, because, in a default 
> setup (fixedDelay with INT.MAX restart attempts), the job manager will 
> randomly schedule the job to any TaskManager that has a free slot, and each 
> attempt will cause that TaskManager to leak a TCP connection. Eventually 
> almost every TaskManager will run out of file handles, so no TaskManager 
> can make a snapshot or accept a new job, effectively stopping the whole 
> cluster.
>  
> The leak happens when StreamTask.invoke() calls openAllOperators(), then 
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
> and kafkaConsumer.partitionsFor(topic) in 
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
> *TopicAuthorizationException* that no one catches.
> Though StreamTask.open catches the Exception and invokes the dispose() 
> method of each operator, which eventually invokes 
> FlinkKafkaConsumerBase.cancel(), this does not close the kafkaConsumer in 
> partitionDiscoverer, and does not even invoke partitionDiscoverer.wakeup(), 
> because the discoveryLoopThread is null.
>  
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to
>             // concurrent access; only wakeup the discoverer, the discovery
>             // loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...

2018-07-15 Thread ubyyj
Github user ubyyj commented on the issue:

https://github.com/apache/flink/pull/6336
  
trigger travis build


---


[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544721#comment-16544721
 ] 

ASF GitHub Bot commented on FLINK-9630:
---

Github user ubyyj commented on the issue:

https://github.com/apache/flink/pull/6336
  
Unrelated build failure; closing and reopening to trigger the Travis build 
again.


> Kafka09PartitionDiscoverer cause connection leak on 
> TopicAuthorizationException
> ---
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>Reporter: Youjun Yuan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> When the Kafka topic gets deleted, then during the task starting process 
> Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
> getAllPartitionsForTopics(), and it gets no chance to close the 
> kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>  
> *This issue can bring down the whole Flink cluster*, because, in a default 
> setup (fixedDelay with INT.MAX restart attempts), the job manager will 
> randomly schedule the job to any TaskManager that has a free slot, and each 
> attempt will cause that TaskManager to leak a TCP connection. Eventually 
> almost every TaskManager will run out of file handles, so no TaskManager 
> can make a snapshot or accept a new job, effectively stopping the whole 
> cluster.
>  
> The leak happens when StreamTask.invoke() calls openAllOperators(), then 
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
> and kafkaConsumer.partitionsFor(topic) in 
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
> *TopicAuthorizationException* that no one catches.
> Though StreamTask.open catches the Exception and invokes the dispose() 
> method of each operator, which eventually invokes 
> FlinkKafkaConsumerBase.cancel(), this does not close the kafkaConsumer in 
> partitionDiscoverer, and does not even invoke partitionDiscoverer.wakeup(), 
> because the discoveryLoopThread is null.
>  
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to
>             // concurrent access; only wakeup the discoverer, the discovery
>             // loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544722#comment-16544722
 ] 

ASF GitHub Bot commented on FLINK-9630:
---

GitHub user ubyyj reopened a pull request:

https://github.com/apache/flink/pull/6336

[FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …

…leak on TopicAuthorizationException


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change
Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak 
if getAllPartitionsForTopics() gets a TopicAuthorizationException.

## Brief change log
Catch TopicAuthorizationException and close the kafkaConsumer in 
getAllPartitionsForTopics().

## Verifying this change
This change added tests and can be verified as follows:
 - *Manually verified the change by running a job which consumes from a 
non-existent Kafka topic, and verified that the number of open TCP connections 
and open file handles of the task manager process did not increase. The fix 
has been running in our production for weeks now without problems.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ubyyj/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6336.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6336


commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738
Author: yuanyoujun 
Date:   2018-07-15T13:07:49Z

[FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak 
on TopicAuthorizationException




> Kafka09PartitionDiscoverer cause connection leak on 
> TopicAuthorizationException
> ---
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>Reporter: Youjun Yuan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> When the Kafka topic gets deleted, then during the task starting process 
> Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
> getAllPartitionsForTopics(), and it gets no chance to close the 
> kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>  
> *This issue can bring down the whole Flink cluster*, because, in a default 
> setup (fixedDelay with INT.MAX restart attempts), the job manager will 
> randomly schedule the job to any TaskManager that has a free slot, and each 
> attempt will cause that TaskManager to leak a TCP connection. Eventually 
> almost every TaskManager will run out of file handles, so no TaskManager 
> can make a snapshot or accept a new job, effectively stopping the whole 
> cluster.
>  
> The leak happens when StreamTask.invoke() calls openAllOperators(), then 
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
> and kafkaConsumer.partitionsFor(topic) in 
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
> *TopicAuthorizationException* that no one catches.
> Though StreamTask.open catches the Exception and invokes the dispose() 
> method of each operator, which eventually invokes 
> FlinkKafkaConsumerBase.cancel(), this does not close the kafkaConsumer in 
> partitionDiscoverer, and does not even invoke partitionDiscoverer.wakeup(), 
> because the discoveryLoopThread is null.
>  
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to
>             // concurrent access; only wakeup the discoverer, the 

[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544723#comment-16544723
 ] 

ASF GitHub Bot commented on FLINK-9630:
---

Github user ubyyj commented on the issue:

https://github.com/apache/flink/pull/6336
  
trigger travis build


> Kafka09PartitionDiscoverer cause connection leak on 
> TopicAuthorizationException
> ---
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>Reporter: Youjun Yuan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> When the Kafka topic gets deleted, then during the task starting process 
> Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
> getAllPartitionsForTopics(), and it gets no chance to close the 
> kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>  
> *This issue can bring down the whole Flink cluster*, because, in a default 
> setup (fixedDelay with INT.MAX restart attempts), the job manager will 
> randomly schedule the job to any TaskManager that has a free slot, and each 
> attempt will cause that TaskManager to leak a TCP connection. Eventually 
> almost every TaskManager will run out of file handles, so no TaskManager 
> can make a snapshot or accept a new job, effectively stopping the whole 
> cluster.
>  
> The leak happens when StreamTask.invoke() calls openAllOperators(), then 
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
> and kafkaConsumer.partitionsFor(topic) in 
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
> *TopicAuthorizationException* that no one catches.
> Though StreamTask.open catches the Exception and invokes the dispose() 
> method of each operator, which eventually invokes 
> FlinkKafkaConsumerBase.cancel(), this does not close the kafkaConsumer in 
> partitionDiscoverer, and does not even invoke partitionDiscoverer.wakeup(), 
> because the discoveryLoopThread is null.
>  
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to
>             // concurrent access; only wakeup the discoverer, the discovery
>             // loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6321: [FLINK-9829] fix the wrapper classes be compared b...

2018-07-15 Thread lamber-ken
Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/6321#discussion_r202566500
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
 ---
@@ -69,17 +69,17 @@ public void serialize(BigDecimal record, DataOutputView 
target) throws IOExcepti
}
// fast paths for 0, 1, 10
// only reference equality is checked because equals would be 
too expensive
--- End diff --

Right, the second section of code was not exposed anywhere, so I have now 
only modified the first section.


---


[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544745#comment-16544745
 ] 

ASF GitHub Bot commented on FLINK-9829:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/6321#discussion_r202566500
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
 ---
@@ -69,17 +69,17 @@ public void serialize(BigDecimal record, DataOutputView 
target) throws IOExcepti
}
// fast paths for 0, 1, 10
// only reference equality is checked because equals would be 
too expensive
--- End diff --

Right, the second section of code was not exposed anywhere, so I have now 
only modified the first section.


> The wrapper classes be compared by symbol of '==' directly in 
> BigDecSerializer.java
> ---
>
> Key: FLINK-9829
> URL: https://issues.apache.org/jira/browse/FLINK-9829
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> The wrapper classes should be compared with the equals method rather than 
> with the '==' operator directly in BigDecSerializer.java.
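
To make the distinction concrete, a small standalone example (illustrative 
only, not the serializer code itself):

```java
import java.math.BigDecimal;

public class ReferenceEqualityDemo {
    public static void main(String[] args) {
        BigDecimal parsed = new BigDecimal("0"); // a distinct zero instance

        // '==' only matches the shared constant, so it is safe merely as a
        // cheap fast path, never as a general value comparison:
        System.out.println(parsed == BigDecimal.ZERO);  // false, although zero

        // value comparisons (note that equals also compares the scale):
        System.out.println(parsed.equals(BigDecimal.ZERO));                // true
        System.out.println(new BigDecimal("0.0").equals(BigDecimal.ZERO)); // false
        System.out.println(
            new BigDecimal("0.0").compareTo(BigDecimal.ZERO) == 0);        // true
    }
}
```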



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6326: Mutual authentication for internal communication

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6326
  
Thanks for the review and for merging. @NicoK already has an end-to-end SSL 
test PR (#6327), which would be great to rebase on top of this change.


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202556091
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -446,8 +485,10 @@ public String toString() {
@Override
public int numStateEntries() {
int sum = 0;
-   for (StateTable stateTable : stateTables.values()) {
-   sum += stateTable.size();
+   for (StateSnapshotRestore stateTable : 
registeredStates.values()) {
+   if (stateTable instanceof StateTable) {
+   sum += ((StateTable) 
stateTable).size();
+   }
--- End diff --

This method is only used by some tests, and to be on the safe side it is 
probably expected to count only the keyed state and not the timers.


---


[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544693#comment-16544693
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202556091
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -446,8 +485,10 @@ public String toString() {
@Override
public int numStateEntries() {
int sum = 0;
-   for (StateTable stateTable : stateTables.values()) {
-   sum += stateTable.size();
+   for (StateSnapshotRestore stateTable : 
registeredStates.values()) {
+   if (stateTable instanceof StateTable) {
+   sum += ((StateTable) 
stateTable).size();
+   }
--- End diff --

This method is only used by some tests, and to be on the safe side it is 
probably expected to count only the keyed state and not the timers.


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state an might expose it to user functions in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202556109
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
@@ -80,13 +82,57 @@ public E peek() {
 
@Override
public void bulkPoll(@Nonnull Predicate canConsume, @Nonnull 
Consumer consumer) {
+   if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+   bulkPollRelaxedOrder(canConsume, consumer);
+   } else {
+   bulkPollStrictOrder(canConsume, consumer);
+   }
+   }
+
+   private void bulkPollRelaxedOrder(@Nonnull Predicate canConsume, 
@Nonnull Consumer consumer) {
+   if (orderedCache.isEmpty()) {
+   bulkPollStore(canConsume, consumer);
+   } else {
+   while (!orderedCache.isEmpty() && 
canConsume.test(orderedCache.peekFirst())) {
+   final E next = orderedCache.removeFirst();
+   orderedStore.remove(next);
+   consumer.accept(next);
+   }
+
+   if (orderedCache.isEmpty()) {
+   bulkPollStore(canConsume, consumer);
+   }
+   }
+   }
+
+   private void bulkPollStrictOrder(@Nonnull Predicate canConsume, 
@Nonnull Consumer consumer) {
E element;
while ((element = peek()) != null && canConsume.test(element)) {
poll();
consumer.accept(element);
}
}
 
+   private void bulkPollStore(@Nonnull Predicate canConsume, @Nonnull 
Consumer consumer) {
+   try (CloseableIterator iterator = 
orderedStore.orderedIterator()) {
+   while (iterator.hasNext()) {
+   final E next = iterator.next();
+   if (canConsume.test(next)) {
+   orderedStore.remove(next);
+   consumer.accept(next);
+   } else {
+   orderedCache.add(next);
+   while (iterator.hasNext() && 
!orderedCache.isFull()) {
+   
orderedCache.add(iterator.next());
+   }
+   break;
+   }
+   }
+   } catch (Exception e) {
+   throw new FlinkRuntimeException("Exception while bulk 
polling store.", e);
--- End diff --

Why would you prefer that? I think there is no better way for the caller to 
handle problems in this call than failing the job (RocksDB problems).


---

