[jira] [Commented] (FLINK-8560) Access to the current key in ProcessFunction after keyBy()

2018-02-12 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360414#comment-16360414
 ] 

Piotr Nowojski commented on FLINK-8560:
---

[~phoenixjiangnan] why would we have to keep all of the keys in memory? Couldn't 
we have the following in KeyedProcessOperator:
{code:java}
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    collector.setAbsoluteTimestamp(timer.getTimestamp());
    onTime(timer, TimeDomain.EVENT_TIME);
}

@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    collector.eraseTimestamp();
    onTime(timer, TimeDomain.PROCESSING_TIME);
}

private void onTime(InternalTimer<K, VoidNamespace> timer, TimeDomain timeDomain) throws Exception {
    Supplier<K> currentKeyAccessor = () -> (K) getCurrentKey();
    onTimerContext.reinitialize(currentKeyAccessor, timeDomain, timer);
    userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
    onTimerContext.reset();
}{code}
 

The key would only be accessed when a user specifically asks for it, so it 
shouldn't cost us anything.
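
For illustration, a user function could then consume such an accessor along these lines (a minimal sketch; the {{getCurrentKey()}} method on {{OnTimerContext}} is the proposed API, not something that exists yet):

{code:java}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimerFunction extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Register a timer; note that the key no longer has to be stashed
        // in keyed state inside processElement.
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The key would be resolved lazily from the operator, only when asked for.
        Object key = ctx.getCurrentKey(); // proposed accessor
        out.collect("Timer fired for key " + key + " at " + timestamp);
    }
}
{code}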


> Access to the current key in ProcessFunction after keyBy()
> --
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Wish
>  Components: DataStream API
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Minor
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not ideal, as you have to check in the processElement method, for 
> every element, whether the key is already stored, and set it if it isn't.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Having it available in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html
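
For context, the workaround described above looks roughly like this (a minimal sketch; the state name, types, and timer logic are illustrative only):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class KeyRememberingFunction extends ProcessFunction<Tuple2<String, Long>, String> {

    private transient ValueState<String> keyState;

    @Override
    public void open(Configuration parameters) throws Exception {
        keyState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("current-key", String.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        // The workaround: every element must check whether the key is stored yet.
        if (keyState.value() == null) {
            keyState.update(value.f0);
        }
        ctx.timerService().registerEventTimeTimer(value.f1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The key is only available here because processElement stored it earlier.
        out.collect("Timer fired for key " + keyState.value());
    }
}
{code}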





[jira] [Assigned] (FLINK-7715) Port JarRunHandler to new REST endpoint

2018-02-12 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-7715:
---

Assignee: Gary Yao  (was: Fang Yong)

> Port JarRunHandler to new REST endpoint
> ---
>
> Key: FLINK-7715
> URL: https://issues.apache.org/jira/browse/FLINK-7715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarRunHandler}} to new REST endpoint.





[jira] [Assigned] (FLINK-7712) Port JarDeleteHandler to new REST endpoint

2018-02-12 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-7712:
---

Assignee: Gary Yao  (was: Fang Yong)

> Port JarDeleteHandler to new REST endpoint
> --
>
> Key: FLINK-7712
> URL: https://issues.apache.org/jira/browse/FLINK-7712
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarDeleteHandler}} to new REST endpoint.





[jira] [Assigned] (FLINK-7714) Port JarPlanHandler to new REST endpoint

2018-02-12 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-7714:
---

Assignee: Gary Yao  (was: Fang Yong)

> Port JarPlanHandler to new REST endpoint
> 
>
> Key: FLINK-7714
> URL: https://issues.apache.org/jira/browse/FLINK-7714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarPlanHandler}} to new REST endpoint.





[GitHub] flink issue #5383: [hotfix][kafka-tests] Do not hide original exception in F...

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5383
  
Changes look good to me. Thanks for the fix @pnowojski. Merging this PR.


---


[jira] [Assigned] (FLINK-8343) Add support for job cluster deployment

2018-02-12 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-8343:


Assignee: Till Rohrmann

> Add support for job cluster deployment
> --
>
> Key: FLINK-8343
> URL: https://issues.apache.org/jira/browse/FLINK-8343
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For Flip-6 we have to enable a different job cluster deployment. The 
> difference is that we directly submit the job when we deploy the Flink 
> cluster instead of following a two step approach (first deployment and then 
> submission).





[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360470#comment-16360470
 ] 

Aljoscha Krettek commented on FLINK-8543:
-

There is no other failure or anything suspicious in the logs, correct?

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> Upload upload = 
> this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
> ProgressableProgressListene

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360477#comment-16360477
 ] 

Aljoscha Krettek commented on FLINK-8543:
-

And was there a failure/restore cycle before that error popped up?


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360480#comment-16360480
 ] 

Aljoscha Krettek commented on FLINK-8543:
-

Also, I think it would help to have the stack trace at the logging statements 
that you added, i.e. add a {{ExceptionUtils.getStackTrace(new Exception())}} in 
there.
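
For illustration, the patched checkOpen() with the suggested stack trace could look like this (a sketch; it assumes Apache Commons Lang's {{ExceptionUtils}} is available, but any stack-trace helper works):

{code:java}
import org.apache.commons.lang3.exception.ExceptionUtils;

void checkOpen() throws IOException {
    if (!this.closed.get()) return;
    // Log the key together with the call site's stack trace, so the code
    // path that touches the already-closed stream can be identified.
    LOG.error("OutputStream for key '{}' closed. Call site:\n{}",
        this.key, ExceptionUtils.getStackTrace(new Exception()));
    throw new IOException("Output Stream closed");
}
{code}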


[GitHub] flink pull request #5458: [FLINK-8212] [network] Pull EnvironmentInformation...

2018-02-12 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5458

[FLINK-8212] [network] Pull EnvironmentInformation out of TaskManager…

## What is the purpose of the change
Pull EnvironmentInformation out of TaskManagerServices


## Brief change log
Add an inner class ```TaskExecutorEnvironmentInformation``` that gives 
access to the execution environment of the JVM.

## Verifying this change
Running ```NetworkBufferCalculationTest``` passes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8212

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5458.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5458


commit a4792758d3620359f9f42c3bcffbe7578bba4048
Author: zhangminglei 
Date:   2018-02-12T09:21:42Z

[FLINK-8212] [network] Pull EnvironmentInformation out of 
TaskManagerServices




---


[jira] [Commented] (FLINK-8212) Pull EnvironmentInformation out of TaskManagerServices

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360482#comment-16360482
 ] 

ASF GitHub Bot commented on FLINK-8212:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5458

[FLINK-8212] [network] Pull EnvironmentInformation out of TaskManager…

## What is the purpose of the change
Pull EnvironmentInformation out of TaskManagerServices


## Brief change log
Add an inner class ```TaskExecutorEnvironmentInformation``` that gives 
access to the execution environment of the JVM.

## Verifying this change
Running ```NetworkBufferCalculationTest``` passes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8212

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5458.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5458


commit a4792758d3620359f9f42c3bcffbe7578bba4048
Author: zhangminglei 
Date:   2018-02-12T09:21:42Z

[FLINK-8212] [network] Pull EnvironmentInformation out of 
TaskManagerServices




> Pull EnvironmentInformation out of TaskManagerServices
> --
>
> Key: FLINK-8212
> URL: https://issues.apache.org/jira/browse/FLINK-8212
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Network
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.5.0
>
>
> We should pull the {{EnvironmentInformation}} out of the 
> {{TaskManagerServices}} where it is used to get access to the memory settings 
> of the executing JVM. This unnecessarily couples the former with the latter 
> and makes testing extremely hard (one has to use {{PowerMockRunner}} and mock 
> the static {{EnvironmentInformation}}).
> When addressing this issue, then we should also refactor 
> {{NetworkBufferCalculationTest}}.





[GitHub] flink pull request #5443: [FLINK-8626] Introduce BackPressureStatsTracker in...

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5443#discussion_r167501690
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -143,7 +143,7 @@
 
private final StackTraceSampleCoordinator stackTraceSamples;
 
-   private final BackPressureStatsTracker backPressureStatsTracker;
+   private final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl;
--- End diff --

True, this is changed in #5457.


---


[GitHub] flink pull request #5443: [FLINK-8626] Introduce BackPressureStatsTracker in...

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5443#discussion_r167501726
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
 ---
@@ -57,7 +57,7 @@ public void testGetPaths() {
@Test
public void testResponseNoStatsAvailable() throws Exception {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-   BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
+   BackPressureStatsTrackerImpl statsTracker = 
mock(BackPressureStatsTrackerImpl.class);
--- End diff --

Will change it.


---


[jira] [Commented] (FLINK-8626) Introduce BackPressureStatsTracker interface

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360492#comment-16360492
 ] 

ASF GitHub Bot commented on FLINK-8626:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5443#discussion_r167501690
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -143,7 +143,7 @@
 
private final StackTraceSampleCoordinator stackTraceSamples;
 
-   private final BackPressureStatsTracker backPressureStatsTracker;
+   private final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl;
--- End diff --

True, this is changed in #5457.


> Introduce BackPressureStatsTracker interface
> 
>
> Key: FLINK-8626
> URL: https://issues.apache.org/jira/browse/FLINK-8626
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.5.0
>
>
> In order to better test components like the {{JobMaster}} we should introduce 
> a {{BackPressureStatsTracker}} interface and rename the current 
> {{BackPressureStatsTracker}} class into {{BackPressureStatsTrackerImpl}}. 
> This will simplify testing where we have to set up all these things.
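
For illustration, the proposed split would look roughly like this (a sketch; the method shown is one example from the tracker's surface, not the complete interface, and the import locations follow the 1.5-era codebase):

{code:java}
import java.util.Optional;

import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;

// Callers (e.g. the JobMaster) would depend on the interface, while the
// existing sampling logic moves into the *Impl class unchanged.
public interface BackPressureStatsTracker {
    Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex);
}

class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
    @Override
    public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
        // the current implementation's tracking logic would live here
        return Optional.empty(); // placeholder body for the sketch
    }
}
{code}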





[jira] [Commented] (FLINK-8626) Introduce BackPressureStatsTracker interface

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360493#comment-16360493
 ] 

ASF GitHub Bot commented on FLINK-8626:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5443#discussion_r167501726
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
 ---
@@ -57,7 +57,7 @@ public void testGetPaths() {
@Test
public void testResponseNoStatsAvailable() throws Exception {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-   BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
+   BackPressureStatsTrackerImpl statsTracker = 
mock(BackPressureStatsTrackerImpl.class);
--- End diff --

Will change it.


> Introduce BackPressureStatsTracker interface
> 
>
> Key: FLINK-8626
> URL: https://issues.apache.org/jira/browse/FLINK-8626
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.5.0
>
>
> In order to better test components like the {{JobMaster}} we should introduce 
> a {{BackPressureStatsTracker}} interface and rename the current 
> {{BackPressureStatsTracker}} class into {{BackPressureStatsTrackerImpl}}. 
> This will simplify testing where we have to set up all these things.





[GitHub] flink pull request #5443: [FLINK-8626] Introduce BackPressureStatsTracker in...

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5443#discussion_r167502561
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
 ---
@@ -57,7 +57,7 @@ public void testGetPaths() {
@Test
public void testResponseNoStatsAvailable() throws Exception {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-   BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
+   BackPressureStatsTrackerImpl statsTracker = 
mock(BackPressureStatsTrackerImpl.class);
--- End diff --

No, unfortunately not: the `JobVertexBackPressureHandler` requires 
`BackPressureStatsTrackerImpl` because of some methods that are not part of the 
interface.


---


[jira] [Commented] (FLINK-8626) Introduce BackPressureStatsTracker interface

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360501#comment-16360501
 ] 

ASF GitHub Bot commented on FLINK-8626:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5443#discussion_r167502561
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
 ---
@@ -57,7 +57,7 @@ public void testGetPaths() {
@Test
public void testResponseNoStatsAvailable() throws Exception {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-   BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
+   BackPressureStatsTrackerImpl statsTracker = 
mock(BackPressureStatsTrackerImpl.class);
--- End diff --

No, unfortunately not: the `JobVertexBackPressureHandler` requires 
`BackPressureStatsTrackerImpl` because of some methods that are not part of the 
interface.


> Introduce BackPressureStatsTracker interface
> 
>
> Key: FLINK-8626
> URL: https://issues.apache.org/jira/browse/FLINK-8626
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.5.0
>
>
> In order to better test components like the {{JobMaster}} we should introduce 
> a {{BackPressureStatsTracker}} interface and rename the current 
> {{BackPressureStatsTracker}} class into {{BackPressureStatsTrackerImpl}}. 
> This will simplify testing where we have to set up all these things.





[GitHub] flink pull request #5438: [FLINK-8617][TableAPI & SQL] Fix code generation b...

2018-02-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5438#discussion_r167503729
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -351,7 +351,7 @@ object ScalarOperators {
 else {
   s"""
 |${operand.code}
-|boolean $resultTerm = false;
+|boolean $resultTerm = ${operand.resultTerm} == null;
--- End diff --

This is not a bug but intended. The result type should be a primitive at 
this point. I removed it and all tests still work correctly.


---


[jira] [Commented] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360506#comment-16360506
 ] 

ASF GitHub Bot commented on FLINK-8617:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5438#discussion_r167503729
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -351,7 +351,7 @@ object ScalarOperators {
 else {
   s"""
 |${operand.code}
-|boolean $resultTerm = false;
+|boolean $resultTerm = ${operand.resultTerm} == null;
--- End diff --

This is not a bug but intended. The result type should be a primitive at 
this point. I removed it and all tests still work correctly.


> Fix code generation bug while accessing Map type
> 
>
> Key: FLINK-8617
> URL: https://issues.apache.org/jira/browse/FLINK-8617
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> There's a code generation bug in {code}ScalarOperators.generateMapGet{code}.
> And there are two more bugs found in {code}ScalarOperators.generateIsNull{code} 
> and {code}ScalarOperators.generateIsNotNull{code}.





[GitHub] flink pull request #5438: [FLINK-8617][TableAPI & SQL] Fix code generation b...

2018-02-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5438


---


[jira] [Commented] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360511#comment-16360511
 ] 

ASF GitHub Bot commented on FLINK-8617:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5438


> Fix code generation bug while accessing Map type
> 
>
> Key: FLINK-8617
> URL: https://issues.apache.org/jira/browse/FLINK-8617
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> There's a code generation bug in {code}ScalarOperators.generateMapGet{code}.
> And there are two more bugs found in {code}ScalarOperators.generateIsNull{code} 
> and {code}ScalarOperators.generateIsNotNull{code}.





[jira] [Resolved] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-12 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8617.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5.0: f2ae2414d65ab2fbfa1efd60f69e243fbeeba118

> Fix code generation bug while accessing Map type
> 
>
> Key: FLINK-8617
> URL: https://issues.apache.org/jira/browse/FLINK-8617
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
> Fix For: 1.5.0
>
>
> There's a code generation bug in {code}ScalarOperators.generateMapGet{code}.
> And there are two more bugs found in {code}ScalarOperators.generateIsNull{code} 
> and {code}ScalarOperators.generateIsNotNull{code}.





[jira] [Commented] (FLINK-8625) Move OutputFlusher thread to Netty scheduled executor

2018-02-12 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360533#comment-16360533
 ] 

mingleizhang commented on FLINK-8625:
-

Thanks, [~pnowojski]. We can improve performance and reduce resource usage 
with the executor.

> Move OutputFlusher thread to Netty scheduled executor
> -
>
> Key: FLINK-8625
> URL: https://issues.apache.org/jira/browse/FLINK-8625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Piotr Nowojski
>Priority: Major
>
> This will allow us to trigger/schedule next flush only if we are not 
> currently busy. 





[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360559#comment-16360559
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr] and [~fhueske], I committed a WIP branch to my repository 
([xccui/FLINK-8538|https://github.com/xccui/flink/tree/FLINK-8538]).

A temporary new module {{flink-kafka}} was created for this work since I still 
haven't figured out how to consolidate the code. There are still some 
features that need to be added, e.g., support for time attributes and schema 
mapping. Since the framework has already been constructed, I wonder if you 
could have a look to see whether I'm heading in the right direction.

Thanks, Xingcan

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.





[jira] [Updated] (FLINK-5763) Make savepoints self-contained and relocatable

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5763:

Fix Version/s: (was: 1.5.0)
   1.6.0

> Make savepoints self-contained and relocatable
> --
>
> Key: FLINK-5763
> URL: https://issues.apache.org/jira/browse/FLINK-5763
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.6.0
>
>
> After a user has triggered a savepoint, a single savepoint file will be 
> returned as a handle to the savepoint. A savepoint to {{<target-directory>}} creates a 
> savepoint file like {{<target-directory>/savepoint-<id>}}.
> This file contains the metadata of the corresponding checkpoint, but not the 
> actual program state. While this works well for short term management 
> (pause-and-resume a job), it makes it hard to manage savepoints over longer 
> periods of time.
> h4. Problems
> h5. Scattered Checkpoint Files
> For file system based checkpoints (FsStateBackend, RocksDBStateBackend) this 
> results in the savepoint referencing files from the checkpoint directory 
> (usually different from the savepoint target directory). For users, it is virtually impossible to 
> tell which checkpoint files belong to a savepoint and which are lingering 
> around. This can easily lead to accidentally invalidating a savepoint by 
> deleting checkpoint files.
> h5. Savepoints Not Relocatable
> Even if a user is able to figure out which checkpoint files belong to a 
> savepoint, moving these files will invalidate the savepoint as well, because 
> the metadata file references absolute file paths.
> h5. Forced to Use CLI for Disposal
> Because of the scattered files, the user is in practice forced to use Flink’s 
> CLI to dispose a savepoint. This should be possible to handle in the scope of 
> the user’s environment via a file system delete operation.
> h4. Proposal
> In order to solve the described problems, savepoints should contain all their 
> state, both metadata and program state, inside a single directory. 
> Furthermore the metadata must only hold relative references to the checkpoint 
> files. This makes it obvious which files make up the state of a savepoint and 
> it is possible to move savepoints around by moving the savepoint directory.
> h5. Desired File Layout
> Triggering a savepoint to {{<target-directory>}} creates a directory as follows:
> {code}
> <target-directory>/savepoint-<jobid>-<random-suffix>
>   +-- _metadata
>   +-- data-<n> [1 or more]
> {code}
> We include the JobID in the savepoint directory name in order to give some 
> hints about which job a savepoint belongs to.
> h5. CLI
> - Trigger: When triggering a savepoint to {{<target-directory>}}, the savepoint 
> directory will be returned as the handle to the savepoint.
> - Restore: Users can restore by pointing to the directory or the _metadata 
> file. The data files should be required to be in the same directory as the 
> _metadata file.
> - Dispose: The disposal command should be deprecated and eventually removed. 
> While deprecated, disposal can happen by specifying the directory or the 
> _metadata file (same as restore).





[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-12 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360573#comment-16360573
 ] 

Till Rohrmann commented on FLINK-8459:
--

Hi [~yanghua], thanks for working on this issue. How do you want to solve it? We 
have thought about a simple solution where the cancel with savepoint would 
effectively consist of two calls from the client. One for taking the savepoint 
and the other one for cancelling the job.
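
A rough sketch of that two-call approach (method names modeled on the existing {{ClusterClient}} API; the exact signatures are assumptions, not the final implementation):

{code:java}
// Sketch only: take a savepoint first, then cancel the job in a second call.
public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
    // 1. trigger a savepoint and wait for it to complete
    final String savepointPath = triggerSavepoint(jobId, savepointDirectory).get();

    // 2. cancel the job in a separate call
    cancel(jobId);

    // Caveat: the checkpoint scheduler keeps running between the two calls,
    // so additional checkpoints may be taken in between.
    return savepointPath;
}
{code}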

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancellation.





[jira] [Commented] (FLINK-8212) Pull EnvironmentInformation out of TaskManagerServices

2018-02-12 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360577#comment-16360577
 ] 

Till Rohrmann commented on FLINK-8212:
--

Hi [~mingleizhang], I think the proper way to solve this problem is to pass in 
the required information to the {{fromConfiguration}} method.
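
For illustration, the call site could then look roughly like this (a sketch; the parameter list and the surrounding variables are assumptions):

{code:java}
// Query EnvironmentInformation once at the call site and hand the value in
// explicitly, instead of reading it statically inside TaskManagerServices.
long maxJvmHeapMemory = EnvironmentInformation.getMaxJvmHeapMemory();

TaskManagerServicesConfiguration servicesConfiguration =
    TaskManagerServicesConfiguration.fromConfiguration(
        configuration,
        maxJvmHeapMemory,     // previously obtained via the static lookup
        remoteAddress,
        localCommunicationOnly);
{code}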

> Pull EnvironmentInformation out of TaskManagerServices
> --
>
> Key: FLINK-8212
> URL: https://issues.apache.org/jira/browse/FLINK-8212
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Network
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.5.0
>
>
> We should pull the {{EnvironmentInformation}} out of the 
> {{TaskManagerServices}} where it is used to get access to the memory settings 
> of the executing JVM. This unnecessarily couples the former with the latter 
> and makes testing extremely hard (one has to use {{PowerMockRunner}} and mock 
> the static {{EnvironmentInformation}}).
> When addressing this issue, then we should also refactor 
> {{NetworkBufferCalculationTest}}.





[jira] [Updated] (FLINK-8623) ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8623:

Priority: Blocker  (was: Critical)

> ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on 
> Travis
> 
>
> Key: FLINK-8623
> URL: https://issues.apache.org/jira/browse/FLINK-8623
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> {{ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics}} fails on 
> Travis: https://travis-ci.org/apache/flink/jobs/33932





[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360606#comment-16360606
 ] 

Timo Walther commented on FLINK-8538:
-

Thanks for working on this, [~xccui]. I had a quick look at it, and yes, I think you 
are heading in the right direction. I don't think we need a new module; we could also add 
this to {{flink-kafka-base}}, but the question is whether we want one descriptor for 
all Kafka connectors or make them version specific. Actually, I'm in favor of 
just one descriptor, but since connector interfaces change from version to 
version, it might be better to have {{Kafka010}} as a descriptor. What do you 
think, [~fhueske]? I started working on FLINK-8558. Are you planning to work on 
FLINK-8630? Otherwise I would assign it to myself. It would be great if we could get 
this in for Flink 1.5.

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.





[jira] [Updated] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8621:

Priority: Blocker  (was: Critical)

> PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on 
> Travis
> 
>
> Key: FLINK-8621
> URL: https://issues.apache.org/jira/browse/FLINK-8621
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails 
> on Travis: https://travis-ci.org/apache/flink/jobs/339344244





[jira] [Updated] (FLINK-8614) Enable Flip-6 per default

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8614:

Priority: Blocker  (was: Critical)

> Enable Flip-6 per default
> -
>
> Key: FLINK-8614
> URL: https://issues.apache.org/jira/browse/FLINK-8614
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding the FLINK-8471, the next step is to enable Flip-6 per default by 
> setting the configuration switch to {{flip6}}.





[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167523095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the 
next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = 
AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new 
DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = 
uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder 
httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
--- End diff --

What happens with this decoder if the request is not a multipart request? 
Wouldn't it be better to use `HttpPostRequestDecoder#isMultipart(HttpRequest 
request)`?
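
A minimal sketch of that reordering, so that no decoder is created for non-multipart requests (relying on the static helper named above):

```java
// Sketch: test for multipart content before constructing the decoder.
if (httpRequest.getMethod().equals(HttpMethod.POST)
        && HttpPostRequestDecoder.isMultipart(httpRequest)) {
    currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    currentHttpRequest = httpRequest;
} else {
    // everything else is passed on unchanged to the next handler
    ctx.fireChannelRead(msg);
}
```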


---


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167525118
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Could return a `Collection` instead of `List`


---


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167523546
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the 
next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = 
AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new 
DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = 
uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder 
httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   if (httpPostRequestDecoder.isMultipart()) {
+   currentHttpPostRequestDecoder = 
httpPostRequestDecoder;
+   currentHttpRequest = httpRequest;
+   } else {
+   ctx.fireChannelRead(msg);
--- End diff --

How do we handle non-multipart uploads? Do they exist?


---


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167525322
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+   Class.forName(classname).asSubclass(WebMonitor.class);
+   } catch (ClassNotFoundException e) {
+   // class not found means that there is no 
flink-runtime-web in the classpath
+   return Collections.emptyList();
+   }
--- End diff --

This could be a separate method since we use this in `tryLoadWebContent` as 
well.
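
E.g. (a sketch; the method name `isFlinkRuntimeWebInClassPath` is made up here):

    private static boolean isFlinkRuntimeWebInClassPath() {
        try {
            // flink-runtime-web is on the classpath iff WebRuntimeMonitor is loadable
            Class.forName("org.apache.flink.runtime.webmonitor.WebRuntimeMonitor")
                .asSubclass(WebMonitor.class);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

Both call sites could then simply branch on the boolean.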


---


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167520629
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
+   AbstractRestHandler {
+
+   private final Path jarDir;
+
+   private final Executor executor;
+
+   public JarUploadHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(
+   @Nonnull final HandlerRequest request,
+   @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
+
+   final FileUpload fileUpload = request.getRequestBody();
+   return CompletableFuture.supplyAsync(() -> {
+   if 
(!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+   deleteUploadedFile(fileUpload);
+   throw new CompletionException(new 
RestHandlerException(
+   "Only Jar files are allowed.",
+   HttpResponseStatus.BAD_REQUEST));
+   } else {
+   try {
+   Files.move(fileUpload.getPath(), 
jarDir.resolve(fileUpload.getPath().getFileName()));
+   } catch (IOException e) {
--- End diff --

Should we try to delete the uploaded file in case of this error?
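
E.g. (a sketch of that, reusing the handler's `deleteUploadedFile` helper and the
two-argument `RestHandlerException` constructor used above; the message and status
code are placeholders):

    try {
        Files.move(fileUpload.getPath(), jarDir.resolve(fileUpload.getPath().getFileName()));
    } catch (IOException e) {
        // make sure the temporary upload does not leak when the move fails
        deleteUploadedFile(fileUpload);
        throw new CompletionException(new RestHandlerException(
            "Could not move uploaded jar file.",
            HttpResponseStatus.INTERNAL_SERVER_ERROR));
    }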


---


[jira] [Commented] (FLINK-8625) Move OutputFlusher thread to Netty scheduled executor

2018-02-12 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360611#comment-16360611
 ] 

Piotr Nowojski commented on FLINK-8625:
---

I have found one more thing. After fixing the current performance bottlenecks 
in https://issues.apache.org/jira/browse/FLINK-8581 , the GC pressure caused by 
the OutputFlusher is our biggest remaining performance bottleneck. An 
OutputFlusher that runs once per 1ms for 1000 output channels enqueues 1000 
elements every 1ms on an internal Netty executor. I presume those objects are 
piling up and ending up in the old GC generation.

This GC pressure causes huge throughput fluctuations (because of long GC 
pauses), from 20,000 records/ms down to 160 records/ms. Those long GC pauses 
are quite dangerous, since they can cause job failures.

> Move OutputFlusher thread to Netty scheduled executor
> -
>
> Key: FLINK-8625
> URL: https://issues.apache.org/jira/browse/FLINK-8625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Piotr Nowojski
>Priority: Major
>
> This will allow us to trigger/schedule next flush only if we are not 
> currently busy. 
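> A sketch of what this could look like (all names here are made up, not the
> actual implementation):
> {code:java}
> private void scheduleNextFlush() {
>     // re-arm the flush only after the previous one has completed, instead of
>     // unconditionally enqueueing one task per channel every flush interval
>     eventLoop.schedule(() -> {
>         if (hasUnflushedData()) {
>             flushAll();
>         }
>         scheduleNextFlush();
>     }, flushTimeout, TimeUnit.MILLISECONDS);
> }
> {code}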



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167522151
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the 
next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = 
AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new 
DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = 
uploadDir.toAbsolutePath().toString();
--- End diff --

It's a bit ugly how Netty did this here...


---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360613#comment-16360613
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167525322
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+   Class.forName(classname).asSubclass(WebMonitor.class);
+   } catch (ClassNotFoundException e) {
+   // class not found means that there is no 
flink-runtime-web in the classpath
+   return Collections.emptyList();
+   }
--- End diff --

This could be a separate method since we use this in `tryLoadWebContent` as 
well.


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360616#comment-16360616
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167520629
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
+   AbstractRestHandler {
+
+   private final Path jarDir;
+
+   private final Executor executor;
+
+   public JarUploadHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(
+   @Nonnull final HandlerRequest request,
+   @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
+
+   final FileUpload fileUpload = request.getRequestBody();
+   return CompletableFuture.supplyAsync(() -> {
+   if 
(!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+   deleteUploadedFile(fileUpload);
+   throw new CompletionException(new 
RestHandlerException(
+   "Only Jar files are allowed.",
+   HttpResponseStatus.BAD_REQUEST));
+   } else {
+   try {
+   Files.move(fileUpload.getPath(), 
jarDir.resolve(fileUpload.getPath().getFileName()));
+   } catch (IOException e) {
--- End diff --

Should we try to delete the uploaded file in case of this error?


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360617#comment-16360617
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167523095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the 
next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = 
AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new 
DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = 
uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder 
httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
--- End diff --

What happens with this decoder if the request is not a multipart request? 
Wouldn't it be better to use `HttpPostRequestDecoder#isMultipart(HttpRequest 
request)`?
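
I.e., something like this (a sketch of the suggested check, using the static
`HttpPostRequestDecoder.isMultipart(HttpRequest)` so the decoder is only created
for actual multipart requests):

    if (httpRequest.getMethod().equals(HttpMethod.POST)
            && HttpPostRequestDecoder.isMultipart(httpRequest)) {
        // only pay for the decoder when it will actually be used
        currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
        currentHttpRequest = httpRequest;
    } else {
        ctx.fireChannelRead(msg);
    }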


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360614#comment-16360614
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167525118
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Could return a `Collection` instead of `List`


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360612#comment-16360612
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167522151
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the 
next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = 
AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new 
DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = 
uploadDir.toAbsolutePath().toString();
--- End diff --

It's a bit ugly how Netty did this here...


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360615#comment-16360615
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167523546
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the 
next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = 
AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new 
DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = 
uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder 
httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   if (httpPostRequestDecoder.isMultipart()) {
+   currentHttpPostRequestDecoder = 
httpPostRequestDecoder;
+   currentHttpRequest = httpRequest;
+   } else {
+   ctx.fireChannelRead(msg);

[jira] [Updated] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8411:

Issue Type: Improvement  (was: Bug)

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable<K, N, ArrayList<V>> map = stateTable;
>   ArrayList<V> list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}
> The fix should make the behavior consistent between the two state 
> backends and add a unit test for {{ListState#add(null)}}. As for the 
> correct behavior, I believe adding null with {{add(null)}} should simply be 
> ignored without any consequences.
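> A minimal sketch of that behavior for the heap backend (my proposal, not the
> final implementation):
> {code:java}
> // HeapListState - proposed
> @Override
> public void add(V value) {
>     if (value == null) {
>         // ignore null adds instead of wiping the whole list state
>         return;
>     }
>     final N namespace = currentNamespace;
>     final StateTable<K, N, ArrayList<V>> map = stateTable;
>     ArrayList<V> list = map.get(namespace);
>     if (list == null) {
>         list = new ArrayList<>();
>         map.put(namespace, list);
>     }
>     list.add(value);
> }
> {code}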
> cc [~srichter]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5455: [FLINK-7711][flip6] Port JarListHandler

2018-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5455#discussion_r167528319
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListHandler.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Handle request for listing uploaded jars.
+ */
+public class JarListHandler extends AbstractRestHandler {
+
+   private final File jarDir;
+
+   private final Executor executor;
+
+   public JarListHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   MessageHeaders messageHeaders,
+   File jarDir,
+   Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(@Nonnull 
HandlerRequest request, @Nonnull 
RestfulGateway gateway) throws RestHandlerException {
+   final String localAddress;
+   checkState(localAddressFuture.isDone());
+
+   try {
+   localAddress = localAddressFuture.get();
+   } catch (Exception e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+
+   return CompletableFuture.supplyAsync(() -> {
+   try {
+   List jarFileList = new 
ArrayList<>();
--- End diff --

We could initialize it with the correct capacity.
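
E.g. (a sketch; `jarFiles` stands for the `File[]` of listed jar files, and
`JarListInfo.JarFileInfo` is assumed to be the list's element type):

    List<JarListInfo.JarFileInfo> jarFileList = new ArrayList<>(jarFiles.length);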


---


[jira] [Updated] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8411:

Priority: Blocker  (was: Critical)

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Blocker
> Fix For: 1.5.0
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable<K, N, ArrayList<V>> map = stateTable;
>   ArrayList<V> list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}
> The fix should make the behavior consistent between the two state 
> backends and add a unit test for {{ListState#add(null)}}. As for the 
> correct behavior, I believe adding null with {{add(null)}} should simply be 
> ignored without any consequences.
> cc [~srichter]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7711) Port JarListHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360627#comment-16360627
 ] 

ASF GitHub Bot commented on FLINK-7711:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5455#discussion_r167528319
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListHandler.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Handle request for listing uploaded jars.
+ */
+public class JarListHandler extends AbstractRestHandler {
+
+   private final File jarDir;
+
+   private final Executor executor;
+
+   public JarListHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   MessageHeaders messageHeaders,
+   File jarDir,
+   Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(@Nonnull 
HandlerRequest request, @Nonnull 
RestfulGateway gateway) throws RestHandlerException {
+   final String localAddress;
+   checkState(localAddressFuture.isDone());
+
+   try {
+   localAddress = localAddressFuture.get();
+   } catch (Exception e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+
+   return CompletableFuture.supplyAsync(() -> {
+   try {
+   List jarFileList = new 
ArrayList<>();
--- End diff --

We could initialize it with the correct capacity.


> Port JarListHandler to new REST endpoint
> 
>
> Key: FLINK-7711
> URL: https://issues.apache.org/jira/browse/FLINK-7711
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarListHandler}} to new REST endpoint.

[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360630#comment-16360630
 ] 

Fabian Hueske commented on FLINK-8538:
--

Hi, I agree with [~twalthr]. This looks mostly good, but we shouldn't add a new 
module. Also, it would be good to implement the code in Java as the project 
tries to reduce its dependency on Scala.

How about the following?
* the Kafka descriptor -> flink-connector-kafka-base
* a format independent (but version specific) KafkaTableSource -> 
flink-connector-kafka*version*
* format and version specific TableSourceFactories -> 
flink-connector-kafka*version*
* service discovery files -> flink-connector-kafka*version*

The format and version specific TableSourceFactories will be replaced by format 
independent factories once the format factory infrastructure is in place.
We can also go for version specific descriptors, if that makes more sense.
Since most of the changes would be in version-specific modules, I'd start with 
one module first (Kafka 0.11) and port the change to earlier versions later, 
once we have agreed on the interfaces. Version-independent code can also be 
extracted into abstract classes in flink-connector-kafka-base. 
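
For the service discovery files, that would mean each flink-connector-kafka*version* 
module ships a file like (the factory interface and class names here are just 
illustrative, not final):
{code}
META-INF/services/org.apache.flink.table.sources.TableSourceFactory
{code}
whose content is the fully qualified name of the module's factory, e.g. 
{{org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory}}, 
so that it can be discovered via Java's ServiceLoader.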

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8042) Retry individual failover-strategy for some time first before reverting to full job restart

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8042:

Issue Type: Improvement  (was: Bug)

> Retry individual failover-strategy for some time first before reverting to 
> full job restart
> ---
>
> Key: FLINK-8042
> URL: https://issues.apache.org/jira/browse/FLINK-8042
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager, State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Steven Zhen Wu
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Let's say we kill a taskmanager node. When Flink attempts fine-grained 
> recovery and fails because the replacement taskmanager node didn't come back 
> in time, it reverts to a full job restart. 
> Stephan and Till were suggesting that Flink can/should retry fine-grained 
> recovery for some time before giving up and reverting to a full job restart



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8630) To support JSON schema to TypeInformation conversion

2018-02-12 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-8630:
---

Assignee: Timo Walther

> To support JSON schema to TypeInformation conversion 
> -
>
> Key: FLINK-8630
> URL: https://issues.apache.org/jira/browse/FLINK-8630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Major
>
> To support FLINK-8558, we need to generate a {{TypeInformation}} from a 
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There 
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type. 
> One option would be to always return a specific type for it, which can be 
> configured to be double or BigDecimal.
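> A rough sketch of the mapping in question (method name and flag are made up;
> the {{Number}} branch is the open point):
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
>
> private static TypeInformation<?> convertType(String jsonSchemaType, boolean useBigDecimal) {
>     switch (jsonSchemaType) {
>         case "string":  return Types.STRING;
>         case "boolean": return Types.BOOLEAN;
>         case "integer": return Types.LONG;
>         // JSON Number has no fixed precision, so the target type is configurable
>         case "number":  return useBigDecimal ? Types.BIG_DEC : Types.DOUBLE;
>         default: throw new IllegalArgumentException("Unsupported type: " + jsonSchemaType);
>     }
> }
> {code}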



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7952) Add metrics for counting logging events

2018-02-12 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360646#comment-16360646
 ] 

Chesnay Schepler commented on FLINK-7952:
-

I question the value of having this counter as part of Flink.

As I understand it, we wouldn't be able to plug these counters into the metric 
system (without reflection/static access), which severely limits their visibility.

Furthermore, why have this as part of the Flink project at all? This is 
perfect for a small self-contained project. There's nothing Flink-specific in 
there; in fact, the only reason why you can't use the Hadoop EventCounter with 
Flink is that it is annotated with Hadoop's annotations.

> Add metrics for counting logging events 
> 
>
> Key: FLINK-7952
> URL: https://issues.apache.org/jira/browse/FLINK-7952
> Project: Flink
>  Issue Type: Wish
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Critical
> Fix For: 1.5.0
>
>
> It would be useful to track logging events.
> *impl:*
> add event counting via a custom Log4J Appender that tracks the number of 
> INFO, WARN, ERROR and FATAL logging events.
> *ref:*
> hadoop-common: [org.apache.hadoop.log.metrics.EventCounter 
> |https://github.com/apache/hadoop/blob/f67237cbe7bc48a1b9088e990800b37529f1db2a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/metrics/EventCounter.java]
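> A minimal sketch of what such an appender could look like (Log4j 1.x API; how to
> expose the counters through Flink's metric system is the open question):
> {code:java}
> import org.apache.log4j.AppenderSkeleton;
> import org.apache.log4j.Priority;
> import org.apache.log4j.spi.LoggingEvent;
>
> import java.util.concurrent.atomic.AtomicLong;
>
> public class LoggingEventCounter extends AppenderSkeleton {
>
>     private static final AtomicLong infoCount = new AtomicLong();
>     private static final AtomicLong warnCount = new AtomicLong();
>     private static final AtomicLong errorCount = new AtomicLong();
>     private static final AtomicLong fatalCount = new AtomicLong();
>
>     @Override
>     protected void append(LoggingEvent event) {
>         // count each event by severity; gauges could later expose these values
>         switch (event.getLevel().toInt()) {
>             case Priority.INFO_INT:  infoCount.incrementAndGet();  break;
>             case Priority.WARN_INT:  warnCount.incrementAndGet();  break;
>             case Priority.ERROR_INT: errorCount.incrementAndGet(); break;
>             case Priority.FATAL_INT: fatalCount.incrementAndGet(); break;
>         }
>     }
>
>     @Override
>     public void close() {}
>
>     @Override
>     public boolean requiresLayout() {
>         return false;
>     }
> }
> {code}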



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8172) Remove unnecessary synchronisation in RecordSerializer

2018-02-12 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-8172.
-
Resolution: Fixed

The issue was superseded by https://issues.apache.org/jira/browse/FLINK-8178 

> Remove unnecessary synchronisation in RecordSerializer
> --
>
> Key: FLINK-8172
> URL: https://issues.apache.org/jira/browse/FLINK-8172
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.5.0
>
>
> While writing records, the RecordSerializer is the only owner of the `Buffer` 
> into which data are written. Yet we synchronise twice per record 
> while accessing the MemorySegment. Removing this synchronisation speeds up the 
> network throughput in a point-to-point benchmark by a factor of two (from 
> ~12,500 records/ms up to ~23,000 records/ms).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8630) To support JSON schema to TypeInformation conversion

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8630:

Fix Version/s: 1.5.0

> To support JSON schema to TypeInformation conversion 
> -
>
> Key: FLINK-8630
> URL: https://issues.apache.org/jira/browse/FLINK-8630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> To support FLINK-8558, we need to generate a {{TypeInformation}} from a 
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There 
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type. 
> One option would be to always return a specific type for it, which can be 
> configured to be double or BigDecimal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8630) To support JSON schema to TypeInformation conversion

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8630:

Priority: Blocker  (was: Major)

> To support JSON schema to TypeInformation conversion 
> -
>
> Key: FLINK-8630
> URL: https://issues.apache.org/jira/browse/FLINK-8630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> To support FLINK-8558, we need to generate a {{TypeInformation}} from a 
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There 
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type. 
> One option would be to always return a specific type for it, which can be 
> configured to be double or BigDecimal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5459: [FLINK-8475][config][docs] Integrate FS options

2018-02-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5459

[FLINK-8475][config][docs] Integrate FS options

## What is the purpose of the change

This PR ports the filesystem ConfigConstants to `ConfigOptions` and 
integrates them into the configuration docs generator.

## Brief change log

* port filesystem config constants to config options
* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate filesystem configuration table into `config.md`
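
For illustration, a ported option roughly looks like this (the concrete key and
description shown are an example, not necessarily verbatim from the PR):

    public static final ConfigOption<String> DEFAULT_FILESYSTEM_SCHEME =
        ConfigOptions.key("fs.default-scheme")
            .noDefaultValue()
            .withDescription("The default filesystem scheme, used for paths that do not declare a scheme explicitly.");

The docs generator then picks up key, default value and description to build the
table in `config.md`.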

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_fs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5459


commit 6238b4184295ab31933b4cc62a1f30ac11c0f09f
Author: zentol 
Date:   2018-01-22T15:16:02Z

[FLINK-8475][config][docs] Integrate FS options




---


[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360663#comment-16360663
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5459

[FLINK-8475][config][docs] Integrate FS options

## What is the purpose of the change

This PR ports the filesystem ConfigConstants to `ConfigOptions` and 
integrates them into the configuration docs generator.

## Brief change log

* port filesystem config constants to config options
* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate filesystem configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_fs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5459


commit 6238b4184295ab31933b4cc62a1f30ac11c0f09f
Author: zentol 
Date:   2018-01-22T15:16:02Z

[FLINK-8475][config][docs] Integrate FS options




> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5460: [FLINK-8475][config][docs] Integrate Algorithm opt...

2018-02-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5460

[FLINK-8475][config][docs] Integrate Algorithm options

## What is the purpose of the change

This PR ports the runtime algorithm ConfigConstants to `ConfigOptions` and 
integrates them into the configuration docs generator.

## Brief change log

* port runtime algorithm config constants to config options
* integrate runtime algorithm configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_algorithm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5460


commit 4d991f39823898b4385007a3ef445480f196efae
Author: zentol 
Date:   2018-01-30T13:06:30Z

[FLINK-8475][config][docs] Integrate Algorithm options




---


[GitHub] flink pull request #5455: [FLINK-7711][flip6] Port JarListHandler

2018-02-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5455#discussion_r167540798
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListHandler.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Handle request for listing uploaded jars.
+ */
+public class JarListHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
+
+   private final File jarDir;
+
+   private final Executor executor;
+
+   public JarListHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   MessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> messageHeaders,
+   File jarDir,
+   Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture<JarListInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+   final String localAddress;
+   checkState(localAddressFuture.isDone());
+
+   try {
+   localAddress = localAddressFuture.get();
+   } catch (Exception e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+
+   return CompletableFuture.supplyAsync(() -> {
+   try {
+   List<JarListInfo.JarFileInfo> jarFileList = new ArrayList<>();
+   File[] list = jarDir.listFiles(new 
FilenameFilter() {
+   @Override
+   public boolean accept(File dir, String 
name) {
+   return name.endsWith(".jar");
+   }
+   });
+   // last modified ascending order
+   Arrays.sort(list, (f1, f2) -> 
Long.compare(f2.lastModified(), f1.lastModified()));
+
+   for (File f : list) {
--- End diff --

This was fixed in the legacy version on master; please update the code here 
so as not to cause regressions.
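
To illustrate the kind of guard I mean (a sketch only, assuming the legacy fix was the usual null check on `File#listFiles`, which returns null for a missing directory or an I/O error):

{code:java}
File[] list = jarDir.listFiles((dir, name) -> name.endsWith(".jar"));
// listFiles returns null if jarDir does not exist or cannot be read;
// guard before sorting/iterating to avoid a NullPointerException.
if (list == null) {
	throw new IOException("Jar upload directory " + jarDir.getAbsolutePath() + " cannot be listed.");
}
Arrays.sort(list, (f1, f2) -> Long.compare(f2.lastModified(), f1.lastModified()));
{code}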


---


[GitHub] flink pull request #5455: [FLINK-7711][flip6] Port JarListHandler

2018-02-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5455#discussion_r167541843
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
+   AbstractRestHandler<RestfulGateway, FileUpload, JarUploadResponseBody, EmptyMessageParameters> {
+
+   private final Path jarDir;
+
+   private final Executor executor;
+
+   public JarUploadHandler(
+   final CompletableFuture<String> localRestAddress,
+   final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   final Time timeout,
+   final Map<String, String> responseHeaders,
+   final MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
+   final Path jarDir,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture<JarUploadResponseBody> handleRequest(
+   @Nonnull final HandlerRequest<FileUpload, EmptyMessageParameters> request,
+   @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+   final FileUpload fileUpload = request.getRequestBody();
+   return CompletableFuture.supplyAsync(() -> {
+   if 
(!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+   deleteUploadedFile(fileUpload);
+   throw new CompletionException(new 
RestHandlerException(
+   "Only Jar files are allowed.",
+   HttpResponseStatus.BAD_REQUEST));
+   } else {
+   try {
+   Files.move(fileUpload.getPath(), 
jarDir.resolve(fileUpload.getPath().getFileName()));
--- End diff --

please guard the `jarDir` access as done in 
8fdea6093a55c33732ae869b82552371b8142c2a.


---


[GitHub] flink pull request #5455: [FLINK-7711][flip6] Port JarListHandler

2018-02-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5455#discussion_r167542075
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListHandler.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Handle request for listing uploaded jars.
+ */
+public class JarListHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
+
+   private final File jarDir;
+
+   private final Executor executor;
+
+   public JarListHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   MessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> messageHeaders,
+   File jarDir,
+   Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture<JarListInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+   final String localAddress;
+   checkState(localAddressFuture.isDone());
+
+   try {
+   localAddress = localAddressFuture.get();
+   } catch (Exception e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+
+   return CompletableFuture.supplyAsync(() -> {
+   try {
+   List<JarListInfo.JarFileInfo> jarFileList = new ArrayList<>();
+   File[] list = jarDir.listFiles(new 
FilenameFilter() {
--- End diff --

please guard the `jarDir` access as done in 
8fdea6093a55c33732ae869b82552371b8142c2a. I suppose you'll have to create new 
utility methods outside the `WebRuntimeMonitor`.
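
To sketch what such a utility might look like (names here are hypothetical; the actual guarded logic is in the commit referenced above):

{code:java}
import java.io.File;
import java.io.IOException;

// Hypothetical helper so that both the legacy and the new handlers validate
// the jar directory in one place instead of each rolling their own checks.
public final class JarDirUtils {

	private JarDirUtils() {
	}

	/** Ensures the jar directory exists and is usable before any access. */
	public static File checkAndCreateJarDir(File jarDir) throws IOException {
		if (!jarDir.exists() && !jarDir.mkdirs()) {
			throw new IOException("Could not create jar directory " + jarDir.getAbsolutePath() + '.');
		}
		if (!jarDir.isDirectory() || !jarDir.canWrite()) {
			throw new IOException("Jar directory " + jarDir.getAbsolutePath() + " is not a writable directory.");
		}
		return jarDir;
	}
}
{code}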


---


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167542881
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
+   AbstractRestHandler<RestfulGateway, FileUpload, JarUploadResponseBody, EmptyMessageParameters> {
+
+   private final Path jarDir;
+
+   private final Executor executor;
+
+   public JarUploadHandler(
+   final CompletableFuture<String> localRestAddress,
+   final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   final Time timeout,
+   final Map<String, String> responseHeaders,
+   final MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
+   final Path jarDir,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture<JarUploadResponseBody> handleRequest(
+   @Nonnull final HandlerRequest<FileUpload, EmptyMessageParameters> request,
+   @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+   final FileUpload fileUpload = request.getRequestBody();
+   return CompletableFuture.supplyAsync(() -> {
+   if 
(!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+   deleteUploadedFile(fileUpload);
+   throw new CompletionException(new 
RestHandlerException(
+   "Only Jar files are allowed.",
+   HttpResponseStatus.BAD_REQUEST));
+   } else {
+   try {
+   Files.move(fileUpload.getPath(), 
jarDir.resolve(fileUpload.getPath().getFileName()));
--- End diff --

please guard the `jarDir` access as done in 
8fdea6093a55c33732ae869b82552371b8142c2a. I suppose you'll have to create new 
utility methods outside the `WebRuntimeMonitor`.


---


[GitHub] flink pull request #5461: [FLINK-8475][config][docs] Integrate Mesos options

2018-02-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5461

[FLINK-8475][config][docs] Integrate Mesos options

## What is the purpose of the change

This PR integrates the Mesos `ConfigOptions` into the configuration docs 
generator.

## Brief change log

* extend generator configuration to pick up `MesosOptions`/`MesosTaskManagerParameters` classes in flink-mesos (see the sketch below)
* update generator file matching to accept `MesosTaskManagerParameters`
* add missing descriptions to config options (derived from existing description/javadocs)
* integrate mesos configuration tables into `config.md`
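
For orientation: the generator finds options classes via a static location list, so the change is roughly of the following shape (a sketch only; the exact generator class and field names are assumptions):

{code:java}
// Hypothetical excerpt from the docs generator's configuration: each entry
// names a module and the package to scan for ConfigOptions classes.
private static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[] {
	new OptionsClassLocation("flink-core", "org.apache.flink.configuration"),
	// new in this PR: also scan the flink-mesos module
	new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
	new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
};
{code}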

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_mesos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5461


commit fe3a15693e355e1fa1facb6adb21061767c680d3
Author: zentol 
Date:   2018-01-23T13:20:12Z

[FLINK-8475][config][docs] Integrate Mesos options




---


[jira] [Updated] (FLINK-8512) HAQueryableStateFsBackendITCase unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8512:

Fix Version/s: 1.4.2

> HAQueryableStateFsBackendITCase unstable on Travis
> --
>
> Key: FLINK-8512
> URL: https://issues.apache.org/jira/browse/FLINK-8512
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> {{HAQueryableStateFsBackendITCase}} is unstable on Travis.
> In the logs one can see that there is an {{AssertionError}} in the 
> {{globalEventExecutor-1-1}} {{Thread}}. This indicates that assertions are 
> not properly propagated and simply swallowed.
>  
> https://travis-ci.org/apache/flink/jobs/333250401



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8621:

Fix Version/s: 1.4.2

> PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on 
> Travis
> 
>
> Key: FLINK-8621
> URL: https://issues.apache.org/jira/browse/FLINK-8621
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails 
> on Travis: https://travis-ci.org/apache/flink/jobs/339344244



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8520) CassandraConnectorITCase.testCassandraTableSink unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8520:

Fix Version/s: 1.4.2

> CassandraConnectorITCase.testCassandraTableSink unstable on Travis
> --
>
> Key: FLINK-8520
> URL: https://issues.apache.org/jira/browse/FLINK-8520
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Table API & SQL, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{CassandraConnectorITCase.testCassandraTableSink}} fails on Travis with 
> a timeout.
>  
> https://travis-ci.org/tillrohrmann/flink/jobs/333711342



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8521) FlinkKafkaProducer011ITCase.testRunOutOfProducersInThePool timed out on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8521:

Fix Version/s: 1.4.2

> FlinkKafkaProducer011ITCase.testRunOutOfProducersInThePool timed out on Travis
> --
>
> Key: FLINK-8521
> URL: https://issues.apache.org/jira/browse/FLINK-8521
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{FlinkKafkaProducer011ITCase.testRunOutOfProducersInThePool}} timed out 
> on Travis with producing no output for longer than 300s.
>  
> https://travis-ci.org/tillrohrmann/flink/jobs/334642014



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8623) ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8623:

Fix Version/s: 1.4.2

> ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on 
> Travis
> 
>
> Key: FLINK-8623
> URL: https://issues.apache.org/jira/browse/FLINK-8623
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> {{ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics}} fails on 
> Travis: https://travis-ci.org/apache/flink/jobs/33932



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8402) HadoopS3FileSystemITCase.testDirectoryListing fails on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8402:

Fix Version/s: 1.4.2

> HadoopS3FileSystemITCase.testDirectoryListing fails on Travis
> -
>
> Key: FLINK-8402
> URL: https://issues.apache.org/jira/browse/FLINK-8402
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The test {{HadoopS3FileSystemITCase.testDirectoryListing}} fails on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/327021175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8337) GatherSumApplyITCase.testConnectedComponentsWithObjectReuseDisabled unstable

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8337:

Fix Version/s: 1.4.2

> GatherSumApplyITCase.testConnectedComponentsWithObjectReuseDisabled unstable
> 
>
> Key: FLINK-8337
> URL: https://issues.apache.org/jira/browse/FLINK-8337
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{GatherSumApplyITCase.testConnectedComponentsWithObjectReuseDisabled}} 
> fails on Travis. It looks as if a sub partition has not been registered at 
> the task event dispatcher.
> https://travis-ci.org/apache/flink/jobs/323930301



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8336) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8336:

Fix Version/s: 1.4.2

> YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability
> ---
>
> Key: FLINK-8336
> URL: https://issues.apache.org/jira/browse/FLINK-8336
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3}} fails on 
> Travis. I suspect that this has something to do with the consistency 
> guarantees S3 gives us.
> https://travis-ci.org/tillrohrmann/flink/jobs/323930297



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8408) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8408:

Fix Version/s: 1.4.2

> YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis
> --
>
> Key: FLINK-8408
> URL: https://issues.apache.org/jira/browse/FLINK-8408
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n}} is unstable 
> on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/327216460



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis

2018-02-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8517:

Fix Version/s: 1.4.2

> StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
> ---
>
> Key: FLINK-8517
> URL: https://issues.apache.org/jira/browse/FLINK-8517
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case 
> fails on Travis. This exception might be relevant:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Partition 
> 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not 
> registered at task event dispatcher.
>   at 
> org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748){code}
>  
> https://api.travis-ci.org/v3/job/60156/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360725#comment-16360725
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for the comments, [~twalthr] and [~fhueske]. Yes, the {{flink-kafka}} module is 
just temporary and I'll remove it once I find proper locations for the code. It would 
surely be better if we could unify all the version/format-specific Kafka connectors. 
However, that may not be short-term work, so for now I'll adapt to the current 
implementations. I think my next task will be FLINK-8537, so feel free to assign 
FLINK-8630.

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-12 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360758#comment-16360758
 ] 

vinoyang commented on FLINK-8459:
-

Hi [~till.rohrmann], my idea is similar to yours: it follows how the JobManager 
handled the CancelJobWithSavepoint message in older versions (such as v1.3.2). The 
request combines two actions, triggering a savepoint and cancelling the job, and the 
eventual cancellation should only happen after the savepoint has completed successfully.
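
Roughly, in {{RestClusterClient}} terms, a sketch of that ordering (assuming the existing triggerSavepoint/cancel calls; error handling omitted):

{code:java}
public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
	// 1. Trigger the savepoint and block until it has completed ...
	final String savepointPath = triggerSavepoint(jobId, savepointDirectory).get();
	// 2. ... and only then cancel the job, mirroring the old
	// CancelJobWithSavepoint handling in the JobManager.
	cancel(jobId);
	return savepointPath;
}
{code}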

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancel the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancelation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167537474
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
--- End diff --

Java doc in this interface shouldn't mention implementation-specific details. On 
the other hand, this java doc doesn't explain what `newBuffer` does, and for that 
information one must check the `BufferSpiller` java doc itself.

Can you add appropriate java doc here, or better, add java doc to the two methods 
proposed in the comment below: `rollOverWithoutReusingResources()` and 
`rollOverReusingResources()`? The comment on 
`CachedBufferBlocker.java#rollOverReusingResources` should state that it never 
reuses resources and defaults to 
`CachedBufferBlocker.java#rollOverWithoutReusingResources`.
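
For clarity, the shape I have in mind (a sketch; assuming the sequence type introduced in this PR is called BufferOrEventSequence):

{code:java}
/**
 * Starts a new sequence of buffers and events and returns the current one
 * for reading, always creating fresh internal resources for the new sequence.
 */
BufferOrEventSequence rollOverWithoutReusingResources() throws IOException;

/**
 * Like {@link #rollOverWithoutReusingResources()}, but implementations may
 * reuse internal resources (e.g. the current spill file) to reduce
 * allocations. CachedBufferBlocker never reuses resources and should simply
 * delegate to {@link #rollOverWithoutReusingResources()}.
 */
BufferOrEventSequence rollOverReusingResources() throws IOException;
{code}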


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167535311
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
--- End diff --

nit: Missing period in java doc (build failure).


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167535452
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BufferBlocker}.
+ */
+public abstract class BufferBlockerTestBase {
+
+   protected static final int PAGE_SIZE = 4096;
+
+   abstract BufferBlocker createBufferBlocker();
+
+   @Test
+   public void testRollOverEmptySequences() throws IOException {
+   BufferBlocker bufferBlocker = createBufferBlocker();
+   assertNull(bufferBlocker.rollOver(false));
+   assertNull(bufferBlocker.rollOver(false));
+   assertNull(bufferBlocker.rollOver(false));
+   }
+
+   @Test
+   public void testSpillAndRollOverSimple() throws IOException {
+   final Random rnd = new Random();
+   final Random bufferRnd = new Random();
+
+   final int maxNumEventsAndBuffers = 3000;
+   final int maxNumChannels = 1656;
+
+   BufferBlocker bufferBlocker = createBufferBlocker();
+
+   // do multiple spilling / rolling over rounds
+   for (int round = 0; round < 5; round++) {
+
+   final long bufferSeed = rnd.nextLong();
+   bufferRnd.setSeed(bufferSeed);
+
+   final int numEventsAndBuffers = 
rnd.nextInt(maxNumEventsAndBuffers) + 1;
+   final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+
+   final ArrayList events = new 
ArrayList(128);
+
+   // generate sequence
+   for (int i = 0; i < numEventsAndBuffers; i++) {
+   boolean isEvent = rnd.nextDouble() < 0.05d;
+   BufferOrEvent evt;
+   if (isEvent) {
+   evt = generateRandomEvent(rnd, 
numChannels);
+   events.add(evt);
+   } else {
+   evt = 
generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, 
bufferRnd.nextInt(numChannels));
+   }
+   bufferBlocker.add(evt);
+   }
+
+   // reset and create reader
+   bufferRnd.setSeed(bufferSeed);
+
+   BufferOrEventSequence seq = 
bufferBlocker.rollOver(false);
+   seq.open();
+
+   // read and validate the sequence
+
+   int numEvent = 0;
+   for (int i = 0; i < numEventsAndBuffers; i++) {
+   BufferOrEvent next = seq.getNext();
+   assertNotNull(next);
+   if (next.isEvent()) {
+   BufferOrEvent expected = 
events.get(numEvent++);
+   asser

[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167537059
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
+* @return The readable sequence of buffers and events, or 'null', if 
nothing was added.
+*/
+   BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
--- End diff --

Could we stick with two methods in the interface? I think more descriptive 
names would be better than a boolean parameter here: 
`rollOverWithoutReusingResources()` and `rollOverReusingResources()`, where 
`rollOverWithoutReusingResources()` == `rollOver(true)`.

This is especially true if one implementation doesn't support one of those calls.
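
A hedged sketch of the proposed shape (the java doc wording is illustrative, 
not final PR text; imports are taken from the quoted file):

```java
import java.io.IOException;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

@Internal
public interface BufferBlocker {

	/**
	 * Adds a buffer or event to the blocker.
	 */
	void add(BufferOrEvent boe) throws IOException;

	/**
	 * Starts a new sequence of buffers and events and returns the current
	 * sequence for reading, without reusing the resources of the current
	 * sequence. Equivalent to the previous {@code rollOver(true)}.
	 */
	BufferOrEventSequence rollOverWithoutReusingResources() throws IOException;

	/**
	 * Like {@link #rollOverWithoutReusingResources()}, but reuses the
	 * resources of the current sequence where the implementation supports it.
	 */
	BufferOrEventSequence rollOverReusingResources() throws IOException;
}
```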


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167557594
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
  */
-public class BarrierBufferTest {
-
-   private static final Random RND = new Random();
-
-   private static final int PAGE_SIZE = 512;
-
-   private static int sizeCounter = 0;
+public class BarrierBufferTest extends BarrierBufferTestBase {
--- End diff --

Rename the test class to `SpillingBarrierBufferTest`?


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360764#comment-16360764
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167535452
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BufferBlocker}.
+ */
+public abstract class BufferBlockerTestBase {
+
+   protected static final int PAGE_SIZE = 4096;
+
+   abstract BufferBlocker createBufferBlocker();
+
+   @Test
+   public void testRollOverEmptySequences() throws IOException {
+   BufferBlocker bufferBlocker = createBufferBlocker();
+   assertNull(bufferBlocker.rollOver(false));
+   assertNull(bufferBlocker.rollOver(false));
+   assertNull(bufferBlocker.rollOver(false));
+   }
+
+   @Test
+   public void testSpillAndRollOverSimple() throws IOException {
+   final Random rnd = new Random();
+   final Random bufferRnd = new Random();
+
+   final int maxNumEventsAndBuffers = 3000;
+   final int maxNumChannels = 1656;
+
+   BufferBlocker bufferBlocker = createBufferBlocker();
+
+   // do multiple spilling / rolling over rounds
+   for (int round = 0; round < 5; round++) {
+
+   final long bufferSeed = rnd.nextLong();
+   bufferRnd.setSeed(bufferSeed);
+
+   final int numEventsAndBuffers = 
rnd.nextInt(maxNumEventsAndBuffers) + 1;
+   final int numChannels = rnd.nextInt(maxNumChannels) + 1;
+
+   final ArrayList events = new 
ArrayList(128);
+
+   // generate sequence
+   for (int i = 0; i < numEventsAndBuffers; i++) {
+   boolean isEvent = rnd.nextDouble() < 0.05d;
+   BufferOrEvent evt;
+   if (isEvent) {
+   evt = generateRandomEvent(rnd, 
numChannels);
+   events.add(evt);
+   } else {
+   evt = 
generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, 
bufferRnd.nextInt(numChannels));
+   }
+   bufferBlocker.add(evt);
+   }
+
+   // reset and create reader
+   bufferRnd.setSeed(bufferSeed);
+
+   BufferOrEventSequence seq = 
bufferBlocker.rollOver(false);
+   seq.open();
+
+   // read and validate the sequence
+
+   int numEvent = 0;
+   for (int i = 0; i < numEventsAndBuffers; i++) {
+   BufferOrEvent next = s

[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360761#comment-16360761
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167537059
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
+* @return The readable sequence of buffers and events, or 'null', if 
nothing was added.
+*/
+   BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
--- End diff --

Could we stick with two methods in the interface? I think more descriptive 
names would be better than a boolean parameter here: 
`rollOverWithoutReusingResources()` and `rollOverReusingResources()`, where 
`rollOverWithoutReusingResources()` == `rollOver(true)`.

This is especially true if one implementation doesn't support one of those calls.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlock. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, in order to roll back to the original mode in case of 
> unexpected risks.
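
A minimal sketch of what "only buffering the data for blocked channels" can 
look like, using the {{CachedBufferBlocker}} name that appears in the 
accompanying review thread; the reader type and all implementation details 
here are assumptions, not the actual code:

{code:java}
import java.util.ArrayDeque;

import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

// Sketch only: with credit-based flow control each blocked channel keeps its
// exclusive buffers, so blocked data can be queued in memory instead of being
// spilled to disk files.
public class CachedBufferBlocker implements BufferBlocker {

	private ArrayDeque<BufferOrEvent> currentBuffers = new ArrayDeque<>();

	@Override
	public void add(BufferOrEvent boe) {
		currentBuffers.add(boe);
	}

	@Override
	public BufferOrEventSequence rollOver(boolean newBuffer) {
		if (currentBuffers.isEmpty()) {
			return null; // nothing was added since the last roll over
		}
		ArrayDeque<BufferOrEvent> rolled = currentBuffers;
		currentBuffers = new ArrayDeque<>();
		// CachedBufferOrEventSequence is an assumed in-memory reader type
		return new CachedBufferOrEventSequence(rolled);
	}
}
{code}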



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360762#comment-16360762
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167557594
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
  */
-public class BarrierBufferTest {
-
-   private static final Random RND = new Random();
-
-   private static final int PAGE_SIZE = 512;
-
-   private static int sizeCounter = 0;
+public class BarrierBufferTest extends BarrierBufferTestBase {
--- End diff --

Rename the test class to `SpillingBarrierBufferTest`?


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlock. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, in order to roll back to the original mode in case of 
> unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360763#comment-16360763
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167537474
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and 
adds them in a sequence.
+ * After a number of elements have been added, the blocker can "roll 
over": It presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+   /**
+* Adds a buffer or event to the blocker.
+*
+* @param boe The buffer or event to be added into the blocker.
+*/
+   void add(BufferOrEvent boe) throws IOException;
+
+   /**
+* Starts a new sequence of buffers and event and returns the current 
sequence of buffers for reading.
+* This method returns {@code null}, if nothing was added since the 
creation, or the last call to this method.
+*
+* @param newBuffer only works for {@link BufferSpiller} implements 
currently.
--- End diff --

Java doc in this interface shouldn't mention implementation-specific 
details. On the other hand, this java doc doesn't explain what `newBuffer` 
does; for that information one must check the `BufferSpiller`'s java doc 
itself.

Can you add appropriate java doc here, or better, add java doc to the two 
methods proposed in the comment below: `rollOverWithoutReusingResources()` and 
`rollOverReusingResources()`? The comment on 
`CachedBufferBlocker.java#rollOverReusingResources` should state that it 
never reuses resources and defaults to 
`CachedBufferBlocker.java#rollOverWithoutReusingResources`.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlock. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, in order to roll back to the original mode in case of 
> unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360765#comment-16360765
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167535311
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -18,1426 +18,40 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer}.
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}
--- End diff --

nit: Missing period in java doc (build failure).


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlock. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, in order to roll back to the original mode in case of 
> unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5462: [FLINK-8475][config][docs] Integrate HA-ZK options

2018-02-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5462

[FLINK-8475][config][docs] Integrate HA-ZK options

## What is the purpose of the change

This PR integrates the Zookeeper HA `ConfigOptions` into the configuration 
docs generator.

## Brief change log

* Add `ConfigGroup` for zookeeper HA config options (see the sketch below)
* integrate zookeeper HA configuration table into `config.md`
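
For reference, a minimal sketch of how the docs generator's grouping 
annotation is typically applied; the group name and key prefix shown here are 
assumptions for illustration, not necessarily the values used in this PR:

```java
import org.apache.flink.annotation.docs.ConfigGroup;
import org.apache.flink.annotation.docs.ConfigGroups;

// Options whose keys start with the group's keyPrefix are emitted into a
// separate generated table; the remaining options in the class fall through
// to the class's default table.
@ConfigGroups(groups = {
	@ConfigGroup(name = "HighAvailabilityZooKeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {
	// ConfigOption constants live here, e.g. high-availability.zookeeper.quorum
}
```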

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_ha_zk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5462.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5462


commit 14e168c1c9c526a5feb0cac5738ea9a3663b0466
Author: zentol 
Date:   2018-01-23T12:50:32Z

[FLINK-8475][config][docs] Integrate HA-ZK options




---


[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360799#comment-16360799
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5462

[FLINK-8475][config][docs] Integrate HA-ZK options

## What is the purpose of the change

This PR integrates the Zookeeper HA `ConfigOptions` into the configuration 
docs generator.

## Brief change log

* Add `ConfigGroup` for zookeeper HA config options
* integrate zookeeper HA configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_ha_zk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5462.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5462


commit 14e168c1c9c526a5feb0cac5738ea9a3663b0466
Author: zentol 
Date:   2018-01-23T12:50:32Z

[FLINK-8475][config][docs] Integrate HA-ZK options




> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5463: [FLINK-8475][config][docs] Integrate YARN options

2018-02-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5463

[FLINK-8475][config][docs] Integrate YARN options

## What is the purpose of the change

This PR integrates the YARN `ConfigOptions` into the configuration docs 
generator.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs); see the sketch below
* integrate YARN  configuration table into `config.md`
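
As a reference for the first item, a hedged sketch of adding a description to 
a `ConfigOption`; the key, type, and description text below are illustrative 
assumptions, not necessarily what this PR changes:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class YarnConfigOptions {

	// withDescription() supplies the text that the docs generator renders
	// in the "Description" column of the generated table in config.md
	public static final ConfigOption<String> APPLICATION_ATTEMPTS =
		ConfigOptions.key("yarn.application-attempts")
			.noDefaultValue()
			.withDescription("Number of ApplicationMaster restarts allowed before the YARN session fails.");
}
```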

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5463


commit 7199209e6f6ed68589ab841dbbc781802e608e55
Author: zentol 
Date:   2018-01-23T13:04:36Z

[FLINK-8475][config][docs] Integrate YARN options




---


[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360803#comment-16360803
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5463

[FLINK-8475][config][docs] Integrate YARN options

## What is the purpose of the change

This PR integrates the YARN `ConfigOptions` into the configuration docs 
generator.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate YARN  configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5463


commit 7199209e6f6ed68589ab841dbbc781802e608e55
Author: zentol 
Date:   2018-01-23T13:04:36Z

[FLINK-8475][config][docs] Integrate YARN options




> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5464: [FLINK-8475][config][docs] Integrate Checkpointing...

2018-02-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5464

[FLINK-8475][config][docs] Integrate Checkpointing options

## What is the purpose of the change

This PR integrates the Checkpointing  `ConfigOptions` into the 
configuration docs generator.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate Checkpointing configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_cp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5464


commit ea8f1ae8b36d793fec7cfced887bae38650c0ba6
Author: zentol 
Date:   2018-01-23T13:57:20Z

[FLINK-8475][config][docs] Integrate Checkpointing options




---


[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360810#comment-16360810
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5464

[FLINK-8475][config][docs] Integrate Checkpointing options

## What is the purpose of the change

This PR integrates the Checkpointing  `ConfigOptions` into the 
configuration docs generator.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate Checkpointing configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_cp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5464


commit ea8f1ae8b36d793fec7cfced887bae38650c0ba6
Author: zentol 
Date:   2018-01-23T13:57:20Z

[FLINK-8475][config][docs] Integrate Checkpointing options




> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8639) Fix always need to seek multiple times when iterator RocksDBMapState

2018-02-12 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8639:
-

 Summary: Fix always need to seek multiple times when iterator 
RocksDBMapState
 Key: FLINK-8639
 URL: https://issues.apache.org/jira/browse/FLINK-8639
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


Currently, almost every time we want to iterate a RocksDBMapState we need to 
seek at least 2 times (seek is a poorly performing operation for RocksDB 
because it can't use the bloom filter). This is because `RocksDBMapIterator` 
uses a `cacheEntries` list to cache the values read by each seek, and the 
initial size of `cacheEntries` is 1.
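
To make the cost concrete, here is a small self-contained sketch (plain Java, 
not the actual Flink code) of the batched-iteration pattern 
`RocksDBMapIterator` uses, with a `TreeMap` standing in for RocksDB. It shows 
why an initial cache size of 1 forces a second seek even for a single-entry 
map, while a larger initial size lets the short read prove the iteration is 
already exhausted:

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class BatchedIterator implements Iterator<Map.Entry<String, String>> {

	private final TreeMap<String, String> store; // stand-in for RocksDB
	private final ArrayList<Map.Entry<String, String>> cache = new ArrayList<>();
	private int cacheSize;         // grows per refill, like CACHE_SIZE_BASE
	private String lastKey = null; // where the next "seek" resumes
	private int pos;
	private boolean endReached;
	int seekCount;                 // counts the expensive seeks

	public BatchedIterator(TreeMap<String, String> store, int initialCacheSize) {
		this.store = store;
		this.cacheSize = initialCacheSize;
		refill();
	}

	private void refill() {
		seekCount++; // every refill costs one seek in the real backend
		cache.clear();
		pos = 0;
		Map<String, String> view =
			lastKey == null ? store : store.tailMap(lastKey, false);
		for (Map.Entry<String, String> e : view.entrySet()) {
			cache.add(e);
			lastKey = e.getKey();
			if (cache.size() == cacheSize) {
				break;
			}
		}
		// a short read proves nothing is left behind this batch; a full read
		// of exactly cacheSize entries cannot tell, so another seek follows
		endReached = cache.size() < cacheSize;
		cacheSize = Math.min(cacheSize * 2, 128);
	}

	@Override
	public boolean hasNext() {
		if (pos == cache.size() && !endReached) {
			refill();
		}
		return pos < cache.size();
	}

	@Override
	public Map.Entry<String, String> next() {
		return cache.get(pos++);
	}

	public static void main(String[] args) {
		TreeMap<String, String> store = new TreeMap<>();
		store.put("a", "1");
		BatchedIterator it = new BatchedIterator(store, 1);
		while (it.hasNext()) {
			it.next();
		}
		System.out.println("seeks with initial cache size 1: " + it.seekCount); // 2
	}
}
{code}

With the initial size bumped (to 32 in the associated PR), the single-entry 
case above needs only one seek.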



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8521) FlinkKafkaProducer011ITCase.testRunOutOfProducersInThePool timed out on Travis

2018-02-12 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360820#comment-16360820
 ] 

Till Rohrmann commented on FLINK-8521:
--

Another instance: https://api.travis-ci.org/v3/job/340439626/log.txt

> FlinkKafkaProducer011ITCase.testRunOutOfProducersInThePool timed out on Travis
> --
>
> Key: FLINK-8521
> URL: https://issues.apache.org/jira/browse/FLINK-8521
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{FlinkKafkaProducer011ITCase.testRunOutOfProducersInThePool}} timed out 
> on Travis after producing no output for longer than 300s.
>  
> https://travis-ci.org/tillrohrmann/flink/jobs/334642014



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5465: [FLINK-8639][State Backends]Fix always need to see...

2018-02-12 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5465

[FLINK-8639][State Backends]Fix always need to seek multiple times when 
iterator RocksDBMapState

## What is the purpose of the change

This PR addresses issue 
[FLINK-8639](https://issues.apache.org/jira/browse/FLINK-8639). Currently, 
almost every time we want to iterate a RocksDBMapState we need to seek at 
least 2 times (seek is a poorly performing operation for RocksDB because it 
can't use the bloom filter). This is because `RocksDBMapIterator` uses a 
`cacheEntries` list to cache the values read by each seek, and the initial 
size of `cacheEntries` is 1.

## Brief change log


- Change CACHE_SIZE_BASE from 1 to 32.
- Compare in reverse order in function underSameKey() (see the sketch below).
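
For the second item, a hedged sketch of the reverse-order comparison (the 
class, signature, and names are illustrative, not the exact Flink code). All 
raw RocksDB keys produced for one map state share the same serialized 
key/namespace prefix, so when a raw key does not belong to the current map, 
the mismatching byte tends to sit near the end of the prefix region; scanning 
backwards finds it after a few comparisons instead of re-checking the shared 
leading bytes first:

```java
final class MapKeyPrefixUtil {

	// Sketch: does rawKeyBytes start with keyPrefixBytes? The prefix region
	// is compared back to front, because the leading bytes (key group,
	// serialized key) are shared by every entry of the map and would be
	// wasted comparisons.
	static boolean underSameKey(byte[] rawKeyBytes, byte[] keyPrefixBytes) {
		if (rawKeyBytes.length < keyPrefixBytes.length) {
			return false;
		}
		for (int i = keyPrefixBytes.length - 1; i >= 0; i--) {
			if (rawKeyBytes[i] != keyPrefixBytes[i]) {
				return false;
			}
		}
		return true;
	}
}
```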

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink improve_rocksdb_base_mapstate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5465.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5465


commit 753855ec2957492ba6c2eb272fdc5ffe7c769608
Author: summerleafs 
Date:   2018-02-12T14:28:04Z

1, Removed CACHE_SIZE_BASE, CACHE_SIZE_LIMIT.
2, Compare in reverse order in function underSameKey().

commit e33b807ba151265e0fd86d52701965dc5ffcb7be
Author: summerleafs 
Date:   2018-02-12T14:50:28Z

change CACHE_SIZE_BASE from 1 to 32.




---

