[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...

2017-12-18 Thread djh4230
Github user djh4230 commented on the issue:

https://github.com/apache/flink/pull/4926
  
Does that mean it is not an issue? The problem still occurs in Flink 1.4. How can I fix it?


---


[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296279#comment-16296279
 ] 

ASF GitHub Bot commented on FLINK-7951:
---

Github user djh4230 commented on the issue:

https://github.com/apache/flink/pull/4926
  
Does that mean it is not an issue? The problem still occurs in Flink 1.4. How can I fix it?


> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}} we do not load the corresponding 
> {{HDFSConfiguration}}. As a result, we do not read the {{hdfs-site.xml}}.
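
For reference, a minimal sketch (illustration only, not the fix from the linked PR; the utility class name is hypothetical) of making the HDFS defaults visible when constructing a {{YarnConfiguration}}:

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical helper, for illustration only.
public final class YarnConfigUtil {

    private YarnConfigUtil() {}

    public static YarnConfiguration loadYarnAndHdfsConfiguration() {
        // Creating an HdfsConfiguration registers hdfs-default.xml / hdfs-site.xml
        // as default resources for Configuration objects.
        Configuration hadoopConf = new HdfsConfiguration();

        // YarnConfiguration(conf) copies the given configuration, so HDFS settings
        // such as dfs.nameservices become visible alongside the YARN settings.
        return new YarnConfiguration(hadoopConf);
    }
}
{code}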



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8260) Document API of Kafka 0.11 Producer

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296200#comment-16296200
 ] 

ASF GitHub Bot commented on FLINK-8260:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5179

[FLINK-8260/8287] [kafka] Bunch of improvements to Kafka producer Javadocs 
/ document

## What is the purpose of the change

This PR collects several improvements to the `FlinkKafkaProducer` Javadocs 
and user document, with a focus on:

- Demonstrate proper producer construction for writing Flink record timestamps to 
Kafka, instead of showing deprecated usage.
- Clarify partitioning behaviours for the producer, for all variations of 
constructors that could be used.

It also includes a few miscellaneous trivial fixes found along the way.
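
For illustration, a minimal sketch of the non-deprecated construction path that the updated docs point to (topic name, bootstrap servers, and the example stream are assumptions, not part of this PR):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class KafkaProducer010Example {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Regular constructor + addSink() instead of the deprecated
        // writeToKafkaWithTimestamps(...) factory methods.
        FlinkKafkaProducer010<String> producer =
            new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props);

        // Writing Flink record timestamps to Kafka stays opt-in.
        producer.setWriteTimestampToKafka(true);

        stream.addSink(producer);
        env.execute("Kafka 0.10 producer example");
    }
}
{code}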

## Brief change log

The commit history should serve as a clear list of what has been changed.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8260/8287

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5179


commit 47bb55a80c308506e7388715eb3b54a0c7733bcf
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-18T23:21:31Z

[FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in 
FlinkKafkaProducer docs

commit 16fd116598b05f0b4ed4e4535fe9419767d69915
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:21:20Z

[FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify 
partitioning behaviour

commit 51a64315d59c9dcab39f5ec52b7fa3d6599ca3fd
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:29:38Z

[FLINK-8260] [kafka] Reorder deprecated / regular constructors of 
FlinkKafkaProducer010

This commit moves deprecated factory methods of the
FlinkKafkaProducer010 behind regular constructors, for better navigation
and readability of the code.

commit 7219f159a7db502edf2746d1eafa835c85680931
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:34:17Z

[hotfix] [kafka] Properly deprecate FlinkKafkaProducer010Configuration

FlinkKafkaProducer010Configuration is the return type of the deprecated
writeToKafkaWithTimestamp factory methods. Therefore, the class should
be deprecated as well.

commit c84033e763c2e59ea4a21d186c0c5f29c4f2a02d
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:36:31Z

[hotfix] [kafka] Fix stale Javadoc link in FlinkKafkaProducer09

The previous link was referencing a non-existent constructor signature.

commit e11f21a888a407f23d18c4c4450584443a7da4c8
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:38:32Z

[hotfix] [kafka] Add serialVersionUID to FlinkKafkaProducer010




> Document API of Kafka 0.11 Producer
> ---
>
> Key: FLINK-8260
> URL: https://issues.apache.org/jira/browse/FLINK-8260
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The API of the Flink Kafka Producer changed for Kafka 0.11; for example, there 
> is no {{writeToKafkaWithTimestamps}} method anymore.
> This needs to be added to the [Kafka connector 
> documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer],
>  i.e., a new tab with a code snippet needs to be added for Kafka 0.11.
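
A documentation snippet for the 0.11 producer could look roughly like the following sketch (topic name and bootstrap servers are assumed; the producer is added as a regular sink because there is no writeToKafkaWithTimestamps helper for 0.11):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class Kafka011DocSnippet {

    // Attaches a Kafka 0.11 producer sink to the given stream.
    public static void writeToKafka(DataStream<String> stream) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaProducer011<String> producer =
            new FlinkKafkaProducer011<>("my-topic", new SimpleStringSchema(), props);

        // Optionally write the Flink record timestamps into Kafka.
        producer.setWriteTimestampToKafka(true);

        stream.addSink(producer);
    }
}
{code}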



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5178: [hotfix] Fix typo in TestableKinesisDataFetcher

2017-12-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5178
  
+1, thanks for the fix.
Merging this ...


---


[GitHub] flink pull request #5179: [FLINK-8260/8287] [kafka] Bunch of improvements to...

2017-12-18 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5179

[FLINK-8260/8287] [kafka] Bunch of improvements to Kafka producer Javadocs 
/ document

## What is the purpose of the change

This PR collects several improvements to the `FlinkKafkaProducer` Javadocs 
and user document, with a focus on:

- Demonstrate proper producer construction for writing Flink record timestamps to 
Kafka, instead of showing deprecated usage.
- Clarify partitioning behaviours for the producer, for all variations of 
constructors that could be used.

It also includes a few miscellaneous trivial fixes found along the way.

## Brief change log

The commit history should serve as a clear list of what has been changed.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8260/8287

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5179


commit 47bb55a80c308506e7388715eb3b54a0c7733bcf
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-18T23:21:31Z

[FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in 
FlinkKafkaProducer docs

commit 16fd116598b05f0b4ed4e4535fe9419767d69915
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:21:20Z

[FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify 
partitioning behaviour

commit 51a64315d59c9dcab39f5ec52b7fa3d6599ca3fd
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:29:38Z

[FLINK-8260] [kafka] Reorder deprecated / regular constructors of 
FlinkKafkaProducer010

This commit moves deprecated factory methods of the
FlinkKafkaProducer010 behind regular constructors, for better navigation
and readability of the code.

commit 7219f159a7db502edf2746d1eafa835c85680931
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:34:17Z

[hotfix] [kafka] Properly deprecate FlinkKafkaProducer010Configuration

FlinkKafkaProducer010Configuration is the return type of the deprecated
writeToKafkaWithTimestamp factory methods. Therefore, the class should
be deprecated as well.

commit c84033e763c2e59ea4a21d186c0c5f29c4f2a02d
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:36:31Z

[hotfix] [kafka] Fix stale Javadoc link in FlinkKafkaProducer09

The previous link was referencing a non-existent constructor signature.

commit e11f21a888a407f23d18c4c4450584443a7da4c8
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T04:38:32Z

[hotfix] [kafka] Add serialVersionUID to FlinkKafkaProducer010




---


[jira] [Comment Edited] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4

2017-12-18 Thread jia liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296123#comment-16296123
 ] 

jia liu edited comment on FLINK-8248 at 12/19/17 3:29 AM:
--

It happened again. I have read the source code and found what may be a bug in 
*org.apache.flink.cep.nfa.SharedBufferSerializer.serialize()*.

The *edges* related to a *SharedBufferEntry* are not put into *entryIDs* in the 
first loop, but they are looked up in *entryIDs* in the second loop at line 942.


{code:java}
@Override
public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
    Map<K, SharedBufferPage<K, V>> pages = record.pages;
    Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();

    int totalEdges = 0;
    int entryCounter = 0;

    // number of pages
    target.writeInt(pages.size());

    for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry : pages.entrySet()) {
        SharedBufferPage<K, V> page = pageEntry.getValue();

        // key for the current page
        keySerializer.serialize(page.getKey(), target);

        // number of page entries
        target.writeInt(page.entries.size());

        for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry : page.entries.entrySet()) {
            SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();

            // assign id to the sharedBufferEntry for the future
            // serialization of the previous relation
            entryIDs.put(sharedBuffer, entryCounter++);

            ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();

            valueSerializer.serialize(valueTimeWrapper.getValue(), target);
            target.writeLong(valueTimeWrapper.getTimestamp());
            target.writeInt(valueTimeWrapper.getCounter());

            int edges = sharedBuffer.edges.size();
            totalEdges += edges;

            target.writeInt(sharedBuffer.referenceCounter);
        }
    }

    // write the edges between the shared buffer entries
    target.writeInt(totalEdges);

    for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry : pages.entrySet()) {
        SharedBufferPage<K, V> page = pageEntry.getValue();

        for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry : page.entries.entrySet()) {
            SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();

            Integer id = entryIDs.get(sharedBuffer);
            Preconditions.checkState(id != null,
                "Could not find id for entry: " + sharedBuffer + "pages: " + pages + "entryIDs: " + entryIDs);

            for (SharedBufferEdge<K, V> edge : sharedBuffer.edges) {
                // in order to serialize the previous relation we simply serialize the ids
                // of the source and target SharedBufferEntry
                if (edge.target != null) {
                    Integer targetId = entryIDs.get(edge.getTarget());
                    Preconditions.checkState(targetId != null,
                        "Could not find id for entry: " + edge.getTarget() + "pages: " + pages + "entryIDs: " + entryIDs);

                    target.writeInt(id);
                    target.writeInt(targetId);
                    versionSerializer.serialize(edge.version, target);
                } else {
                    target.writeInt(id);
                    target.writeInt(-1);
                    versionSerializer.serialize(edge.version, target);
                }
            }
        }
    }
}
{code}

[jira] [Reopened] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4

2017-12-18 Thread jia liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jia liu reopened FLINK-8248:


It happened again. I have read the source code and found what may be a bug in 
*org.apache.flink.cep.nfa.SharedBufferSerializer.serialize()*.

The *edges* related to a *SharedBufferEntry* are not put into *entryIDs* in the 
first loop, but they are looked up in *entryIDs* in the second loop at line 942.


{code:java}
@Override
public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
    Map<K, SharedBufferPage<K, V>> pages = record.pages;
    Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();

    int totalEdges = 0;
    int entryCounter = 0;

    // number of pages
    target.writeInt(pages.size());

    for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry : pages.entrySet()) {
        SharedBufferPage<K, V> page = pageEntry.getValue();

        // key for the current page
        keySerializer.serialize(page.getKey(), target);

        // number of page entries
        target.writeInt(page.entries.size());

        for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry : page.entries.entrySet()) {
            SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();

            // assign id to the sharedBufferEntry for the future
            // serialization of the previous relation
            entryIDs.put(sharedBuffer, entryCounter++);

            ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();

            valueSerializer.serialize(valueTimeWrapper.getValue(), target);
            target.writeLong(valueTimeWrapper.getTimestamp());
            target.writeInt(valueTimeWrapper.getCounter());

            int edges = sharedBuffer.edges.size();
            totalEdges += edges;

            target.writeInt(sharedBuffer.referenceCounter);
        }
    }

    // write the edges between the shared buffer entries
    target.writeInt(totalEdges);

    for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry : pages.entrySet()) {
        SharedBufferPage<K, V> page = pageEntry.getValue();

        for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry : page.entries.entrySet()) {
            SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();

            Integer id = entryIDs.get(sharedBuffer);
            Preconditions.checkState(id != null,
                "Could not find id for entry: " + sharedBuffer + "pages: " + pages + "entryIDs: " + entryIDs);

            for (SharedBufferEdge<K, V> edge : sharedBuffer.edges) {
                // in order to serialize the previous relation we simply serialize the ids
                // of the source and target SharedBufferEntry
                if (edge.target != null) {
                    Integer targetId = entryIDs.get(edge.getTarget());
                    Preconditions.checkState(targetId != null,
                        "Could not find id for entry: " + edge.getTarget() + "pages: " + pages + "entryIDs: " + entryIDs);

                    target.writeInt(id);
                    target.writeInt(targetId);
                    versionSerializer.serialize(edge.version, target);
                } else {
                    target.writeInt(id);
                    target.writeInt(-1);
                    versionSerializer.serialize(edge.version, target);
                }
            }
        }
    }
}
{code}


 data example :

{noformat}

[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade from deprecated c...

2017-12-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5171
  
No, those classes have been deprecated in AWS SDK 1.11.171.


---


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296107#comment-16296107
 ] 

ASF GitHub Bot commented on FLINK-8271:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5171
  
No, those classes have been deprecated in AWS SDK 1.11.171.


> upgrade from deprecated classes to AmazonKinesis
> 
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes

2017-12-18 Thread dongxiao.yang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296073#comment-16296073
 ] 

dongxiao.yang edited comment on FLINK-8270 at 12/19/17 2:56 AM:


Yes, the PR should fix the problem correctly. So can I expect this bug to be 
fixed in the next release, version 1.4.1?


was (Author: dongxiao.yang):
Yes, the PR should fix the problem correctly. So can I expect this bug to be 
fixed in the next release, version 1.4.1?

> TaskManagers do not use correct local path for shipped Keytab files in Yarn 
> deployment modes
> 
>
> Key: FLINK-8270
> URL: https://issues.apache.org/jira/browse/FLINK-8270
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Reported on ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html
> This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are 
> again not using the correct local paths for shipped Keytab files.
> The cause was accidental due to this change: 
> https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e.
> Things to consider:
> 1) The above accidental breaking change was actually targeting a minor 
> refactor on the "integration test scenario" code block in 
> {{YarnTaskManagerRunner}}. It would be best if we can remove that test case 
> code block from the main code.
> 2) Unit test coverage is apparently not enough. As this incidence shows, any 
> slight changes can cause this issue to easily resurface again.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes

2017-12-18 Thread dongxiao.yang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296073#comment-16296073
 ] 

dongxiao.yang commented on FLINK-8270:
--

Yes, the PR should fix the problem correctly. So can I expect this bug to be 
fixed in the next release, version 1.4.1?

> TaskManagers do not use correct local path for shipped Keytab files in Yarn 
> deployment modes
> 
>
> Key: FLINK-8270
> URL: https://issues.apache.org/jira/browse/FLINK-8270
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> Reported on ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html
> This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are 
> again not using the correct local paths for shipped Keytab files.
> The cause was accidental due to this change: 
> https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e.
> Things to consider:
> 1) The above accidental breaking change was actually targeting a minor 
> refactor on the "integration test scenario" code block in 
> {{YarnTaskManagerRunner}}. It would be best if we can remove that test case 
> code block from the main code.
> 2) Unit test coverage is apparently not enough. As this incidence shows, any 
> slight changes can cause this issue to easily resurface again.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8

2017-12-18 Thread xymaqingxiang (JIRA)
xymaqingxiang created FLINK-8290:


 Summary: Modify clientId to groupId in flink-connector-kafka-0.8
 Key: FLINK-8290
 URL: https://issues.apache.org/jira/browse/FLINK-8290
 Project: Flink
  Issue Type: Improvement
Reporter: xymaqingxiang


Currently the clientId used for consuming all topics is the constant 
("flink-kafka-consumer-legacy-" + broker.id()), which makes it hard for us to 
inspect Kafka's logs, so I recommend changing it to the groupId.

We can modify the SimpleConsumerThread.java file, as shown below:

{code:java}
private final String clientIdFormGroup;
...
this.clientIdFormGroup = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
...
final String clientId = clientIdFormGroup;
{code}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8288) Register the web interface url to yarn for yarn job mode

2017-12-18 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8288:

Affects Version/s: 1.5.0

> Register the web interface url to yarn for yarn job mode
> 
>
> Key: FLINK-8288
> URL: https://issues.apache.org/jira/browse/FLINK-8288
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> For flip-6 job mode, the resource manager is created before the web monitor, 
> so the web interface URL is not set on the resource manager, and the resource 
> manager cannot register the URL with YARN.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-18 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-8289:

Affects Version/s: 1.5.0

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns an address equal to the 
> value of the config rest.address (by default 127.0.0.1:9067), but this address 
> cannot be accessed from another machine. Since the IPs for the Dispatcher and 
> JobMaster are usually assigned dynamically, users typically configure it to 
> 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067. This address is 
> registered with YARN or Mesos, but it cannot be accessed from another machine 
> either. So it needs to return the real ip:port so that users can access the 
> web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8289:
---

 Summary: The RestServerEndpoint should return the address with 
real ip when getRestAdddress
 Key: FLINK-8289
 URL: https://issues.apache.org/jira/browse/FLINK-8289
 Project: Flink
  Issue Type: Bug
Reporter: shuai.xu


Currently, RestServerEndpoint.getRestAddress returns an address equal to the value 
of the config rest.address (by default 127.0.0.1:9067), but this address cannot be 
accessed from another machine. Since the IPs for the Dispatcher and JobMaster are 
usually assigned dynamically, users typically configure it to 0.0.0.0, and 
getRestAddress then returns 0.0.0.0:9067. This address is registered with YARN or 
Mesos, but it cannot be accessed from another machine either. So it needs to 
return the real ip:port so that users can access the web monitor from anywhere.
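
A hypothetical sketch (illustrative only; the helper name and fallback logic are assumptions, not the actual Flink implementation) of resolving a wildcard or loopback bind address to a reachable address before registering it:

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper, for illustration only.
public final class RestAddressUtil {

    private RestAddressUtil() {}

    public static String externalRestAddress(String configuredHost, int port) throws UnknownHostException {
        String host = configuredHost;
        // 0.0.0.0 and 127.0.0.1 are not reachable from other machines, so fall
        // back to an address resolved from the local host name.
        if (host == null || host.isEmpty() || "0.0.0.0".equals(host) || "127.0.0.1".equals(host)) {
            host = InetAddress.getLocalHost().getHostAddress();
        }
        return host + ":" + port;
    }
}
{code}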



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable

2017-12-18 Thread brucewoo (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

brucewoo closed FLINK-8281.
---
   Resolution: Not A Problem
Fix Version/s: 1.4.0

> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to 
> org.apache.flink.core.fs.WrappingProxyCloseable
> -
>
> Key: FLINK-8281
> URL: https://issues.apache.org/jira/browse/FLINK-8281
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 
> 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: brucewoo
>Priority: Critical
> Fix For: 1.4.0
>
>
> {noformat}
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1 for operator window: 
> (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS 
> api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
> proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
> api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[na:1.8.0_151]
>   at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: 
> (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS 
> total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS 
> proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
> ~[na:1.8.0_151]
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) 
> ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 common frames omitted
>   Caused by: java.io.IOException: Could not open output stream for state 
> backend
>   at 
> 

[jira] [Created] (FLINK-8288) Register the web interface url to yarn for yarn job mode

2017-12-18 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8288:
---

 Summary: Register the web interface url to yarn for yarn job mode
 Key: FLINK-8288
 URL: https://issues.apache.org/jira/browse/FLINK-8288
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


For flip-6 job mode, the resource manager is created before the web monitor, so 
the web interface URL is not set on the resource manager, and the resource 
manager cannot register the URL with YARN.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable

2017-12-18 Thread brucewoo (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

brucewoo updated FLINK-8281:


http://www.apache.org/dyn/closer.lua/flink/flink-1.4.0/flink-1.4.0-bin-hadoop27-scala_2.11.tgz

> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to 
> org.apache.flink.core.fs.WrappingProxyCloseable
> -
>
> Key: FLINK-8281
> URL: https://issues.apache.org/jira/browse/FLINK-8281
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 
> 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: brucewoo
>Priority: Critical
>
> {noformat}
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1 for operator window: 
> (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS 
> api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
> proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
> api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[na:1.8.0_151]
>   at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: 
> (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS 
> total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS 
> proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
> ~[na:1.8.0_151]
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) 
> ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 common frames omitted
>   Caused by: java.io.IOException: Could not open output stream for state 
> backend
>   at 
> 

[GitHub] flink pull request #5178: [hotfix] Fix typo in TestableKinesisDataFetcher

2017-12-18 Thread casidiablo
GitHub user casidiablo opened a pull request:

https://github.com/apache/flink/pull/5178

[hotfix] Fix typo in TestableKinesisDataFetcher



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/casidiablo/flink hotfix/typo-gst

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5178


commit 0fa920614acae70efcebf3816d988cd02affcade
Author: Cristian 
Date:   2017-12-18T23:56:03Z

[hotfix] Fix typo in TestableKinesisDataFetcher




---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-18 Thread Cristian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295892#comment-16295892
 ] 

Cristian commented on FLINK-8162:
-

I will try to test and push a PR soon.

> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.
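
A rough sketch (class and method names are assumptions, not the actual connector code) of how such a gauge could be registered through Flink's metric API and updated after each GetRecords call:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical helper, for illustration only.
public class ShardMetricsReporter {

    private final AtomicLong millisBehindLatest = new AtomicLong(-1L);

    public ShardMetricsReporter(MetricGroup consumerMetricGroup, String shardId) {
        // Register the gauge under a per-shard subgroup so it is tagged with the shard id.
        consumerMetricGroup
            .addGroup(shardId)
            .gauge("millisBehindLatest", (Gauge<Long>) millisBehindLatest::get);
    }

    // To be called with GetRecordsResult#getMillisBehindLatest() after each fetch.
    public void update(Long millis) {
        if (millis != null) {
            millisBehindLatest.set(millis);
        }
    }
}
{code}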



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8287) Flink Kafka Producer docs should clearly state what partitioner is used by default

2017-12-18 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8287:
--

 Summary: Flink Kafka Producer docs should clearly state what 
partitioner is used by default
 Key: FLINK-8287
 URL: https://issues.apache.org/jira/browse/FLINK-8287
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Kafka Connector
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.5.0, 1.4.1


See original discussion in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaProducerXX-td16951.html

It is worth mentioning what partitioning scheme is used by the 
{{FlinkKafkaProducer}} by default when writing to Kafka, as users are often 
surprised by the default {{FlinkFixedPartitioner}}.
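
For illustration, a sketch (topic name assumed; based on the 0.11 constructor variant that takes an optional custom partitioner) contrasting the default behaviour with delegating partitioning to Kafka:

{code:java}
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class ProducerPartitioningExamples {

    // Equivalent to the default: each sink subtask writes to one Kafka partition.
    public static FlinkKafkaProducer011<String> flinkFixedPartitioning(Properties props) {
        return new FlinkKafkaProducer011<>(
            "my-topic", new SimpleStringSchema(), props,
            Optional.of(new FlinkFixedPartitioner<>()));
    }

    // Passing Optional.empty() disables the Flink-side partitioner, so records are
    // distributed by Kafka's own partitioner instead.
    public static FlinkKafkaProducer011<String> kafkaSidePartitioning(Properties props) {
        return new FlinkKafkaProducer011<>(
            "my-topic", new SimpleStringSchema(), props, Optional.empty());
    }
}
{code}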



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-18 Thread Furruska (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295843#comment-16295843
 ] 

Furruska commented on FLINK-8162:
-

Can you share your fork with me? I'm interested in this but haven't been able 
to make it work.

> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295816#comment-16295816
 ] 

ASF GitHub Bot commented on FLINK-8271:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5171
  
Without the SDK upgrade, the API actually isn't deprecated yet, right?
It is only deprecated in a newer SDK version.

If so, I would prefer not to merge this until we actually try to upgrade 
the AWS Java SDK version. Then, this change would be more contextually relevant.


> upgrade from deprecated classes to AmazonKinesis
> 
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade from deprecated c...

2017-12-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5171
  
Without the SDK upgrade, the API actually isn't deprecated yet, right?
It is only deprecated in a newer SDK version.

If so, I would prefer not to merge this until we actually try to upgrade 
the AWS Java SDK version. Then, this change would be more contextually relevant.


---


[jira] [Comment Edited] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable

2017-12-18 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295806#comment-16295806
 ] 

Stefan Richter edited comment on FLINK-8281 at 12/18/17 10:58 PM:
--

Hi,

I had a look at your stack traces, and something seems very strange about the 
stack trace and also about the problem itself. In particular, this line doesn't 
make any sense to me if you check the reported line number:

{code}
at 
org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:1)
{code}

If I follow the actual code path, I also cannot see how this cast can ever 
fail. Are you sure your jar contains the proper dependencies for your Flink 
version?


was (Author: srichter):
Hi,

I had a look at your stack traces, and something seems very strange about the 
stack trace and also about the problem itself. In particular, this line doesn't 
make any sense to me if you check the reported line number:

{code}
at 
org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:1)
{code}

If I follow the actual code path, I also cannot see how this cast can ever 
fail. Are you sure your jar contains the proper dependencies for your Flink 
version?

> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to 
> org.apache.flink.core.fs.WrappingProxyCloseable
> -
>
> Key: FLINK-8281
> URL: https://issues.apache.org/jira/browse/FLINK-8281
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 
> 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: brucewoo
>Priority: Critical
>
> {noformat}
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1 for operator window: 
> (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS 
> api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
> proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
> api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[na:1.8.0_151]
>   at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: 
> (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS 
> total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS 
> proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
> ~[na:1.8.0_151]
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) 
> ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>

[jira] [Commented] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable

2017-12-18 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295806#comment-16295806
 ] 

Stefan Richter commented on FLINK-8281:
---

Hi,

I had a look at your stack traces, and something seems very strange about the 
stack trace and also about the problem itself. In particular, this line doesn't 
make any sense to me if you check the reported line number:

{code}
at 
org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:1)
{code}

If I follow the actual code path, I also cannot see how this cast can ever 
fail. Are you sure your jar contains the proper dependencies for your Flink 
version?

> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to 
> org.apache.flink.core.fs.WrappingProxyCloseable
> -
>
> Key: FLINK-8281
> URL: https://issues.apache.org/jira/browse/FLINK-8281
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 
> 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: brucewoo
>Priority: Critical
>
> {noformat}
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1 for operator window: 
> (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS 
> api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
> proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
> api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[na:1.8.0_151]
>   at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: 
> (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS 
> total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS 
> proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
> ~[na:1.8.0_151]
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) 
> ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> 

[jira] [Commented] (FLINK-3720) Add warm starts for models

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295735#comment-16295735
 ] 

ASF GitHub Bot commented on FLINK-3720:
---

Github user rawkintrevo commented on the issue:

https://github.com/apache/flink/pull/1865
  
closing this


> Add warm starts for models
> --
>
> Key: FLINK-3720
> URL: https://issues.apache.org/jira/browse/FLINK-3720
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Trevor Grant
>Assignee: Trevor Grant
>
> Add 'warm-start' to Iterative Solver. 
> - Make weight vector settable (this will allow for model saving/loading)
> - Make the iteration use an existing weight vector if available
> - Keep track of what iteration we're on for additional partial fits in SGD 
> (and anywhere else it makes sense). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3720) Add warm starts for models

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295736#comment-16295736
 ] 

ASF GitHub Bot commented on FLINK-3720:
---

Github user rawkintrevo closed the pull request at:

https://github.com/apache/flink/pull/1865


> Add warm starts for models
> --
>
> Key: FLINK-3720
> URL: https://issues.apache.org/jira/browse/FLINK-3720
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Trevor Grant
>Assignee: Trevor Grant
>
> Add 'warm-start' to Iterative Solver. 
> - Make weight vector settable (this will allow for model saving/loading)
> - Make the iteration use an existing weight vector if available
> - Keep track of what iteration we're on for additional partial fits in SGD 
> (and anywhere else it makes sense). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #1875: [FLINK-3742][ml][wip] Add Multilayer Perceptron

2017-12-18 Thread rawkintrevo
Github user rawkintrevo closed the pull request at:

https://github.com/apache/flink/pull/1875


---


[GitHub] flink pull request #1865: [FLINK-3720][ml][wip] Add Warm Starts for Iterativ...

2017-12-18 Thread rawkintrevo
Github user rawkintrevo closed the pull request at:

https://github.com/apache/flink/pull/1865


---


[jira] [Commented] (FLINK-3742) Add Multi Layer Perceptron Predictor

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295734#comment-16295734
 ] 

ASF GitHub Bot commented on FLINK-3742:
---

Github user rawkintrevo closed the pull request at:

https://github.com/apache/flink/pull/1875


> Add Multi Layer Perceptron Predictor
> 
>
> Key: FLINK-3742
> URL: https://issues.apache.org/jira/browse/FLINK-3742
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Trevor Grant
>Assignee: Trevor Grant
>Priority: Minor
>
> https://en.wikipedia.org/wiki/Multilayer_perceptron
> Multilayer perceptron is a simple sort of artificial neural network.  It 
> creates a directed graph in which the edges are parameter weights and nodes 
> are non-linear activation functions.  It is solved via a method known as back 
> propagation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3742) Add Multi Layer Perceptron Predictor

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295733#comment-16295733
 ] 

ASF GitHub Bot commented on FLINK-3742:
---

Github user rawkintrevo commented on the issue:

https://github.com/apache/flink/pull/1875
  
closing this


> Add Multi Layer Perceptron Predictor
> 
>
> Key: FLINK-3742
> URL: https://issues.apache.org/jira/browse/FLINK-3742
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Trevor Grant
>Assignee: Trevor Grant
>Priority: Minor
>
> https://en.wikipedia.org/wiki/Multilayer_perceptron
> Multilayer perceptron is a simple sort of artificial neural network.  It 
> creates a directed graph in which the edges are parameter weights and nodes 
> are non-linear activation functions.  It is solved via a method known as back 
> propagation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #1865: [FLINK-3720][ml][wip] Add Warm Starts for Iterative Solve...

2017-12-18 Thread rawkintrevo
Github user rawkintrevo commented on the issue:

https://github.com/apache/flink/pull/1865
  
closing this


---


[GitHub] flink issue #1875: [FLINK-3742][ml][wip] Add Multilayer Perceptron

2017-12-18 Thread rawkintrevo
Github user rawkintrevo commented on the issue:

https://github.com/apache/flink/pull/1875
  
closing this


---


[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295697#comment-16295697
 ] 

ASF GitHub Bot commented on FLINK-8116:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5121
  
Thanks @ankitiitb1069 @ggevay for the work and review.
The changes LGTM, minus my comment. I'll address my comments while merging 
this ...


> Stale comments referring to Checkpointed interface
> --
>
> Key: FLINK-8116
> URL: https://issues.apache.org/jira/browse/FLINK-8116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Documentation
>Reporter: Gabor Gevay
>Priority: Trivial
>  Labels: starter
> Fix For: 1.5.0
>
>
> Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by 
> the {{CheckpointedFunction}} interface.
> However, in {{SourceFunction}} there are two comments still referring to the 
> old {{Checkpointed}} interface. (The code examples there also need to be 
> modified.)
> Note that the problem also occurs in {{StreamExecutionEnvironment}}, and 
> possibly other places as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5121: [FLINK-8116] Stale comments referring to Checkpointed int...

2017-12-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5121
  
Thanks @ankitiitb1069 @ggevay for the work and review.
The changes LGTM, minus my comment. I'll address my comments while merging 
this ...


---


[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295695#comment-16295695
 ] 

ASF GitHub Bot commented on FLINK-8116:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5121#discussion_r157606375
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 ---
@@ -61,9 +61,9 @@
  *  isRunning = false;
  *  }
  *
- *  public Long snapshotState(long checkpointId, long 
checkpointTimestamp) { return count; }
+ *  public void snapshotState(FunctionSnapshotContext context) {  }
  *
- *  public void restoreState(Long state) { this.count = state; }
+ *  public void initializeState(FunctionInitializationContext context) 
{  }
--- End diff --

These methods should handle checkpointing of the count state.
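
For example, a sketch of how the Javadoc example could checkpoint the `count` via `CheckpointedFunction` (simplified; the class name is illustrative, not necessarily what will be merged):

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CountingSource implements SourceFunction<Long>, CheckpointedFunction {

    private volatile boolean isRunning = true;
    private long count;
    private transient ListState<Long> checkpointedCount;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 1000) {
            // Emit and update the count atomically with respect to checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));

        // On restore, pick up the previously checkpointed count.
        for (Long c : checkpointedCount.get()) {
            count = c;
        }
    }
}
{code}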




> Stale comments referring to Checkpointed interface
> --
>
> Key: FLINK-8116
> URL: https://issues.apache.org/jira/browse/FLINK-8116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Documentation
>Reporter: Gabor Gevay
>Priority: Trivial
>  Labels: starter
> Fix For: 1.5.0
>
>
> Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by 
> the {{CheckpointedFunction}} interface.
> However, in {{SourceFunction}} there are two comments still referring to the 
> old {{Checkpointed}} interface. (The code examples there also need to be 
> modified.)
> Note that the problem also occurs in {{StreamExecutionEnvironment}}, and 
> possibly other places as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295694#comment-16295694
 ] 

ASF GitHub Bot commented on FLINK-8116:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5121#discussion_r157606329
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 ---
@@ -61,9 +61,9 @@
  *  isRunning = false;
  *  }
  *
- *  public Long snapshotState(long checkpointId, long 
checkpointTimestamp) { return count; }
+ *  public void snapshotState(FunctionSnapshotContext context) {  }
--- End diff --

These methods should handle checkpointing of the `count` state.


> Stale comments referring to Checkpointed interface
> --
>
> Key: FLINK-8116
> URL: https://issues.apache.org/jira/browse/FLINK-8116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Documentation
>Reporter: Gabor Gevay
>Priority: Trivial
>  Labels: starter
> Fix For: 1.5.0
>
>
> Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by 
> the {{CheckpointedFunction}} interface.
> However, in {{SourceFunction}} there are two comments still referring to the 
> old {{Checkpointed}} interface. (The code examples there also need to be 
> modified.)
> Note that the problem also occurs in {{StreamExecutionEnvironment}}, and 
> possibly other places as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5121: [FLINK-8116] Stale comments referring to Checkpoin...

2017-12-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5121#discussion_r157606329
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 ---
@@ -61,9 +61,9 @@
  *  isRunning = false;
  *  }
  *
- *  public Long snapshotState(long checkpointId, long 
checkpointTimestamp) { return count; }
+ *  public void snapshotState(FunctionSnapshotContext context) {  }
--- End diff --

These methods should handle checkpointing of the `count` state.


---


[GitHub] flink pull request #5121: [FLINK-8116] Stale comments referring to Checkpoin...

2017-12-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5121#discussion_r157606375
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 ---
@@ -61,9 +61,9 @@
  *  isRunning = false;
  *  }
  *
- *  public Long snapshotState(long checkpointId, long 
checkpointTimestamp) { return count; }
+ *  public void snapshotState(FunctionSnapshotContext context) {  }
  *
- *  public void restoreState(Long state) { this.count = state; }
+ *  public void initializeState(FunctionInitializationContext context) 
{  }
--- End diff --

These methods should handle checkpointing of the count state.




---


[jira] [Comment Edited] (FLINK-7860) Support YARN proxy user in Flink (impersonation)

2017-12-18 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295656#comment-16295656
 ] 

Shuyi Chen edited comment on FLINK-7860 at 12/18/17 9:11 PM:
-

Hi [~eronwright], can you take a look at this as well?


was (Author: suez1224):
[~eronwright] can you take a look at this as well?

> Support YARN proxy user in Flink (impersonation)
> 
>
> Key: FLINK-7860
> URL: https://issues.apache.org/jira/browse/FLINK-7860
> Project: Flink
>  Issue Type: New Feature
>  Components: YARN
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8286) Investigate Flink-Yarn-Kerberos integration for flip-6

2017-12-18 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-8286:
-

 Summary: Investigate Flink-Yarn-Kerberos integration for flip-6
 Key: FLINK-8286
 URL: https://issues.apache.org/jira/browse/FLINK-8286
 Project: Flink
  Issue Type: Task
  Components: Security
Reporter: Shuyi Chen
Assignee: Shuyi Chen
Priority: Blocker
 Fix For: 1.5.0


We've found some issues with the Flink-Yarn-Kerberos integration in the current 
deployment model; we will need to investigate and test it for flip-6 when it's 
ready.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7860) Support YARN proxy user in Flink (impersonation)

2017-12-18 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294694#comment-16294694
 ] 

Shuyi Chen edited comment on FLINK-7860 at 12/18/17 9:01 PM:
-

I am proposing adding the following new options:
security.kerberos.login.proxyuser.principal: the proxy user's principal
security.kerberos.login.proxyuser.keytab: the proxy user's keytab path

In the client code, it will use security.kerberos.login.principal and 
security.kerberos.login.keytab to log in and impersonate the proxy user. Before 
the appMaster and container launch, in the launch context, set 
security.kerberos.login.principal to the value of 
security.kerberos.login.proxyuser.principal and set security.kerberos.login.keytab 
to the value of security.kerberos.login.proxyuser.keytab. So in the appMaster 
and container, it will always use the proxy user's credentials.
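
As a concrete illustration of the proposed keys (hypothetical values; the proxyuser options are only a proposal at this point and do not exist in Flink today):

{code}
// Purely illustrative sketch of the client-side configuration.
Configuration flinkConf = new Configuration();
// existing options, used by the client to log in:
flinkConf.setString("security.kerberos.login.principal", "svc-client@EXAMPLE.COM");
flinkConf.setString("security.kerberos.login.keytab", "/etc/security/keytabs/svc-client.keytab");
// proposed options, describing the user to impersonate:
flinkConf.setString("security.kerberos.login.proxyuser.principal", "enduser@EXAMPLE.COM");
flinkConf.setString("security.kerberos.login.proxyuser.keytab", "/etc/security/keytabs/enduser.keytab");
{code}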


was (Author: suez1224):
I am proposing adding the following new options:
security.kerberos.login.proxyuser.principal: the proxy user's principal
security.kerberos.login.proxyuser.keytab: the proxy user's keytab path

In the client code, it will use security.kerberos.login.principal and 
security.kerberos.login.keytab to login and impersonate the proxy user. Before 
the appMaster and container launch, set security.kerberos.login.principal to 
the value of security.kerberos.login.proxyuser.principal, set 
security.kerberos.login.keytab to the value of 
security.kerberos.login.proxyuser.keytab. So in the appMaster and container, it 
will always use the proxy user's credential.

> Support YARN proxy user in Flink (impersonation)
> 
>
> Key: FLINK-7860
> URL: https://issues.apache.org/jira/browse/FLINK-7860
> Project: Flink
>  Issue Type: New Feature
>  Components: YARN
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295518#comment-16295518
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8285 at 12/18/17 7:26 PM:
--

Ah, sorry, I somehow was thinking of the {{DataSet}} API :/ ...

Yes, the {{DataStream}} API does not currently officially support a {{collect}} 
counterpart. It could make sense, though, to move that from 
{{flink-streaming-contribs}} to be supported in the {{DataStream}} API 
out-of-the-box. There is already some discussion going on about dismantling the 
{{flink-streaming-contribs}} module, so we can keep this in mind.


was (Author: tzulitai):
Ah, sorry, I somehow was thinking of the {{DataSet}} API :/ ...

Yes, the {{DataStream}} API does not currently officially support a {{collect}} 
counterpart. It could make sense, though, to move that from 
{{flink-streaming-contribs}} to be supported in the {{DataStream}} API 
out-of-the-box. There is already some discussion going on about dismantling the 
{{flink-streaming-contribs}} module.

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295518#comment-16295518
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8285:


Ah, sorry, I somehow was thinking of the {{DataSet}} API :/

Yes, the {{DataStream}} API does not currently officially support a {{collect}} 
counterpart. It could make sense, though, to move that from 
{{flink-streaming-contribs}} to be supported in the {{DataStream}} API 
out-of-the-box. There is already some discussion going on about dismantling the 
{{flink-streaming-contribs}} module.

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295518#comment-16295518
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8285 at 12/18/17 7:24 PM:
--

Ah, sorry, I somehow was thinking of the {{DataSet}} API :/ ...

Yes, the {{DataStream}} API does not currently officially support a {{collect}} 
counterpart. It could make sense, though, to move that from 
{{flink-streaming-contribs}} to be supported in the {{DataStream}} API 
out-of-the-box. There is already some discussion going on about dismantling the 
{{flink-streaming-contribs}} module.


was (Author: tzulitai):
Ah, sorry, I somehow was thinking of the {{DataSet}} API :/

Yes, the {{DataStream}} API does not currently officially support a {{collect}} 
counterpart. It could make sense, though, to move that from 
{{flink-streaming-contribs}} to be supported in the {{DataStream}} API 
out-of-the-box. There is already some discussion going on about dismantling the 
{{flink-streaming-contribs}} module.

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295515#comment-16295515
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5172
  
This PR probably fixes the problem, but it would be good to address the 
deeper problem that the code is confusing.   At least we could add some 
commentary to the code.  The specific problems, in my view, are:
1. A filename is transmitted from client -> AM -> TM in the env variable 
`_KEYTAB_PATH` but the value doesn't appear to be used.   In effect it is a 
flag asserting that a keytab named `krb5.keytab` is available.  Alternatives:
  a. Use `krb5.keytab` as the value.
  b. Eliminate the env check and simply look for the file; if present, use 
it.
2. The existence of the "integration test code" has an unclear purpose.   
It mutates the Hadoop configuration, why?   Is the code active in any 
production scenario?

Note that `YarnTaskExecutorRunner` implements this in a slightly different 
way, and should be re-tested for 1.5.0 (since I don't think it is in use yet).
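
Roughly, alternative (b) as a sketch (names are made up for illustration; only "krb5.keytab" comes from the discussion above):

{code}
// Sketch of alternative (b): drop the _KEYTAB_PATH env check and simply probe
// for the well-known keytab file in the container's working directory.
// `workingDirectory` is a hypothetical stand-in for the container's CWD.
File localKeytab = new File(workingDirectory, "krb5.keytab");
String keytabPath = localKeytab.exists() ? localKeytab.getAbsolutePath() : null;
// if keytabPath != null, hand it to the Kerberos login / security configuration
{code}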



> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5172: [FLINK-8275] [Security] fix keytab local path in YarnTask...

2017-12-18 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5172
  
This PR probably fixes the problem, but it would be good to address the 
deeper problem that the code is confusing.   At least we could add some 
commentary to the code.  The specific problems, in my view, are:
1. A filename is transmitted from client -> AM -> TM in the env variable 
`_KEYTAB_PATH` but the value doesn't appear to be used.   In effect it is a 
flag asserting that a keytab named `krb5.keytab` is available.  Alternatives:
  a. Use `krb5.keytab` as the value.
  b. Eliminate the env check and simply look for the file; if present, use 
it.
2. The existence of the "integration test code" has an unclear purpose.   
It mutates the Hadoop configuration, why?   Is the code active in any 
production scenario?

Note that `YarnTaskExecutorRunner` implements this in a slightly different 
way, and should be re-tested for 1.5.0 (since I don't think it is in use yet).



---


[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Julio Biason (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295507#comment-16295507
 ] 

Julio Biason commented on FLINK-8285:
-

[~tzulitai] Yeah, it's not relevant, because there is no 
{{DataStream.collect()}}:

{code}
[error] tests.scala:28: value collect is not a member of 
org.apache.flink.streaming.api.scala.DataStream[org.azion.com.models.metrics.metrictuple.Metric]
[error] val output = pipeline.collect()
{code}

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Julio Biason (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295496#comment-16295496
 ] 

Julio Biason commented on FLINK-8285:
-

[~tzulitai] Actually, it is relevant: It could completely replace the section, 
because DataStream.collect() is not mentioned anywhere else in the page.

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295493#comment-16295493
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5172
  
@suez1224 keep in mind that contribution PRs should initially have one 
commit with the commit message appropriately set (the title of the PR would be 
a good commit message in your case).


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5172: [FLINK-8275] [Security] fix keytab local path in YarnTask...

2017-12-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5172
  
@suez1224 keep in mind that contribution PRs should initially have one 
commit with the commit message appropriately set (the title of the PR would be 
a good commit message in your case).


---


[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295484#comment-16295484
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8285:


[~JBiason] just as a side note irrelevant to this JIRA, 
{{DataStream.collect()}} should also work for what you are trying to do.

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Julio Biason (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295479#comment-16295479
 ] 

Julio Biason commented on FLINK-8285:
-

Well, I was expecting to use DataStreamUtils.collect to retrieve the 
processing result in a test, starting with a local environment inside the test 
and using a Seq as input; since I split the pipeline creation into its own 
function, I could throw any DataStream at it and check only the final result, as 
an E2E test.

But yeah, if it's using an unstable package, it shouldn't even be mentioned in 
the docs.
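
For what it's worth, a minimal Java sketch of that test pattern (assuming the contrib artifact mentioned in this issue is on the test classpath):

{code}
// Minimal sketch: run the pipeline on a local environment and pull the results
// back through DataStreamUtils.collect.
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PipelineE2ETest {

    public void runPipelineAndCollect() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // stand-in for the real pipeline built by the factored-out function
        DataStream<Long> pipeline = env.fromElements(1L, 2L, 3L);

        // collect() runs the job in the background and streams results back
        Iterator<Long> it = DataStreamUtils.collect(pipeline);
        List<Long> results = new ArrayList<>();
        while (it.hasNext()) {
            results.add(it.next());
        }
        // assert on `results` here
    }
}
{code}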

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295463#comment-16295463
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8285:


I'm not knowledgeable of the full context here, but IMO maybe the docs should 
not be demonstrating utilities that are available only via the 
{{flink-streaming-contribs}} module. Code there is not really maintained and is 
considered unstable.

> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Julio Biason (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julio Biason updated FLINK-8285:

Description: 
In the docs about 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
 it's mentioned that one could use an iterator for retrieving the result of 
the stream.

But there is no mention of any external packages (as it happens with some 
examples in the metrics) and trying to use it causes an error:

[error] object contrib is not a member of package org.apache.flink
[error] import org.apache.flink.contrib.streaming.DataStreamUtils

The line in question (as copied'n'pasted directly from the examples):

import org.apache.flink.contrib.streaming.DataStreamUtils

(PS: Source is in Scala, not Java)

  was:
In the docs about 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
 it's mentioned that one could use an iterator for retrieving the result of 
the stream.

But there is no mention of any external packages (as it happens with some 
examples in the metrics) and trying to use it causes an error:

[error] object contrib is not a member of package org.apache.flink
[error] import org.apache.flink.contrib.streaming.DataStreamUtils

The line in question (as copied'n'pasted directly from the examples):

import org.apache.flink.contrib.streaming.DataStreamUtils


> Iterator Data Sink doesn't mention required module
> --
>
> Key: FLINK-8285
> URL: https://issues.apache.org/jira/browse/FLINK-8285
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
> Environment: Linux CentOS/7
>Reporter: Julio Biason
>
> In the docs about 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
>  it's mentioned that one could use an iterator for retrieving the result of 
> the stream.
> But there is no mention of any external packages (as it happens with some 
> examples in the metrics) and trying to use it causes an error:
> [error] object contrib is not a member of package org.apache.flink
> [error] import org.apache.flink.contrib.streaming.DataStreamUtils
> The line in question (as copied'n'pasted directly from the examples):
> import org.apache.flink.contrib.streaming.DataStreamUtils
> (PS: Source is in Scala, not Java)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8285) Iterator Data Sink doesn't mention required module

2017-12-18 Thread Julio Biason (JIRA)
Julio Biason created FLINK-8285:
---

 Summary: Iterator Data Sink doesn't mention required module
 Key: FLINK-8285
 URL: https://issues.apache.org/jira/browse/FLINK-8285
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.4.0
 Environment: Linux CentOS/7
Reporter: Julio Biason


In the docs about 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink],
 it's mentioned that one could use an iterator for retrieving the result of 
the stream.

But there is no mention of any external packages (as it happens with some 
examples in the metrics) and trying to use it causes an error:

[error] object contrib is not a member of package org.apache.flink
[error] import org.apache.flink.contrib.streaming.DataStreamUtils

The line in question (as copied'n'pasted directly from the examples):

import org.apache.flink.contrib.streaming.DataStreamUtils



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2017-12-18 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295353#comment-16295353
 ] 

Timo Walther commented on FLINK-8240:
-

Hi everyone,

I don't think we need a design document for it, but it would be great to hear 
some opinions. I introduced descriptors that allow describing connectors, 
encodings, and time attributes. 

My current API design looks like:

{code}
tableEnv
  .from(
FileSystem()
  .path("/path/to/csv"))
  .withEncoding(
CSV()
  .field("myfield", Types.STRING)
  .field("myfield2", Types.INT)
  .quoteCharacter(';')
  .fieldDelimiter("#")
  .lineDelimiter("\r\n")
  .commentPrefix("%%")
  .ignoreFirstLine()
  .ignoreParseErrors())
  .withRowtime(
Rowtime()
  .onField("rowtime")
  .withTimestampFromDataStream()
  .withWatermarkFromDataStream())
  .withProctime(
Proctime()
  .onField("myproctime"))
  .toTableSource()
{code}

These descriptors are converted into pure key-value properties, such as:

{code}
"connector.filesystem.path" -> "/myfile"
"encoding.csv.fields.0.name" -> "field1",
"encoding.csv.fields.0.type" -> "STRING",
"encoding.csv.fields.1.name" -> "field2",
"encoding.csv.fields.1.type" -> "TIMESTAMP",
"encoding.csv.fields.2.name" -> "field3",
"encoding.csv.fields.2.type" -> "ANY(java.lang.Class)",
"encoding.csv.fields.3.name" -> "field4",
"encoding.csv.fields.3.type" -> "ROW(test INT, row VARCHAR)",
"encoding.csv.line-delimiter" -> "^"
{code}

The properties are fully expressed as strings. This also allows saving them in 
configuration files, which might be interesting for FLINK-7594.

The question is how we want to translate the properties into actual table 
sources. Or more precisely: how do we want to supply converters? Should they be 
part of the {{TableSource}} interface? Or should table sources be annotated 
with some factory class? Right now we have similar functionality for external 
catalogs, but it is too specific and does not consider encodings or time 
attributes. Furthermore, it would be better to use Java {{ServiceLoader}}s 
instead of classpath scanning, as is already done for Flink's file systems.

So my idea would be to have a class {{TableFactory}} that declares a connector, 
e.g. "kafka_0.10", and its supported encodings, e.g. "csv" and "avro" (similar to 
FLINK-7643). All built-in table sources would need to provide such a factory.
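
For illustration, a very rough sketch of what such a factory could look like (names and signatures are hypothetical, not a settled design):

{code}
// Hypothetical sketch only -- not an existing or agreed-upon Flink interface.
public interface TableFactory {

    /** The connector this factory handles, e.g. "kafka_0.10" or "filesystem". */
    String connectorType();

    /** The encodings this connector supports, e.g. "csv", "avro", "json". */
    Set<String> supportedEncodings();

    /** Creates a TableSource from the normalized key-value properties shown above. */
    TableSource<?> createTableSource(Map<String, String> properties);
}

// Built-in and user-provided factories could then be discovered via the standard
// Java ServiceLoader mechanism, as is already done for Flink's file systems:
//   ServiceLoader<TableFactory> factories = ServiceLoader.load(TableFactory.class);
{code}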

What do you think? [~fhueske] [~jark] [~wheat9] [~ykt836]


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5176: [FLINK-8279][blob] fall back to TaskManager temp director...

2017-12-18 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5176
  
@NicoK was this PR a response to an actual issue, and did it resolve it?

Do I understand correctly that this will cause the JM (blob server) to use 
`taskmanager.tmp.dirs`?   In addition to the TM blob cache, of course.

For the TM on YARN, is there really any effect?   Looking at 
MAPREDUCE-6472, seems the temp folder is already within the container.   Can 
you explain the actual effect?  Thanks.

Note that the Mesos implementation doesn't actually configure 
`taskmanager.tmp.dirs` at this time (though there's some dead code in TM).  A 
proper treatment of tmp folders on Mesos would involve the use of volumes.



---


[jira] [Commented] (FLINK-8279) Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp directories

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295317#comment-16295317
 ] 

ASF GitHub Bot commented on FLINK-8279:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5176
  
@NicoK was this PR a response to an actual issue, and did it resolve it?

Do I understand correctly that this will cause the JM (blob server) to use 
`taskmanager.tmp.dirs`?   In addition to the TM blob cache, of course.

For the TM on YARN, is there really any effect?   Looking at 
MAPREDUCE-6472, seems the temp folder is already within the container.   Can 
you explain the actual effect?  Thanks.

Note that the Mesos implementation doesn't actually configure 
`taskmanager.tmp.dirs` at this time (though there's some dead code in TM).  A 
proper treatment of tmp folders on Mesos would involve the use of volumes.



> Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp 
> directories
> -
>
> Key: FLINK-8279
> URL: https://issues.apache.org/jira/browse/FLINK-8279
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the BLOB server and cache processes (temporarily) stash incoming 
> files into their local file system in the directory given by the 
> {{blob.storage.directory}} configuration property. If this property is not 
> set or empty, it will fall back to {{java.io.tmpdir}}.
> Instead, in a Mesos/YARN environment, we could use the temporary directories 
> they assigned to the Flink job which are not only the proper folder to use, 
> but may also offer some more space.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6065) Make TransportClient for ES5 pluggable

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295316#comment-16295316
 ] 

ASF GitHub Bot commented on FLINK-6065:
---

Github user sschaef closed the pull request at:

https://github.com/apache/flink/pull/3934


> Make TransportClient for ES5 pluggable
> --
>
> Key: FLINK-6065
> URL: https://issues.apache.org/jira/browse/FLINK-6065
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector, Streaming Connectors
>Reporter: Robert Metzger
>
> This JIRA is based on a user request: 
> http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454
> Currently, in the {{Elasticsearch5ApiCallBridge}} the 
> {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this 
> client pluggable to allow using other clients such as the 
> {{PreBuiltXPackTransportClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3934: [FLINK-6065] Add initClient method to Elasticsearc...

2017-12-18 Thread sschaef
Github user sschaef closed the pull request at:

https://github.com/apache/flink/pull/3934


---


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295303#comment-16295303
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157544965
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -161,6 +172,29 @@ public boolean isReleased() {
return isReleased;
}
 
+   @Override
+   public int getBuffersInBacklog() {
+   return buffersInBacklog;
+   }
+
+   @Override
+   public void decreaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog--;
+   }
+   }
+
+   @Override
+   public void increaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog++;
+   }
+   }
--- End diff --

please check the access-level (the latter two could be private)


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295299#comment-16295299
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157545208
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -237,6 +243,29 @@ public boolean isReleased() {
return isReleased;
}
 
+   @Override
+   public int getBuffersInBacklog() {
+   return buffersInBacklog;
+   }
+
+   @Override
+   public void decreaseBuffersInBacklog(Buffer buffer) {
+   if (buffer != null && buffer.isBuffer()) {
+   synchronized (buffers) {
+   buffersInBacklog--;
+   }
+   }
+   }
+
+   @Override
+   public void increaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog++;
+   }
+   }
--- End diff --

please check the access-level (the latter two could be private)


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295305#comment-16295305
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157541024
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -62,7 +70,14 @@ public void testAddAfterRelease() throws Exception {
try {
subpartition.release();
 
+   assertEquals(0, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(0, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(0, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

same here - please test with a real `Buffer` instance


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295300#comment-16295300
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157538818
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws 
Exception {
partition.add(buffer);
partition.add(buffer);
 
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+   assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
 
+   // now the buffer may be freed, depending on the timing of the 
write operation
+   // -> let's do this check at the end of the test (to save some 
time)
+   // still same statistics
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
partition.finish();
 
+   // + one EndOfPartitionEvent
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

good, can you also add the backlog correctness checks to the 
`reader.getNextBuffer()` lines below to ensure they are correct after taking 
buffers out?


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295307#comment-16295307
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157548033
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

I briefly thought about relying on `buffers.size()` here to reduce 
complexity and code, but `ArrayDeque#size()` (for `getBuffersInBacklog()`) may 
then be subject to race conditions without synchronisation. However, if we picked 
up the idea again of returning the backlog size with the buffer itself (which 
is retrieved under the lock), i.e. similar to `BufferAndAvailability` being 
returned by the `SequenceNumberingViewReader`, this would work and we would not 
need the `volatile` here. Since you split the implementations into 
`PipelinedSubpartition` and `SpillableSubpartition` anyway, this would be a 
viable approach again.
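
Roughly, that alternative could look like the following (hypothetical names, analogous to `BufferAndAvailability`):

{code}
// Hypothetical sketch: hand out the backlog together with the buffer, both
// determined under the `buffers` lock, so no volatile counter is needed.
public final class BufferAndBacklog {

    private final Buffer buffer;
    private final int buffersInBacklog;

    public BufferAndBacklog(Buffer buffer, int buffersInBacklog) {
        this.buffer = buffer;
        this.buffersInBacklog = buffersInBacklog;
    }

    public Buffer buffer() {
        return buffer;
    }

    public int buffersInBacklog() {
        return buffersInBacklog;
    }
}
{code}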
What do you think? What would you prefer?


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295301#comment-16295301
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157539147
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -239,6 +261,10 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 
// Spill now
assertEquals(2, partition.releaseMemory());
+   // still same statistics:
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(2, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

same here - please add the checks to the `reader.getNextBuffer()` lines 
below


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295306#comment-16295306
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157540910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
try {
subpartition.finish();
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(4, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

Actually, this never increases the backlog, even if the subpartition is not 
finished, since `buffer.isBuffer()` for a `mock(Buffer.class)` returns `false`. 
Can you test with a real `Buffer` instead?
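
A small sketch of what that could look like (assuming a helper like `createBuffer()` from `PipelinedSubpartitionTest` that returns a real, recyclable `Buffer`):

{code}
// Sketch only: a real Buffer makes buffer.isBuffer() return true, so the
// backlog accounting is actually exercised by the add-after-finish check.
Buffer buffer = createBuffer();
try {
    subpartition.finish();

    // adding after finish must be rejected and must not change the backlog
    assertFalse(subpartition.add(buffer));
    assertEquals(0, subpartition.getBuffersInBacklog());
} finally {
    buffer.recycle();
}
{code}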


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295304#comment-16295304
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157544794
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

I'm not quite sure the latter two methods should be in `ResultSubpartition` 
now since they are quite internal. `increaseBuffersInBacklog()` is only called 
by `PipelinedSubpartition` and `SpillableSubpartition`. 
`decreaseBuffersInBacklog()` is (additionally) only called by spilled/spillable 
subpartition views and therefore could be package-private in 
`SpillableSubpartition` only.


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295308#comment-16295308
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157548895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -77,6 +78,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

If the interface of `getNextBuffer()` was changed as suggested above, we 
could remove the `volatile` here as well.


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295302#comment-16295302
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157538061
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -103,16 +104,35 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
// Add data to the queue...
subpartition.add(createBuffer());
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+
// ...should have resulted in a notification
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
assertNotNull(view.getNextBuffer());
assertNull(view.getNextBuffer());
+   assertEquals(0, subpartition.getBuffersInBacklog());
 
// Add data to the queue...
subpartition.add(createBuffer());
+
+   assertEquals(2, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
+
+   // Add event to the queue...
+   Buffer event = createBuffer();
+   event.tagAsEvent();
+   subpartition.add(event);
+
+   assertEquals(3, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
--- End diff --

good catch - the event-adding path was not tested yet


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157544965
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -161,6 +172,29 @@ public boolean isReleased() {
return isReleased;
}
 
+   @Override
+   public int getBuffersInBacklog() {
+   return buffersInBacklog;
+   }
+
+   @Override
+   public void decreaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog--;
+   }
+   }
+
+   @Override
+   public void increaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog++;
+   }
+   }
--- End diff --

please check the access-level (the latter two could be private)


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157548033
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

I briefly thought about relying on `buffers.size()` here to reduce complexity 
and code, but without synchronisation, `ArrayDeque#size()` (for 
`getBuffersInBacklog()`) may then be subject to race conditions. However, if we 
picked up the idea again of returning the backlog size together with the buffer 
itself (which is retrieved under the lock), i.e. similar to 
`BufferAndAvailability` being returned by the `SequenceNumberingViewReader`, 
this would work and we would not need the `volatile` here. Since you split the 
implementations into `PipelinedSubpartition` and `SpillableSubpartition` 
anyway, this would be a viable approach again.
What do you think? What would you prefer?
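
For illustration, one possible shape of such a combined return value (a sketch 
only; the class name `BufferAndBacklog` and its accessors are assumptions here, 
chosen by analogy with `BufferAndAvailability`):

{code}
import org.apache.flink.runtime.io.network.buffer.Buffer;

/**
 * Sketch of a combined return value: the polled buffer together with the
 * backlog count read while still holding the lock on the buffer queue, so
 * that no volatile backlog field is needed.
 */
public final class BufferAndBacklog {

	private final Buffer buffer;

	private final int buffersInBacklog;

	public BufferAndBacklog(Buffer buffer, int buffersInBacklog) {
		this.buffer = buffer;
		this.buffersInBacklog = buffersInBacklog;
	}

	public Buffer buffer() {
		return buffer;
	}

	public int buffersInBacklog() {
		return buffersInBacklog;
	}
}
{code}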


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157540910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
try {
subpartition.finish();
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(4, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

Actually, this never increases the backlog, even if the subpartition is not 
finished, since `buffer.isBuffer()` for a `mock(Buffer.class)` returns `false`. 
Can you test with a real `Buffer` instead?
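
For example, something along these lines (a sketch; it assumes the 
`MemorySegmentFactory` and `FreeingBufferRecycler` classes available in 
flink-runtime at this point):

{code}
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;

// a real (non-mocked) Buffer, so that buffer.isBuffer() returns true and the
// backlog accounting is actually exercised by add()
Buffer buffer = new Buffer(
	MemorySegmentFactory.allocateUnpooledSegment(4096),
	FreeingBufferRecycler.INSTANCE);

// adding after finish() must still be rejected and must not change the backlog
assertFalse(subpartition.add(buffer));
assertEquals(0, subpartition.getBuffersInBacklog());
{code}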


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157539147
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -239,6 +261,10 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 
// Spill now
assertEquals(2, partition.releaseMemory());
+   // still same statistics:
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(2, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

same here - please add the checks to the `reader.getNextBuffer()` lines 
below


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157538061
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -103,16 +104,35 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
// Add data to the queue...
subpartition.add(createBuffer());
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+
// ...should have resulted in a notification
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
assertNotNull(view.getNextBuffer());
assertNull(view.getNextBuffer());
+   assertEquals(0, subpartition.getBuffersInBacklog());
 
// Add data to the queue...
subpartition.add(createBuffer());
+
+   assertEquals(2, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
+
+   // Add event to the queue...
+   Buffer event = createBuffer();
+   event.tagAsEvent();
+   subpartition.add(event);
+
+   assertEquals(3, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
--- End diff --

good catch - the event-adding path was not tested yet


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157548895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -77,6 +78,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

If the interface of `getNextBuffer()` was changed as suggested above, we 
could remove the `volatile` here as well.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157541024
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -62,7 +70,14 @@ public void testAddAfterRelease() throws Exception {
try {
subpartition.release();
 
+   assertEquals(0, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(0, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(0, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

same here - please test with a real `Buffer` instance


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157545208
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -237,6 +243,29 @@ public boolean isReleased() {
return isReleased;
}
 
+   @Override
+   public int getBuffersInBacklog() {
+   return buffersInBacklog;
+   }
+
+   @Override
+   public void decreaseBuffersInBacklog(Buffer buffer) {
+   if (buffer != null && buffer.isBuffer()) {
+   synchronized (buffers) {
+   buffersInBacklog--;
+   }
+   }
+   }
+
+   @Override
+   public void increaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog++;
+   }
+   }
--- End diff --

please check the access-level (the latter two could be private)


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157538818
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws 
Exception {
partition.add(buffer);
partition.add(buffer);
 
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+   assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
 
+   // now the buffer may be freed, depending on the timing of the 
write operation
+   // -> let's do this check at the end of the test (to save some 
time)
+   // still same statistics
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
partition.finish();
 
+   // + one EndOfPartitionEvent
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

Good. Can you also add the backlog correctness checks to the 
`reader.getNextBuffer()` lines below, to verify that the backlog stays correct 
after buffers are taken out?
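
Roughly like this, as a sketch (variable names follow the surrounding test; it 
assumes that the view decrements the partition's backlog whenever a non-event 
buffer is fetched, as implemented in this PR):

{code}
// after finish(): 3 data buffers + 1 EndOfPartitionEvent, backlog = 3
assertNotNull(reader.getNextBuffer());
assertEquals(2, partition.getBuffersInBacklog());

assertNotNull(reader.getNextBuffer());
assertEquals(1, partition.getBuffersInBacklog());

assertNotNull(reader.getNextBuffer());
assertEquals(0, partition.getBuffersInBacklog());

// the EndOfPartitionEvent does not count towards the backlog
assertNotNull(reader.getNextBuffer());
assertEquals(0, partition.getBuffersInBacklog());
{code}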


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157544794
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

I'm not quite sure the latter two methods should be in `ResultSubpartition` 
now, since they are quite internal. `increaseBuffersInBacklog()` is only called 
by `PipelinedSubpartition` and `SpillableSubpartition`, and 
`decreaseBuffersInBacklog()` is (additionally) only called by the 
spilled/spillable subpartition views, so it could be package-private in 
`SpillableSubpartition` only.


---


[jira] [Commented] (FLINK-8279) Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp directories

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295290#comment-16295290
 ] 

ASF GitHub Bot commented on FLINK-8279:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5176#discussion_r157548338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -127,21 +132,31 @@ private static BlobStoreService 
createFileSystemBlobStore(Configuration configur
}
 
/**
-* Creates a local storage directory for a blob service under the given 
parent directory.
+* Creates a local storage directory for a blob service under the 
configuration parameter given
+* by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is 
null or empty, we will
+* fall back to the TaskManager temp directories (given by
+* {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}; which in turn 
falls back to
+* {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH} currently set 
to
+* java.io.tmpdir) and choose one among them at random.
 *
-* @param basePath
-*  base path, i.e. parent directory, of the storage 
directory to use (if null or
-*  empty, the path in java.io.tmpdir will be used)
+* @param config
+*  Flink configuration
 *
 * @return a new local storage directory
 *
 * @throws IOException
 *  thrown if the local file storage cannot be created or 
is not usable
 */
-   static File initLocalStorageDirectory(String basePath) throws 
IOException {
+   static File initLocalStorageDirectory(Configuration config) throws 
IOException {
+
+   String basePath = 
config.getString(BlobServerOptions.STORAGE_DIRECTORY);
+
File baseDir;
if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
-   baseDir = new 
File(System.getProperty("java.io.tmpdir"));
+   final String[] tmpDirPaths = config.getString(
+   ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+   
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
--- End diff --

Consider encapsulating this parsing logic into 
`TaskManagerServicesConfiguration` or similar.
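
For illustration, a hypothetical helper along those lines (the class and method 
names are made up for this sketch, not existing Flink API):

{code}
import java.io.File;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

/** Hypothetical helper that encapsulates the temp-directory parsing from above. */
public final class TaskManagerTmpDirectories {

	/** Returns the configured TaskManager temp directories, falling back to the default. */
	public static String[] getTmpDirectories(Configuration config) {
		return config.getString(
				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
			.split(",|" + File.pathSeparator);
	}

	private TaskManagerTmpDirectories() {
	}
}
{code}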


> Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp 
> directories
> -
>
> Key: FLINK-8279
> URL: https://issues.apache.org/jira/browse/FLINK-8279
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the BLOB server and cache processes (temporarily) stash incoming 
> files into their local file system in the directory given by the 
> {{blob.storage.directory}} configuration property. If this property is not 
> set or empty, it will fall back to {{java.io.tmpdir}}.
> Instead, in a Mesos/YARN environment, we could use the temporary directories 
> they assigned to the Flink job which are not only the proper folder to use, 
> but may also offer some more space.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5176: [FLINK-8279][blob] fall back to TaskManager temp d...

2017-12-18 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5176#discussion_r157548338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -127,21 +132,31 @@ private static BlobStoreService 
createFileSystemBlobStore(Configuration configur
}
 
/**
-* Creates a local storage directory for a blob service under the given 
parent directory.
+* Creates a local storage directory for a blob service under the 
configuration parameter given
+* by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is 
null or empty, we will
+* fall back to the TaskManager temp directories (given by
+* {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}; which in turn 
falls back to
+* {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH} currently set 
to
+* java.io.tmpdir) and choose one among them at random.
 *
-* @param basePath
-*  base path, i.e. parent directory, of the storage 
directory to use (if null or
-*  empty, the path in java.io.tmpdir will be used)
+* @param config
+*  Flink configuration
 *
 * @return a new local storage directory
 *
 * @throws IOException
 *  thrown if the local file storage cannot be created or 
is not usable
 */
-   static File initLocalStorageDirectory(String basePath) throws 
IOException {
+   static File initLocalStorageDirectory(Configuration config) throws 
IOException {
+
+   String basePath = 
config.getString(BlobServerOptions.STORAGE_DIRECTORY);
+
File baseDir;
if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
-   baseDir = new 
File(System.getProperty("java.io.tmpdir"));
+   final String[] tmpDirPaths = config.getString(
+   ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+   
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
--- End diff --

Consider encapsulating this parsing logic into 
`TaskManagerServicesConfiguration` or similar.


---


[jira] [Updated] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable

2017-12-18 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8281:

Description: 
{noformat}
org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
java.lang.Exception: Could not materialize checkpoint 1 for operator window: 
(TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS 
api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945)
 ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[na:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[na:1.8.0_151]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[na:1.8.0_151]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator 
window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) 
AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946)
 ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
... 5 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
~[na:1.8.0_151]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
~[na:1.8.0_151]
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) 
~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
 ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
... 5 common frames omitted
Suppressed: java.lang.Exception: Could not properly cancel managed 
keyed state future.
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92)
 ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
 ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
 ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
... 5 common frames omitted
Caused by: java.util.concurrent.ExecutionException: 
java.io.IOException: Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at 
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
... 7 common frames omitted
Caused by: java.io.IOException: Could not open output stream for state 
backend
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:371)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:228)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:212)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:236)
at 

[jira] [Assigned] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-18 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-8283:
--

Assignee: Tzu-Li (Gordon) Tai

> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> Since a few days, Travis builds with the {{connectors}} profile keep failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=386}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=131}=-915623761775, 
> 

[jira] [Created] (FLINK-8284) Custom metrics not being exposed for Prometheus

2017-12-18 Thread Julio Biason (JIRA)
Julio Biason created FLINK-8284:
---

 Summary: Custom metrics not being exposed for Prometheus
 Key: FLINK-8284
 URL: https://issues.apache.org/jira/browse/FLINK-8284
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Metrics
Affects Versions: 1.4.0
 Environment: Linux/CentOS 7
Reporter: Julio Biason


Following the documentation, we changed our filter that removes events with 
missing fields to a RichFilterFunction, so we can capture metrics about such 
events:

{{public class MissingClientFilter extends RichFilterFunction<LineData> {

private transient Counter counter;

@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.addGroup("events")
.counter("missingClient");
}

@Override
public boolean filter(LineData line) {
String client = line.get("client").toString();
boolean missing = client.trim().equals("");
if (!missing) {
this.count();
}
return !missing;
}

private void count() {
if (this.counter != null) {
this.counter.inc();
}
}
}}}

We also added Prometheus as our reporter:

{{metrics.reporters: prom
metrics.reporter.prom.port: 9105
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter}}

The problem is that accessing port 9105 displays all the Flink metrics, but not ours.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-18 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8283:
---
Labels: test-stability  (was: )

> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> Since a few days, Travis builds with the {{connectors}} profile keep failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=386}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=131}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=761}=-915623761775, 
> 

[jira] [Created] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-18 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8283:
--

 Summary: FlinkKafkaConsumerBase failing on Travis with no output 
in 10min
 Key: FLINK-8283
 URL: https://issues.apache.org/jira/browse/FLINK-8283
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Tests
Affects Versions: 1.5.0
Reporter: Nico Kruber
Priority: Critical


For a few days now, Travis builds with the {{connectors}} profile have been 
failing more often with no new output being received within 10 minutes. It 
seems to start with the Travis build for 
https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 
but may have been introduced earlier. The printed offsets look strange, though.

{code}
16:33:12,508 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
restore state in the FlinkKafkaConsumer: 
{KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}

16:33:12,520 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer 
subtask 2 will start reading 66 partitions with offsets in restored state: 
{KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=386}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=131}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=761}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=506}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=251}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=116}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=176}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=941}=-915623761775, 
KafkaTopicPartition{topic='test-topic', 

[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2017-12-18 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295207#comment-16295207
 ] 

Timo Walther commented on FLINK-8282:
-

I don't know if this is needed. Actually, the {{transform()}} method is rather 
internal. [~aljoscha] what is your opinion here?

> Transformation with TwoInputStreamOperator fails
> 
>
> Key: FLINK-8282
> URL: https://issues.apache.org/jira/browse/FLINK-8282
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>
> The following program fails because of multiple reasons (see exceptions 
> below). The transformation with a {{TwoInputStreamOperator}} does not extend 
> {{AbstractStreamOperator}}. I think this is the main cause why it fails. 
> Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.
> {code}
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   DataStream ds1 = env.addSource(new 
> SourceFunction() {
>   @Override
>   public void run(SourceContext ctx) throws 
> Exception {
>   ctx.emitWatermark(new Watermark(100L));
>   ctx.collect(12);
>   while (true) Thread.yield();
>   }
>   @Override
>   public void cancel() {
>   }
>   });
>   DataStream ds2 = env.addSource(new 
> SourceFunction() {
>   @Override
>   public void run(SourceContext ctx) throws 
> Exception {
>   ctx.emitWatermark(new Watermark(200L));
>   ctx.collect(12);
>   while (true) Thread.yield();
>   }
>   @Override
>   public void cancel() {
>   }
>   });
>   ds1.connect(ds2.broadcast()).transform("test", Types.INT, new 
> TwoInputStreamOperator() {
>   @Override
>   public void processElement1(StreamRecord 
> element) throws Exception {
>   System.out.println();
>   }
>   @Override
>   public void processElement2(StreamRecord 
> element) throws Exception {
>   System.out.println();
>   }
>   @Override
>   public void processWatermark1(Watermark mark) throws 
> Exception {
>   System.out.println();
>   }
>   @Override
>   public void processWatermark2(Watermark mark) throws 
> Exception {
>   System.out.println();
>   }
>   @Override
>   public void processLatencyMarker1(LatencyMarker 
> latencyMarker) throws Exception {
>   }
>   @Override
>   public void processLatencyMarker2(LatencyMarker 
> latencyMarker) throws Exception {
>   }
>   @Override
>   public void setup(StreamTask containingTask, 
> StreamConfig config, Output output) {
>   }
>   @Override
>   public void open() throws Exception {
>   }
>   @Override
>   public void close() throws Exception {
>   }
>   @Override
>   public void dispose() throws Exception {
>   }
>   @Override
>   public OperatorSnapshotResult snapshotState(long 
> checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws 
> Exception {
>   return null;
>   }
>   @Override
>   public void initializeState(OperatorSubtaskState 
> stateHandles) throws Exception {
>   }
>   @Override
>   public void notifyOfCompletedCheckpoint(long 
> checkpointId) throws Exception {
>   }
>   @Override
>   public void setKeyContextElement1(StreamRecord 
> record) throws Exception {
>   }
>   @Override
>   public void 

[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2017-12-18 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295193#comment-16295193
 ] 

Xingcan Cui commented on FLINK-8282:


Hi [~twalthr], thanks for this ticket. I just wonder whether we should create 
an anonymous {{TwoInputStreamOperator}} directly at all, since the Javadoc 
states that custom operators should use {{AbstractStreamOperator}} as a base 
class.
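
For reference, a minimal sketch of the operator from the example written that 
way (assuming the 1.4 operator interfaces; the output logic is only a 
placeholder):

{code}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Sketch: basing the custom operator on AbstractStreamOperator, so that setup(),
 * snapshotState(), the lifecycle methods and key-context handling come from the
 * base class instead of being stubbed out by hand.
 */
public class MyTwoInputOperator extends AbstractStreamOperator<Integer>
		implements TwoInputStreamOperator<Integer, Integer, Integer> {

	@Override
	public void processElement1(StreamRecord<Integer> element) throws Exception {
		output.collect(element);
	}

	@Override
	public void processElement2(StreamRecord<Integer> element) throws Exception {
		output.collect(element);
	}

	@Override
	public void processWatermark1(Watermark mark) throws Exception {
		// combine/forward watermarks as needed
	}

	@Override
	public void processWatermark2(Watermark mark) throws Exception {
		// combine/forward watermarks as needed
	}

	@Override
	public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
	}

	@Override
	public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
	}
}
{code}

It could then be passed to the same {{transform("test", Types.INT, ...)}} call 
from the example.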

> Transformation with TwoInputStreamOperator fails
> 
>
> Key: FLINK-8282
> URL: https://issues.apache.org/jira/browse/FLINK-8282
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>
> The following program fails because of multiple reasons (see exceptions 
> below). The transformation with a {{TwoInputStreamOperator}} does not extend 
> {{AbstractStreamOperator}}. I think this is the main cause why it fails. 
> Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.
> {code}
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   DataStream ds1 = env.addSource(new 
> SourceFunction() {
>   @Override
>   public void run(SourceContext ctx) throws 
> Exception {
>   ctx.emitWatermark(new Watermark(100L));
>   ctx.collect(12);
>   while (true) Thread.yield();
>   }
>   @Override
>   public void cancel() {
>   }
>   });
>   DataStream ds2 = env.addSource(new 
> SourceFunction() {
>   @Override
>   public void run(SourceContext ctx) throws 
> Exception {
>   ctx.emitWatermark(new Watermark(200L));
>   ctx.collect(12);
>   while (true) Thread.yield();
>   }
>   @Override
>   public void cancel() {
>   }
>   });
>   ds1.connect(ds2.broadcast()).transform("test", Types.INT, new 
> TwoInputStreamOperator() {
>   @Override
>   public void processElement1(StreamRecord 
> element) throws Exception {
>   System.out.println();
>   }
>   @Override
>   public void processElement2(StreamRecord 
> element) throws Exception {
>   System.out.println();
>   }
>   @Override
>   public void processWatermark1(Watermark mark) throws 
> Exception {
>   System.out.println();
>   }
>   @Override
>   public void processWatermark2(Watermark mark) throws 
> Exception {
>   System.out.println();
>   }
>   @Override
>   public void processLatencyMarker1(LatencyMarker 
> latencyMarker) throws Exception {
>   }
>   @Override
>   public void processLatencyMarker2(LatencyMarker 
> latencyMarker) throws Exception {
>   }
>   @Override
>   public void setup(StreamTask containingTask, 
> StreamConfig config, Output output) {
>   }
>   @Override
>   public void open() throws Exception {
>   }
>   @Override
>   public void close() throws Exception {
>   }
>   @Override
>   public void dispose() throws Exception {
>   }
>   @Override
>   public OperatorSnapshotResult snapshotState(long 
> checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws 
> Exception {
>   return null;
>   }
>   @Override
>   public void initializeState(OperatorSubtaskState 
> stateHandles) throws Exception {
>   }
>   @Override
>   public void notifyOfCompletedCheckpoint(long 
> checkpointId) throws Exception {
>   }
>   @Override
>   public void setKeyContextElement1(StreamRecord 
> record) throws 

[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295132#comment-16295132
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/5168


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread GJL
Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/5168


---


[jira] [Created] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2017-12-18 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8282:
---

 Summary: Transformation with TwoInputStreamOperator fails
 Key: FLINK-8282
 URL: https://issues.apache.org/jira/browse/FLINK-8282
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Reporter: Timo Walther


The following program fails for multiple reasons (see exceptions below). The 
{{TwoInputStreamOperator}} used in the transformation does not extend 
{{AbstractStreamOperator}}, which I think is the main cause of the failure. 
Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.

{code}
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Integer> ds1 = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
ctx.emitWatermark(new Watermark(100L));
ctx.collect(12);
while (true) Thread.yield();
}

@Override
public void cancel() {

}
});

DataStream<Integer> ds2 = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
ctx.emitWatermark(new Watermark(200L));
ctx.collect(12);
while (true) Thread.yield();
}

@Override
public void cancel() {

}
});

ds1.connect(ds2.broadcast()).transform("test", Types.INT, new TwoInputStreamOperator<Integer, Integer, Integer>() {
@Override
public void processElement1(StreamRecord<Integer> element) throws Exception {
System.out.println();
}

@Override
public void processElement2(StreamRecord<Integer> element) throws Exception {
System.out.println();
}

@Override
public void processWatermark1(Watermark mark) throws 
Exception {
System.out.println();
}

@Override
public void processWatermark2(Watermark mark) throws 
Exception {
System.out.println();
}

@Override
public void processLatencyMarker1(LatencyMarker 
latencyMarker) throws Exception {

}

@Override
public void processLatencyMarker2(LatencyMarker 
latencyMarker) throws Exception {

}

@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {

}

@Override
public void open() throws Exception {

}

@Override
public void close() throws Exception {

}

@Override
public void dispose() throws Exception {

}

@Override
public OperatorSnapshotResult snapshotState(long 
checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws 
Exception {
return null;
}

@Override
public void initializeState(OperatorSubtaskState 
stateHandles) throws Exception {

}

@Override
public void notifyOfCompletedCheckpoint(long 
checkpointId) throws Exception {

}

@Override
public void setKeyContextElement1(StreamRecord<?> record) throws Exception {

}

@Override
public void setKeyContextElement2(StreamRecord<?> record) throws Exception {

}

@Override
public ChainingStrategy getChainingStrategy() {
return null;
}

@Override
public void setChainingStrategy(ChainingStrategy 
strategy) {

[jira] [Updated] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2017-12-18 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8282:

Affects Version/s: 1.4.0

> Transformation with TwoInputStreamOperator fails
> 
>
> Key: FLINK-8282
> URL: https://issues.apache.org/jira/browse/FLINK-8282
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>
> The following program fails because of multiple reasons (see exceptions 
> below). The transformation with a {{TwoInputStreamOperator}} does not extend 
> {{AbstractStreamOperator}}. I think this is the main cause why it fails. 
> Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.
> {code}
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   DataStream ds1 = env.addSource(new 
> SourceFunction() {
>   @Override
>   public void run(SourceContext ctx) throws 
> Exception {
>   ctx.emitWatermark(new Watermark(100L));
>   ctx.collect(12);
>   while (true) Thread.yield();
>   }
>   @Override
>   public void cancel() {
>   }
>   });
>   DataStream ds2 = env.addSource(new 
> SourceFunction() {
>   @Override
>   public void run(SourceContext ctx) throws 
> Exception {
>   ctx.emitWatermark(new Watermark(200L));
>   ctx.collect(12);
>   while (true) Thread.yield();
>   }
>   @Override
>   public void cancel() {
>   }
>   });
>   ds1.connect(ds2.broadcast()).transform("test", Types.INT, new 
> TwoInputStreamOperator() {
>   @Override
>   public void processElement1(StreamRecord 
> element) throws Exception {
>   System.out.println();
>   }
>   @Override
>   public void processElement2(StreamRecord 
> element) throws Exception {
>   System.out.println();
>   }
>   @Override
>   public void processWatermark1(Watermark mark) throws 
> Exception {
>   System.out.println();
>   }
>   @Override
>   public void processWatermark2(Watermark mark) throws 
> Exception {
>   System.out.println();
>   }
>   @Override
>   public void processLatencyMarker1(LatencyMarker 
> latencyMarker) throws Exception {
>   }
>   @Override
>   public void processLatencyMarker2(LatencyMarker 
> latencyMarker) throws Exception {
>   }
>   @Override
>   public void setup(StreamTask containingTask, 
> StreamConfig config, Output output) {
>   }
>   @Override
>   public void open() throws Exception {
>   }
>   @Override
>   public void close() throws Exception {
>   }
>   @Override
>   public void dispose() throws Exception {
>   }
>   @Override
>   public OperatorSnapshotResult snapshotState(long 
> checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws 
> Exception {
>   return null;
>   }
>   @Override
>   public void initializeState(OperatorSubtaskState 
> stateHandles) throws Exception {
>   }
>   @Override
>   public void notifyOfCompletedCheckpoint(long 
> checkpointId) throws Exception {
>   }
>   @Override
>   public void setKeyContextElement1(StreamRecord 
> record) throws Exception {
>   }
>   @Override
>   public void setKeyContextElement2(StreamRecord 
> record) throws Exception {
>   }
>   @Override
>   

[jira] [Commented] (FLINK-7495) AbstractUdfStreamOperator#initializeState() should be called in AsyncWaitOperator#initializeState()

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295128#comment-16295128
 ] 

ASF GitHub Bot commented on FLINK-7495:
---

Github user PedroMrChaves commented on the issue:

https://github.com/apache/flink/pull/4621
  
This change also needs to be backported to previous versions. I'm using version 
1.3.2 and realised that the initializeState(...) function was not being called.


> AbstractUdfStreamOperator#initializeState() should be called in 
> AsyncWaitOperator#initializeState()
> ---
>
> Key: FLINK-7495
> URL: https://issues.apache.org/jira/browse/FLINK-7495
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: Fang Yong
>Priority: Minor
> Fix For: 1.4.0
>
>
> {code}
> recoveredStreamElements = context
>   .getOperatorStateStore()
>   .getListState(new ListStateDescriptor<>(STATE_NAME, 
> inStreamElementSerializer));
> {code}
> Call to AbstractUdfStreamOperator#initializeState() should be added in the 
> beginning



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4621: [FLINK-7495] Call to AbstractUdfStreamOperator#initialize...

2017-12-18 Thread PedroMrChaves
Github user PedroMrChaves commented on the issue:

https://github.com/apache/flink/pull/4621
  
This change also needs to be applied to previous versions. I'm using version 
1.3.2 and realised that the initializeState(..) function was not being called. 


---


[jira] [Commented] (FLINK-8278) Scala examples in Metric documentation do not compile

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295027#comment-16295027
 ] 

ASF GitHub Bot commented on FLINK-8278:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5177

[FLINK-8278] [doc] Fix the private member init problem for Scala examples 
in docs

## What is the purpose of the change

This PR fixes the improper initialization of Scala private members in the docs.

## Brief change log

  - Initializes the private members with the default value `_`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8278

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5177.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5177


commit f51787b81a8acc7aacea1ce7cafa94ff0d3ce70f
Author: Xingcan Cui 
Date:   2017-12-18T13:55:30Z

[FLINK-8278][doc]Scala examples in Metric documentation do not compile




> Scala examples in Metric documentation do not compile
> -
>
> Key: FLINK-8278
> URL: https://issues.apache.org/jira/browse/FLINK-8278
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> The Scala examples in the [Metrics 
> documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html]
>  do not compile.
> The line 
> {code}
> @transient private var counter: Counter
> {code}
> needs to be extended to
> {code}
> @transient private var counter: Counter = _
> {code}
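> For context, a minimal sketch of where the corrected declaration sits, loosely 
> following the RichMapFunction example in the metrics documentation (class and 
> metric names here are illustrative):
> {code}
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.metrics.Counter
> 
> class MyMapper extends RichMapFunction[String, String] {
>   // "= _" gives the transient field a default value instead of leaving an abstract member
>   @transient private var counter: Counter = _
> 
>   override def open(parameters: Configuration): Unit = {
>     counter = getRuntimeContext()
>       .getMetricGroup()
>       .counter("myCounter")
>   }
> 
>   override def map(value: String): String = {
>     counter.inc()
>     value
>   }
> }
> {code}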



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5177: [FLINK-8278] [doc] Fix the private member init pro...

2017-12-18 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5177

[FLINK-8278] [doc] Fix the private member init problem for Scala examples 
in docs

## What is the purpose of the change

This PR fixes the improper initialization of Scala private members in the docs.

## Brief change log

  - Initializes the private members with the default value `_`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8278

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5177.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5177


commit f51787b81a8acc7aacea1ce7cafa94ff0d3ce70f
Author: Xingcan Cui 
Date:   2017-12-18T13:55:30Z

[FLINK-8278][doc]Scala examples in Metric documentation do not compile




---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295011#comment-16295011
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157489280
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -357,6 +362,28 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture> getJobExecutionResult(
+   final JobID jobId,
+   final Time timeout) {
+   final Either jobExecutionResult =
+   jobExecutionResultCache.get(jobId);
+   if (jobExecutionResult == null) {
+   return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
+   } else {
+   return CompletableFuture.completedFuture(jobExecutionResult);
+   }
+   }
+
+   @Override
+   public CompletableFuture isJobExecutionResultPresent(final JobID jobId, final Time timeout) {
+   final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId);
+   if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) {
+   return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+   }
+   return CompletableFuture.completedFuture(jobExecutionResultPresent);
--- End diff --

But this would never return a future containing `false`. 


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.
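> A minimal sketch of such a cache, assuming a Guava-backed implementation (the 
> size limit, retention time, and method names below are illustrative, not the 
> actual patch):
> {code}
> import org.apache.flink.api.common.JobID;
> 
> import com.google.common.cache.Cache;
> import com.google.common.cache.CacheBuilder;
> 
> import java.util.concurrent.TimeUnit;
> 
> /** Caches the results of finished jobs until they are served or expire. */
> class JobExecutionResultCache<R> {
> 
>     private final Cache<JobID, R> cache = CacheBuilder.newBuilder()
>         .maximumSize(1000)                      // configurable upper bound
>         .expireAfterWrite(5, TimeUnit.MINUTES)  // drops stale entries over time
>         .build();
> 
>     void put(JobID jobId, R result) {
>         cache.put(jobId, result);
>     }
> 
>     R get(JobID jobId) {
>         return cache.getIfPresent(jobId);
>     }
> 
>     boolean contains(JobID jobId) {
>         return cache.getIfPresent(jobId) != null;
>     }
> }
> {code}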



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

