[jira] [Assigned] (FLINK-4207) WindowOperator becomes very slow with allowed lateness

2016-07-14 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-4207:
-

Assignee: Kostas Kloudas

> WindowOperator becomes very slow with allowed lateness
> --
>
> Key: FLINK-4207
> URL: https://issues.apache.org/jira/browse/FLINK-4207
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> In this simple example the throughput (as measured by the count the window 
> emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
> 
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.setParallelism(1);
>         env.addSource(new InfiniteTupleSource(100_000))
>                 .keyBy(0)
>                 .timeWindow(Time.seconds(3))
>                 .allowedLateness(Time.seconds(1))
>                 .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
>                     @Override
>                     public Tuple2<String, Integer> reduce(
>                             Tuple2<String, Integer> value1,
>                             Tuple2<String, Integer> value2) throws Exception {
>                         return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>                     }
>                 })
>                 .filter(new FilterFunction<Tuple2<String, Integer>>() {
>                     private static final long serialVersionUID = 1L;
> 
>                     @Override
>                     public boolean filter(Tuple2<String, Integer> value) throws Exception {
>                         return value.f0.startsWith("Tuple 0");
>                     }
>                 })
>                 .print();
> 
>         // execute program
>         env.execute("WindowWordCount");
>     }
> 
>     public static class InfiniteTupleSource
>             implements ParallelSourceFunction<Tuple2<String, Integer>> {
> 
>         private static final long serialVersionUID = 1L;
> 
>         private int numGroups;
> 
>         public InfiniteTupleSource(int numGroups) {
>             this.numGroups = numGroups;
>         }
> 
>         @Override
>         public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
>             long index = 0;
>             while (true) {
>                 Tuple2<String, Integer> tuple =
>                         new Tuple2<>("Tuple " + (index % numGroups), 1);
>                 out.collect(tuple);
>                 index++;
>             }
>         }
> 
>         @Override
>         public void cancel() {
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4194) KinesisDeserializationSchema.isEndOfStream() is never called

2016-07-14 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-4194:
--

Assignee: Tzu-Li (Gordon) Tai

> KinesisDeserializationSchema.isEndOfStream() is never called
> 
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input 
> data.
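
For illustration, a minimal, self-contained sketch of the contract that 
{{isEndOfStream()}} is meant to provide; the interface below is a simplified 
stand-in for {{KinesisDeserializationSchema}} (the real one takes more 
parameters), and all names are made up:

{code}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class EndOfStreamSketch {

    // Stand-in for KinesisDeserializationSchema: only the two methods
    // relevant to FLINK-4194 are modeled here.
    interface Schema<T> {
        T deserialize(byte[] recordValue);
        boolean isEndOfStream(T nextElement);
    }

    public static void main(String[] args) {
        Schema<String> schema = new Schema<String>() {
            @Override
            public String deserialize(byte[] recordValue) {
                return new String(recordValue, StandardCharsets.UTF_8);
            }

            @Override
            public boolean isEndOfStream(String nextElement) {
                // termination decided from the input data, e.g. a sentinel record
                return "END".equals(nextElement);
            }
        };

        List<byte[]> shardRecords = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8),
                "END".getBytes(StandardCharsets.UTF_8),
                "never consumed".getBytes(StandardCharsets.UTF_8));

        // What the consumer is expected to do: check after every record and
        // stop fetching once the schema signals the end of the stream.
        for (byte[] record : shardRecords) {
            String element = schema.deserialize(record);
            if (schema.isEndOfStream(element)) {
                break;
            }
            System.out.println(element);
        }
    }
}
{code}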



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4194) KinesisDeserializationSchema.isEndOfStream() is never called

2016-07-14 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378829#comment-15378829
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4194:


Just curious, were there any obstacles for the {{FlinkKafkaConsumer08}} that 
meant it had to be implemented differently?


> KinesisDeserializationSchema.isEndOfStream() is never called
> 
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input 
> data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378685#comment-15378685
 ] 

ASF GitHub Bot commented on FLINK-1502:
---

Github user sumitchawla commented on the issue:

https://github.com/apache/flink/pull/1947
  
Hi @zentol, thanks for the information. This is great, and I could see the 
metrics in JMX. I have one more question on the interoperability of 
Accumulators and Metrics. As I understand it, Metrics are currently only 
available at the system level, while User Accumulators are available to job 
writers. Is there any plan to support Metrics for job writers? Metrics offer 
far more capability than the current Accumulators and would be a great way to 
track custom metrics at each operator level.
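
To make the question concrete, here is a hedged sketch of what operator-level 
user metrics could look like if the MetricGroup API were exposed through the 
RuntimeContext; the class and counter name below are made up for illustration:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Hypothetical user function; "myCounter" and LineCounter are illustrative.
public class LineCounter extends RichMapFunction<String, String> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // register a custom metric at the operator level
        this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
    }

    @Override
    public String map(String value) {
        counter.inc(); // tracked per record, alongside system metrics
        return value;
    }
}
```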

  


> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1947: [FLINK-1502] [WIP] Basic Metric System

2016-07-14 Thread sumitchawla
Github user sumitchawla commented on the issue:

https://github.com/apache/flink/pull/1947
  
Hi @zentol, thanks for the information. This is great, and I could see the 
metrics in JMX. I have one more question on the interoperability of 
Accumulators and Metrics. As I understand it, Metrics are currently only 
available at the system level, while User Accumulators are available to job 
writers. Is there any plan to support Metrics for job writers? Metrics offer 
far more capability than the current Accumulators and would be a great way to 
track custom metrics at each operator level.

  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3630) Little mistake in documentation

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378657#comment-15378657
 ] 

ASF GitHub Bot commented on FLINK-3630:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2254

[FLINK-3630] [docs] Little mistake in documentation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3630_little_mistake_in_documentation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2254


commit c659b51803545f067192de2760e0d7df958ef236
Author: Greg Hogan 
Date:   2016-07-14T18:42:55Z

[FLINK-3630] [docs] Little mistake in documentation




> Little mistake in documentation
> ---
>
> Key: FLINK-3630
> URL: https://issues.apache.org/jira/browse/FLINK-3630
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Documentation
>Affects Versions: 1.0.0
>Reporter: Riccardo Diomedi
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: documentation
>
> In the section "GroupCombine on a Grouped DataSet" of the following link: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#groupreduce-on-grouped-dataset
> there is a little mistake in the Java code in both the combine and reduce 
> methods (it's the same mistake). The variable "word" is defined in the scope 
> of the for loop, so it cannot be used in the collect method.
> A possible solution is to initialise the variable before the for loop and 
> assign the value inside it. Something like:
> int count = 0;
> String word = null;
> for (String record : words) {
>     word = record;
>     count++;
> }
> out.collect(new Tuple2<String, Integer>(word, count));



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2254: [FLINK-3630] [docs] Little mistake in documentatio...

2016-07-14 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2254

[FLINK-3630] [docs] Little mistake in documentation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3630_little_mistake_in_documentation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2254


commit c659b51803545f067192de2760e0d7df958ef236
Author: Greg Hogan 
Date:   2016-07-14T18:42:55Z

[FLINK-3630] [docs] Little mistake in documentation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4219) Quote PDSH opts in start-cluster.sh

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378654#comment-15378654
 ] 

ASF GitHub Bot commented on FLINK-4219:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2253

[FLINK-4219] [scripts] Quote PDSH opts in start-cluster.sh

This prevents word splitting if the user configures multiple SSH options.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4219_quote_pdsh_opts_in_startclustersh

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2253.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2253


commit 2a71c1cd12186f28ab2e17aebb2b6f8a5935a19e
Author: Greg Hogan 
Date:   2016-07-14T17:40:35Z

[FLINK-4219] [scripts] Quote PDSH opts in start-cluster.sh

This prevents word splitting if the user configures multiple SSH
options.




> Quote PDSH opts in start-cluster.sh
> ---
>
> Key: FLINK-4219
> URL: https://issues.apache.org/jira/browse/FLINK-4219
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Quote {{PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS}} in {{start-cluster.sh}} to 
> prevent word splitting if the user configures multiple SSH options.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2253: [FLINK-4219] [scripts] Quote PDSH opts in start-cl...

2016-07-14 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2253

[FLINK-4219] [scripts] Quote PDSH opts in start-cluster.sh

This prevents word splitting if the user configures multiple SSH options.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4219_quote_pdsh_opts_in_startclustersh

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2253.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2253


commit 2a71c1cd12186f28ab2e17aebb2b6f8a5935a19e
Author: Greg Hogan 
Date:   2016-07-14T17:40:35Z

[FLINK-4219] [scripts] Quote PDSH opts in start-cluster.sh

This prevents word splitting if the user configures multiple SSH
options.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70887209
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -72,26 +68,36 @@
  * This class is the executable entry point for the YARN application 
master.
  * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
  * and {@link YarnFlinkResourceManager}.
- * 
+ * 
--- End diff --

Sorry for that one, I accidentally hit the auto-formatter and noticed it too late.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70887016
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws 
Exception {
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
+   final ApplicationId appId = appContext.getApplicationId();
+
+   // -- Add Zookeeper namespace to local 
flinkConfiguraton --
+   String zkNamespace = getZookeeperNamespace();
+   // no user specified cli argument for namespace?
+   if(zkNamespace == null || zkNamespace.isEmpty()) {
+   // namespace defined in config? else use applicationId 
as default.
+   zkNamespace = 
flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
String.valueOf(appId));
+   setZookeeperNamespace(zkNamespace);
+   }
+
+   
flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
zkNamespace);
--- End diff --

Simply moving this to the else block makes the code incorrect. I preferred 
the chance of overwriting a value with itself once, in non-performance-critical 
code, over bloating up the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70886483
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws 
Exception {
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
+   final ApplicationId appId = appContext.getApplicationId();
--- End diff --

this line was only moved for my change. it is in fact used further down in 
the same method and cannot be moved inside the if.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2249
  
Please make sure to follow the naming conventions for pull requests, as 
described in the PR template. First entry under "General": "The pull request 
references the related JIRA issue ("[FLINK-XXX] Jira title text")".

I can't speak for whether the implementation is correct, but the changes 
made don't appear to be covered by tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70885172
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws 
Exception {
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
+   final ApplicationId appId = appContext.getApplicationId();
--- End diff --

Since this is only used in the `if` block, it should be moved in there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70884431
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -72,26 +68,36 @@
  * This class is the executable entry point for the YARN application 
master.
  * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
  * and {@link YarnFlinkResourceManager}.
- * 
+ * 
--- End diff --

Of the 96 additions made, a grand total of 6 are actually related to this 
PR. Please limit your auto-formatting; I'm not a fan of checking 15x the 
number of lines necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70883754
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws 
Exception {
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
+   final ApplicationId appId = appContext.getApplicationId();
+
+   // -- Add Zookeeper namespace to local 
flinkConfiguraton --
+   String zkNamespace = getZookeeperNamespace();
+   // no user specified cli argument for namespace?
+   if(zkNamespace == null || zkNamespace.isEmpty()) {
+   // namespace defined in config? else use applicationId 
as default.
+   zkNamespace = 
flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
String.valueOf(appId));
+   setZookeeperNamespace(zkNamespace);
+   }
+
+   
flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
zkNamespace);
--- End diff --

This should be in an `else` block; otherwise we may set a property even 
though it is already set.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70883320
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws 
Exception {
// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
+   final ApplicationId appId = appContext.getApplicationId();
+
+   // -- Add Zookeeper namespace to local 
flinkConfiguraton --
+   String zkNamespace = getZookeeperNamespace();
+   // no user specified cli argument for namespace?
+   if(zkNamespace == null || zkNamespace.isEmpty()) {
--- End diff --

Missing space after if.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70883081
  
--- Diff: docs/setup/jobmanager_high_availability.md ---
@@ -74,11 +74,15 @@ In order to start an HA-cluster add the following 
configuration keys to `conf/fl
 
   Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which 
all required coordination data is placed.
+- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which 
all cluster namespace nodes are placed.
 
-  recovery.zookeeper.path.root: /flink # important: customize per 
cluster
+  recovery.zookeeper.path.root: /flink
 
-  **Important**: if you are running multiple Flink HA clusters, you have 
to manually configure separate root nodes for each cluster.
+- **ZooKeeper namespace** (recommended): The *namespace ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+
+  recovery.zookeeper.path.namespace: /default_ns # important: 
customize per cluster 
+
+  **Important**: if you are running multiple Flink HA clusters, you have 
to manually configure separate namespaces for each cluster. By default, Yarn 
cluster and Yarn session automatically generate namespaces based on Yarn 
application id. A manual configuration overrides this behaviour in Yarn. 
Specifying a namespace with the -z CLI option, in turn, overrides manual 
configuration. 
--- End diff --

I believe you are missing a "the" before "Yarn cluster" and "Yarn 
application id".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70882042
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -428,6 +429,12 @@ private static Configuration 
createConfiguration(String baseDirectory, Map

[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70881905
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -475,23 +484,23 @@ public YarnClusterClient retrieveCluster(
CommandLine cmdLine,
Configuration config) throws 
UnsupportedOperationException {
 
-   // first check for an application id
-   if (cmdLine.hasOption(APPLICATION_ID.getOpt())) {
-   String applicationID = 
cmdLine.getOptionValue(APPLICATION_ID.getOpt());
+   // first check for an application id, then try to load from 
yarn properties
+   String applicationID = 
cmdLine.hasOption(APPLICATION_ID.getOpt()) ?
+   cmdLine.getOptionValue(APPLICATION_ID.getOpt())
+   : loadYarnPropertiesFile(cmdLine, config);
+
+   if(null != applicationID) {
--- End diff --

Missing space after if.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378354#comment-15378354
 ] 

ASF GitHub Bot commented on FLINK-4208:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2239
  
Using `wait $mypid` or just `wait` works for me: if I run `./bin/jobmanager.sh 
start cluster` (the jobmanager starts in the foreground) and then, in another 
terminal, `./bin/jobmanager.sh stop`, both terminals end up back at the 
command prompt.


> Support Running Flink processes in foreground mode
> --
>
> Key: FLINK-4208
> URL: https://issues.apache.org/jira/browse/FLINK-4208
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Flink clusters are started in daemon mode automatically; this is definitely 
> the default case. However, if we want to start containers based on Flink, the 
> execution context gets lost. Running Flink as foreground processes can fix 
> this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2239: [FLINK-4208] Support Running Flink processes in foregroun...

2016-07-14 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2239
  
Using `wait $mypid` or just `wait` works for me: if I run `./bin/jobmanager.sh 
start cluster` (the jobmanager starts in the foreground) and then, in another 
terminal, `./bin/jobmanager.sh stop`, both terminals end up back at the 
command prompt.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4210) Move close()/isClosed() out of MetricGroup interface

2016-07-14 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378352#comment-15378352
 ] 

Chesnay Schepler commented on FLINK-4210:
-

This is actually wrong. Let's say a user calls close() on the MetricGroup 
returned by the RuntimeContext. Then all operator metrics for that operator are 
turned off, not just user metrics.

Besides that, I'm mostly concerned about usability. Having a close() method in 
the interface implicitly conveys "hey, this object has to be closed". But users 
don't have to close it, and it just makes using the groups a bit unwieldy for 
those who believe they do. They would have to store the group in a field, close 
it on exit, and check for null. All of this is unnecessary.
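
As a concrete illustration, a sketch of the unnecessary pattern this invites 
(the user function and group name below are made up):

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical user function illustrating the boilerplate that a close()
// in the MetricGroup interface invites; none of it is actually required.
public class MyMapper extends RichMapFunction<String, String> {

    private transient MetricGroup group; // stored in a field just to close it

    @Override
    public void open(Configuration config) {
        group = getRuntimeContext().getMetricGroup().addGroup("myMetrics");
    }

    @Override
    public String map(String value) {
        return value;
    }

    @Override
    public void close() throws Exception {
        if (group != null) { // null check on exit
            // group.close(); // unnecessary: the framework closes groups itself
        }
    }
}
{code}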

> Move close()/isClosed() out of MetricGroup interface
> 
>
> Key: FLINK-4210
> URL: https://issues.apache.org/jira/browse/FLINK-4210
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The (user-facing) MetricGroup interface currently exposes a close() and 
> isClosed() method which generally users shouldn't need to call. They are an 
> internal thing, and thus should be moved into the AbstractMetricGroup class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3466) Job might get stuck in restoreState() from HDFS due to interrupt

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378350#comment-15378350
 ] 

ASF GitHub Bot commented on FLINK-3466:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2252

[FLINK-3466] [runtime] Cancel state handled on state restore

This pull request fixes the issue that state restore operations can get 
stuck when tasks are cancelled during state restore. That happens due to a bug 
in HDFS, which deadlocks (or livelocks) when the reading thread is interrupted.

This introduces two things:

  1. All state handles and key/value snapshots are now `Closeable`. This 
does not delete any checkpoint data, but simply closes pending streams and data 
fetch handles. Operations concurrently accessing the state handle's state 
should fail.

  2. The `StreamTask` holds a set of "Closeables" that it closes upon 
cancellation. This is a cleaner way of stopping in-progress work than relying 
on "interrupt()" to interrupt that work.

This mechanism should eventually be extended to also cancel operators and 
state handles pending asynchronous materialization.

There is a test with an interrupt-sensitive state handle (mimicking 
HDFS's deadlock behavior) that stalls without this pull request and cleanly 
finishes with the changes in this pull request.
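
As an illustration of point 2, a minimal sketch of such a registry of 
Closeables, assuming a simple synchronized set; this is not the actual 
StreamTask code, and all names are made up:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Sketch only: in-progress resources register themselves and are closed on
// cancel, instead of relying on interrupt() reaching the blocked thread.
class CancelableRegistry {

    private final Set<Closeable> closeables = new HashSet<>();
    private boolean canceled;

    synchronized void register(Closeable c) throws IOException {
        if (canceled) {
            c.close(); // registered after cancellation: close immediately
        } else {
            closeables.add(c);
        }
    }

    synchronized void unregister(Closeable c) {
        closeables.remove(c);
    }

    void cancel() {
        Set<Closeable> toClose;
        synchronized (this) {
            canceled = true;
            toClose = new HashSet<>(closeables);
            closeables.clear();
        }
        for (Closeable c : toClose) {
            try {
                c.close(); // unblocks threads stuck in blocking reads (e.g. HDFS)
            } catch (IOException ignored) {
                // best effort during cancellation
            }
        }
    }
}
```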

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
state_handle_cancellation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2252


commit 224503b86c2864f604a7c519ea5f415c57f35ff3
Author: Stephan Ewen 
Date:   2016-07-14T13:14:12Z

[FLINK-3466] [tests] Add serialization validation for state handles

commit c411b379381ab1390e2166356232a33165c1abd9
Author: Stephan Ewen 
Date:   2016-07-13T19:32:40Z

[FLINK-3466] [runtime] Make state handles cancelable.

State handles are cancelable, to make sure long-running checkpoint restore 
operations finish early on cancellation, even if the code does not properly 
react to interrupts.

This is especially important since HDFS client code is so buggy that it 
deadlocks when
interrupted without closing.




> Job might get stuck in restoreState() from HDFS due to interrupt
> 
>
> Key: FLINK-3466
> URL: https://issues.apache.org/jira/browse/FLINK-3466
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.0, 0.10.2
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>
> A user reported the following issue with a failing job:
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task  
>- Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck 
> in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> 

[GitHub] flink pull request #2252: [FLINK-3466] [runtime] Cancel state handled on sta...

2016-07-14 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2252

[FLINK-3466] [runtime] Cancel state handled on state restore

This pull request fixes the issue that state restore operations can get 
stuck when tasks are cancelled during state restore. That happens due to a bug 
in HDFS, which deadlocks (or livelocks) when the reading thread is interrupted.

This introduces two things:

  1. All state handles and key/value snapshots are now `Closeable`. This 
does not delete any checkpoint data, but simply closes pending streams and data 
fetch handles. Operations concurrently accessing the state handle's state 
should fail.

  2. The `StreamTask` holds a set of "Closeables" that it closes upon 
cancellation. This is a cleaner way of stopping in-progress work than relying 
on "interrupt()" to interrupt that work.

This mechanism should eventually be extended to also cancel operators and 
state handles pending asynchronous materialization.

There is a test with an interrupt-sensitive state handle (mimicking 
HDFS's deadlock behavior) that stalls without this pull request and cleanly 
finishes with the changes in this pull request.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
state_handle_cancellation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2252


commit 224503b86c2864f604a7c519ea5f415c57f35ff3
Author: Stephan Ewen 
Date:   2016-07-14T13:14:12Z

[FLINK-3466] [tests] Add serialization validation for state handles

commit c411b379381ab1390e2166356232a33165c1abd9
Author: Stephan Ewen 
Date:   2016-07-13T19:32:40Z

[FLINK-3466] [runtime] Make state handles cancelable.

State handles are cancelable, to make sure long-running checkpoint restore 
operations finish early on cancellation, even if the code does not properly 
react to interrupts.

This is especially important since HDFS client code is so buggy that it 
deadlocks when
interrupted without closing.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2016-07-14 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/2477161352e12e75e2f0f85b5833ad04dc6d31f2#commitcomment-18252310
  
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java:
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
 on line 103:
The change I made was to clear the heap references earlier. Less chance of 
redundant work when concurrent disposals happen.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2016-07-14 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/2477161352e12e75e2f0f85b5833ad04dc6d31f2#commitcomment-18252297
  
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java:
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
 on line 103:
The pattern is
  - cache the reference on stack (immutable against concurrent 
modifications)
  - set the heap reference to null
  - proceed based on the stack reference

I think that should work. If the heap reference was initially non-null, the 
stack reference is non-null, and the condition is true.
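
In code, the pattern amounts to something like this sketch (`StateHandle` here 
is a stand-in interface, not the actual Flink class; all names are 
illustrative):

```java
// Sketch of the disposal pattern described above.
class DisposalSketch {

    interface StateHandle {
        void discardState() throws Exception;
    }

    private volatile StateHandle state; // shared heap reference

    void dispose() throws Exception {
        StateHandle toDispose = state; // 1. cache the reference on the stack
        state = null;                  // 2. clear the heap reference early
        if (toDispose != null) {       // 3. proceed from the stack reference;
            toDispose.discardState();  //    later disposers see null and skip
        }
    }
}
```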


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4214) JobExceptionsHandler will return all exceptions

2016-07-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4214.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed via 91d5c63a717d8786506c2d791bb4683838f699d8

Thank you for the contribution!

> JobExceptionsHandler will return all exceptions
> ---
>
> Key: FLINK-4214
> URL: https://issues.apache.org/jira/browse/FLINK-4214
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sumit Chawla
>Priority: Minor
> Fix For: 1.1.0
>
>
> JobExceptionsHandler will return all exceptions and is not incrementing the 
> integer to track the exceptions being serialized



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4216) WordWithCount example with Java has wrong generics type

2016-07-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4216.
---

> WordWithCount example with Java has wrong generics type
> ---
>
> Key: FLINK-4216
> URL: https://issues.apache.org/jira/browse/FLINK-4216
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Serhiy Boychenko
>Assignee: Matthias J. Sax
>Priority: Trivial
> Fix For: 1.1.0
>
>   Original Estimate: 10m
>  Remaining Estimate: 10m
>
> The Java example with POJOs results in:
> {code}
> Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
> {code}
> due to the wrong generic type of the DataStream.
> Currently it is {code}DataStream>{code}
> but should be {code}DataSource{code}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4216) WordWithCount example with Java has wrong generics type

2016-07-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4216.
-
   Resolution: Fixed
 Assignee: Matthias J. Sax
Fix Version/s: 1.1.0

Fixed via 2346468446414b2a14c2833be4d60288cd8d0550

> WordWithCount example with Java has wrong generics type
> ---
>
> Key: FLINK-4216
> URL: https://issues.apache.org/jira/browse/FLINK-4216
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Serhiy Boychenko
>Assignee: Matthias J. Sax
>Priority: Trivial
> Fix For: 1.1.0
>
>   Original Estimate: 10m
>  Remaining Estimate: 10m
>
> The Java example with POJOs results in:
> {code}
> Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
> {code}
> due to the wrong generic type of the DataStream.
> Currently it is {code}DataStream>{code}
> but should be {code}DataSource{code}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2125) String delimiter for SocketTextStream

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378331#comment-15378331
 ] 

ASF GitHub Bot commented on FLINK-2125:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2233
  
@StephanEwen The socketTextStream methods are already marked as 
`@PublicEvolving`; I thought changing those was allowed?


> String delimiter for SocketTextStream
> -
>
> Key: FLINK-2125
> URL: https://issues.apache.org/jira/browse/FLINK-2125
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Márton Balassi
>Priority: Minor
>  Labels: starter
>
> The SocketTextStreamFunction uses a character delimiter, despite other parts 
> of the API using String delimiter.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4214) JobExceptionsHandler will return all exceptions

2016-07-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4214.
---

> JobExceptionsHandler will return all exceptions
> ---
>
> Key: FLINK-4214
> URL: https://issues.apache.org/jira/browse/FLINK-4214
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sumit Chawla
>Priority: Minor
> Fix For: 1.1.0
>
>
> JobExceptionsHandler will return all exceptions and is not incrementing the 
> integer to track the exceptions being serialized



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2233: [FLINK-2125][streaming] Delimiter change from char to str...

2016-07-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2233
  
@StephanEwen The socketTextStream methods are already marked as 
`@PublicEvolving`; I thought changing those was allowed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2247: FLINK-4216

2016-07-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2247


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2246: [hotfix] [doc] fixed example

2016-07-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2246


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2242: [FLINK-4214] ExceptionHandler keep count of except...

2016-07-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2242


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4214) JobExceptionsHandler will return all exceptions

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378320#comment-15378320
 ] 

ASF GitHub Bot commented on FLINK-4214:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2242


> JobExceptionsHandler will return all exceptions
> ---
>
> Key: FLINK-4214
> URL: https://issues.apache.org/jira/browse/FLINK-4214
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sumit Chawla
>Priority: Minor
>
> JobExceptionsHandler will return all exceptions and is not incrementing the 
> integer to track the exceptions being serialized



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4216) WordWithCount example with Java has wrong generics type

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378321#comment-15378321
 ] 

ASF GitHub Bot commented on FLINK-4216:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2247


> WordWithCount example with Java has wrong generics type
> ---
>
> Key: FLINK-4216
> URL: https://issues.apache.org/jira/browse/FLINK-4216
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Serhiy Boychenko
>Priority: Trivial
>   Original Estimate: 10m
>  Remaining Estimate: 10m
>
> The Java example with POJOs results in:
> {code}
> Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
> {code}
> due to the wrong generic type of the DataStream.
> Currently it is {code}DataStream>{code}
> but should be {code}DataSource{code}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378305#comment-15378305
 ] 

ASF GitHub Bot commented on FLINK-4208:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2239#discussion_r70877433
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---
@@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
 fi
 
 case $STARTSTOP in
+(start|start-foreground)
+  # Rotate log files
+  rotateLogFile $log
+  rotateLogFile $out
+
+  # Print a warning if daemons are already running on host
+  if [ -f $pid ]; then
+active=()
+while IFS='' read -r p || [[ -n "$p" ]]; do
+  kill -0 $p >/dev/null 2>&1
+  if [ $? -eq 0 ]; then
+active+=($p)
+  fi
+done < "${pid}"
 
-(start)
-# Rotate log files
-rotateLogFile $log
-rotateLogFile $out
-
-# Print a warning if daemons are already running on host
-if [ -f $pid ]; then
-  active=()
-  while IFS='' read -r p || [[ -n "$p" ]]; do
-kill -0 $p >/dev/null 2>&1
-if [ $? -eq 0 ]; then
-  active+=($p)
-fi
-  done < "${pid}"
-
-  count="${#active[@]}"
+count="${#active[@]}"
 
-  if [ ${count} -gt 0 ]; then
-echo "[INFO] $count instance(s) of $DAEMON are already running 
on $HOSTNAME."
-  fi
+if [ ${count} -gt 0 ]; then
+  echo "[INFO] $count instance(s) of $DAEMON are already running 
on $HOSTNAME."
 fi
+  fi
+
+  if [[ $STARTSTOP == "start-foreground" ]]; then
+echo "Starting $DAEMON as a foreground process on host $HOSTNAME."
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null
+  fi
 
+  if [[ $STARTSTOP == "start" ]]; then
 echo "Starting $DAEMON daemon on host $HOSTNAME."
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} 
"${log_setting[@]}" -classpath "`manglePathList 
"$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} 
"${ARGS[@]}" > "$out" 2>&1 < /dev/null &
--- End diff --

This is excellent documentation. It looks (from a `grep daemon` through the 
code) like Flink is daemonizing its threads internally. Perhaps @StephanEwen 
can look at this.


> Support Running Flink processes in foreground mode
> --
>
> Key: FLINK-4208
> URL: https://issues.apache.org/jira/browse/FLINK-4208
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Flink clusters are started in daemon mode automatically; this is definitely 
> the default case. However, if we want to start containers based on Flink, the 
> execution context gets lost. Running Flink as foreground processes can fix 
> this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378301#comment-15378301
 ] 

ASF GitHub Bot commented on FLINK-4208:
---

Github user iemejia commented on the issue:

https://github.com/apache/flink/pull/2239
  
I tried not to change the current daemon behavior; that's the reason why I 
decided to add an additional option. I am not sure whether using wait will 
work for what I want, but if it does, perfect. Can you give me hints on how 
to test this? I naively did this and it does not seem to work:

```
if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
    echo $mypid >> $pid
+   wait $mypid # I also tried with $pid and it does not work either
else
```


> Support Running Flink processes in foreground mode
> --
>
> Key: FLINK-4208
> URL: https://issues.apache.org/jira/browse/FLINK-4208
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Flink clusters are started in daemon mode automatically; this is definitely 
> the default case. However, if we want to start containers based on Flink, the 
> execution context gets lost. Running Flink as foreground processes can fix 
> this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2239: [FLINK-4208] Support Running Flink processes in fo...

2016-07-14 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2239#discussion_r70877433
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---
@@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
 fi
 
 case $STARTSTOP in
+(start|start-foreground)
+  # Rotate log files
+  rotateLogFile $log
+  rotateLogFile $out
+
+  # Print a warning if daemons are already running on host
+  if [ -f $pid ]; then
+active=()
+while IFS='' read -r p || [[ -n "$p" ]]; do
+  kill -0 $p >/dev/null 2>&1
+  if [ $? -eq 0 ]; then
+active+=($p)
+  fi
+done < "${pid}"
 
-(start)
-# Rotate log files
-rotateLogFile $log
-rotateLogFile $out
-
-# Print a warning if daemons are already running on host
-if [ -f $pid ]; then
-  active=()
-  while IFS='' read -r p || [[ -n "$p" ]]; do
-kill -0 $p >/dev/null 2>&1
-if [ $? -eq 0 ]; then
-  active+=($p)
-fi
-  done < "${pid}"
-
-  count="${#active[@]}"
+count="${#active[@]}"
 
-  if [ ${count} -gt 0 ]; then
-echo "[INFO] $count instance(s) of $DAEMON are already running 
on $HOSTNAME."
-  fi
+if [ ${count} -gt 0 ]; then
+  echo "[INFO] $count instance(s) of $DAEMON are already running 
on $HOSTNAME."
 fi
+  fi
+
+  if [[ $STARTSTOP == "start-foreground" ]]; then
+echo "Starting $DAEMON as a foreground process on host $HOSTNAME."
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null
+  fi
 
+  if [[ $STARTSTOP == "start" ]]; then
 echo "Starting $DAEMON daemon on host $HOSTNAME."
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} 
"${log_setting[@]}" -classpath "`manglePathList 
"$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} 
"${ARGS[@]}" > "$out" 2>&1 < /dev/null &
--- End diff --

This is excellent documentation. It looks (from a `grep daemon` through the 
code) like Flink is daemonizing its threads internally. Perhaps @StephanEwen 
can look at this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2239: [FLINK-4208] Support Running Flink processes in foregroun...

2016-07-14 Thread iemejia
Github user iemejia commented on the issue:

https://github.com/apache/flink/pull/2239
  
I tried not to change the current daemon behavior; that's the reason why I 
decided to add an additional option. I am not sure whether using wait will 
work for what I want, but if it does, perfect. Can you give me hints on how 
to test this? I naively did this and it does not seem to work:

```
if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
    echo $mypid >> $pid
+   wait $mypid # I also tried with $pid and it does not work either
else
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70876628
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
---
@@ -285,6 +291,18 @@ public static ZooKeeperCheckpointIDCounter 
createCheckpointIDCounter(
}
}
 
+   private static String generateZookeeperPath(String root, String 
namespace) {
+   if(!namespace.startsWith("/")) {
--- End diff --

Missing space after if, and a second time a few lines down.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2249#discussion_r70876542
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
---
@@ -164,6 +169,7 @@ public static ZooKeeperLeaderElectionService 
createLeaderElectionService(
 * @param clientThe {@link CuratorFramework} ZooKeeper client 
to use
 * @param configuration {@link Configuration} object containing the 
configuration values
 * @return {@link ZooKeeperLeaderElectionService} instance.
+* @return {@link ZooKeeperLeaderElectionService} instance.
--- End diff --

duplicate `@return` entry


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2016-07-14 Thread zentol
Github user zentol commented on the pull request:


https://github.com/apache/flink/commit/2477161352e12e75e2f0f85b5833ad04dc6d31f2#commitcomment-18251972
  
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
 on line 103:
will this condition now ever be true?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"

2016-07-14 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378271#comment-15378271
 ] 

Ivan Mushketyk commented on FLINK-4029:
---

Gabor,

This should be very helpful! Thank you for your suggestion.

> Multi-field "sum" function just like "keyBy"
> 
>
> Key: FLINK-4029
> URL: https://issues.apache.org/jira/browse/FLINK-4029
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Rami
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> I can use keyBy as follows:
> stream.keyBy(“pojo.field1”,”pojo.field2”,…)
> It would make sense that I could use sum, for example, to do its job for more 
> than one field:
> stream.sum(“pojo.field1”,”pojo.field2”,…)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"

2016-07-14 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378269#comment-15378269
 ] 

Ivan Mushketyk commented on FLINK-4029:
---

Rami,

Thank you for your reply, it is clear now.

> Multi-field "sum" function just like "keyBy"
> 
>
> Key: FLINK-4029
> URL: https://issues.apache.org/jira/browse/FLINK-4029
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Rami
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> I can use keyBy as follows:
> stream.keyBy(“pojo.field1”,”pojo.field2”,…)
> It would make sense that I could use sum, for example, to do its job for more 
> than one field:
> stream.sum(“pojo.field1”,”pojo.field2”,…)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378263#comment-15378263
 ] 

ASF GitHub Bot commented on FLINK-4208:
---

Github user iemejia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2239#discussion_r70873347
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---
@@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
 fi
 
 case $STARTSTOP in
+(start|start-foreground)
+  # Rotate log files
+  rotateLogFile $log
+  rotateLogFile $out
+
+  # Print a warning if daemons are already running on host
+  if [ -f $pid ]; then
+active=()
+while IFS='' read -r p || [[ -n "$p" ]]; do
+  kill -0 $p >/dev/null 2>&1
+  if [ $? -eq 0 ]; then
+active+=($p)
+  fi
+done < "${pid}"
 
-(start)
-# Rotate log files
-rotateLogFile $log
-rotateLogFile $out
-
-# Print a warning if daemons are already running on host
-if [ -f $pid ]; then
-  active=()
-  while IFS='' read -r p || [[ -n "$p" ]]; do
-kill -0 $p >/dev/null 2>&1
-if [ $? -eq 0 ]; then
-  active+=($p)
-fi
-  done < "${pid}"
-
-  count="${#active[@]}"
+count="${#active[@]}"
 
-  if [ ${count} -gt 0 ]; then
-echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
-  fi
+if [ ${count} -gt 0 ]; then
+  echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
 fi
+  fi
+
+  if [[ $STARTSTOP == "start-foreground" ]]; then
+echo "Starting $DAEMON as a foreground process on host $HOSTNAME."
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null
+  fi
 
+  if [[ $STARTSTOP == "start" ]]; then
 echo "Starting $DAEMON daemon on host $HOSTNAME."
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
--- End diff --

I added this because I was trying to grasp what makes a daemon a daemon and 
I found a reference that convinced me that nohup was missing:

https://stackoverflow.com/questions/3430330/best-way-to-make-a-shell-script-daemon

Additionally, when I looked for inspiration for my changes (the 
start-foreground name), I looked at how ZooKeeper starts its daemon and noticed 
that it uses nohup too.
https://github.com/apache/zookeeper/blob/trunk/bin/zkServer.sh#L219

This is an extra thing and not the core of the pull request. If you don't 
agree, I can rebase and remove that commit, but I think it is worth the 
addition.
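
For readers of the archive, the recipe from the linked thread boils down to a 
few ingredients (a sketch, not the exact flink-daemon.sh line):

```
nohup "$command" > "$out" 2>&1 < /dev/null &  # the classic daemonization recipe:
#   nohup       - ignore SIGHUP when the controlling terminal goes away
#   > "$out"    - keep stdout/stderr off the (soon to be closed) terminal
#   < /dev/null - never block reading from the terminal
#   &           - run in the background so the invoking shell can exit
echo $! > "$pidfile"                          # remember the child PID
```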



> Support Running Flink processes in foreground mode
> --
>
> Key: FLINK-4208
> URL: https://issues.apache.org/jira/browse/FLINK-4208
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Flink clusters are started automatically in daemon mode; this is definitely 
> the default case. However, if we want to start containers based on Flink, the 
> execution context gets lost. Running Flink as foreground processes can fix 
> this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2239: [FLINK-4208] Support Running Flink processes in fo...

2016-07-14 Thread iemejia
Github user iemejia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2239#discussion_r70873347
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---
@@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
 fi
 
 case $STARTSTOP in
+(start|start-foreground)
+  # Rotate log files
+  rotateLogFile $log
+  rotateLogFile $out
+
+  # Print a warning if daemons are already running on host
+  if [ -f $pid ]; then
+active=()
+while IFS='' read -r p || [[ -n "$p" ]]; do
+  kill -0 $p >/dev/null 2>&1
+  if [ $? -eq 0 ]; then
+active+=($p)
+  fi
+done < "${pid}"
 
-(start)
-# Rotate log files
-rotateLogFile $log
-rotateLogFile $out
-
-# Print a warning if daemons are already running on host
-if [ -f $pid ]; then
-  active=()
-  while IFS='' read -r p || [[ -n "$p" ]]; do
-kill -0 $p >/dev/null 2>&1
-if [ $? -eq 0 ]; then
-  active+=($p)
-fi
-  done < "${pid}"
-
-  count="${#active[@]}"
+count="${#active[@]}"
 
-  if [ ${count} -gt 0 ]; then
-echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
-  fi
+if [ ${count} -gt 0 ]; then
+  echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
 fi
+  fi
+
+  if [[ $STARTSTOP == "start-foreground" ]]; then
+echo "Starting $DAEMON as a foreground process on host $HOSTNAME."
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null
+  fi
 
+  if [[ $STARTSTOP == "start" ]]; then
 echo "Starting $DAEMON daemon on host $HOSTNAME."
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
--- End diff --

I added this because I was trying to grasp what makes a daemon a daemon and 
I found a reference that convinced me that nohup was missing:

https://stackoverflow.com/questions/3430330/best-way-to-make-a-shell-script-daemon

Additionally, when I looked for inspiration for my changes (the 
start-foreground name), I looked at how ZooKeeper starts its daemon and noticed 
that it uses nohup too.
https://github.com/apache/zookeeper/blob/trunk/bin/zkServer.sh#L219
https://github.com/apache/zookeeper/blob/trunk/bin/zkServer.sh#L219

This is an extra thing and not the core of the pull request. If you don't 
agree, I can rebase and remove that commit, but I think it is worth the 
addition.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-07-14 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-3874:
-

Assignee: Ivan Mushketyk

> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

2016-07-14 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
I've updated the PR according to the review and fixed the build (I was using a 
method from JDK 8).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4212) Lock PID file when starting daemons

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378248#comment-15378248
 ] 

ASF GitHub Bot commented on FLINK-4212:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2251

[FLINK-4212] [scripts] Lock PID file when starting daemons



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4212_lock_pid_file_when_starting_daemons

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2251.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2251


commit e7d59835ab011e38de61a0b304cd93f1ca7cefb7
Author: Greg Hogan 
Date:   2016-07-14T15:55:41Z

[FLINK-4212] [scripts] Lock PID file when starting daemons




> Lock PID file when starting daemons
> ---
>
> Key: FLINK-4212
> URL: https://issues.apache.org/jira/browse/FLINK-4212
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> As noted on the mailing list (0), when multiple TaskManagers are started in 
> parallel (using pdsh) there is a race condition on updating the pid: 1) the 
> pid file is first read to parse the process' index, 2) the process is 
> started, and 3) on success the daemon pid is appended to the pid file.
> We could use a tool such as {{flock}} to lock on the pid file while starting 
> the Flink daemon.
> 0: 
> http://mail-archives.apache.org/mod_mbox/flink-user/201607.mbox/%3CCA%2BssbKXw954Bz_sBRwP6db0FntWyGWzTyP7wJZ5nhOeQnof3kg%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2251: [FLINK-4212] [scripts] Lock PID file when starting...

2016-07-14 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2251

[FLINK-4212] [scripts] Lock PID file when starting daemons



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4212_lock_pid_file_when_starting_daemons

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2251.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2251


commit e7d59835ab011e38de61a0b304cd93f1ca7cefb7
Author: Greg Hogan 
Date:   2016-07-14T15:55:41Z

[FLINK-4212] [scripts] Lock PID file when starting daemons




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378227#comment-15378227
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
I've updated the PR according to the review and fixed the build (I was using a 
method from JDK 8).


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378225#comment-15378225
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2244#discussion_r70870120
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import 
org.apache.flink.streaming.util.serialization.AvroSerializationSchema;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+import static javafx.scene.input.KeyCode.T;
+import static org.junit.Assert.assertEquals;
+
+public class AvroSerializationSchemaTest {
+// @Test
--- End diff --

Sorry, I accidentally added this file to this PR.
I've already removed it.


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378226#comment-15378226
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2244#discussion_r70870183
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import 
org.apache.flink.streaming.util.serialization.AvroSerializationSchema;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+import static javafx.scene.input.KeyCode.T;
--- End diff --

I've already removed this file.


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...

2016-07-14 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2244#discussion_r70870120
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import 
org.apache.flink.streaming.util.serialization.AvroSerializationSchema;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+import static javafx.scene.input.KeyCode.T;
+import static org.junit.Assert.assertEquals;
+
+public class AvroSerializationSchemaTest {
+// @Test
--- End diff --

Sorry, I accidentally added this file to this PR.
I've already removed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2125) String delimiter for SocketTextStream

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378219#comment-15378219
 ] 

ASF GitHub Bot commented on FLINK-2125:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2233
  
Because this breaks the public API, it would be good to do the following:

Add a new method to the `StreamExecutionEnvironment`, rather than changing 
the old method. Tag that new method as `@PublicEvolving`.

Take the old method, delegate to the new method, and mark it as 
`@Deprecated`. Also add a proper deprecation comment.
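
A sketch of that pattern with simplified types (illustrative; the real methods 
live on `StreamExecutionEnvironment` and return a `DataStreamSource`):

```java
class SocketSourceApi {

    /** New method: String delimiter. Would carry Flink's @PublicEvolving annotation. */
    public void socketTextStream(String host, int port, String delimiter) {
        System.out.printf("connect to %s:%d, split on %s%n", host, port, delimiter);
    }

    /**
     * Old method, kept for compatibility.
     * @deprecated Use {@link #socketTextStream(String, int, String)} instead.
     */
    @Deprecated
    public void socketTextStream(String host, int port, char delimiter) {
        // delegate to the new method
        socketTextStream(host, port, String.valueOf(delimiter));
    }
}
```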


> String delimiter for SocketTextStream
> -
>
> Key: FLINK-2125
> URL: https://issues.apache.org/jira/browse/FLINK-2125
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Márton Balassi
>Priority: Minor
>  Labels: starter
>
> The SocketTextStreamFunction uses a character delimiter, despite other parts 
> of the API using String delimiter.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2233: [FLINK-2125][streaming] Delimiter change from char to str...

2016-07-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2233
  
Because this breaks the public API, it would be good to do the following:

Add a new method to the `StreamExecutionEnvironment`, rather than changing 
the old method. Tag that new method as `@PublicEvolving`.

Take the old method, delegate to the new method, and mark it as 
`@Deprecated`. Also add a proper deprecation comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3666) Remove Nephele references

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378214#comment-15378214
 ] 

ASF GitHub Bot commented on FLINK-3666:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2241
  
Good catch!

+1 to merge


> Remove Nephele references
> -
>
> Key: FLINK-3666
> URL: https://issues.apache.org/jira/browse/FLINK-3666
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
>
> There still exist a few references to nephele which should be removed:
> {code}
> flink\docs\setup\local_setup.md:
>79  $ tail log/flink-*-jobmanager-*.log
>80  INFO ... - Initializing memory manager with 409 megabytes of memory
>81: INFO ... - Trying to load 
> org.apache.flinknephele.jobmanager.scheduler.local.LocalScheduler as scheduler
>82  INFO ... - Setting up web info server, using web-root directory ...
>83: INFO ... - Web info server will display information about nephele 
> job-manager on localhost, port 8081.
>84  INFO ... - Starting web info server for JobManager on port 8081
>85  ~~~
>..
>   118  $ cd flink
>   119  $ bin/start-local.sh
>   120: Starting Nephele job manager
>   121  ~~~
> {code}
> {code}
> flink\flink-runtime\src\main\java\org\apache\flink\runtime\operators\TaskContext.java:
>70:AbstractInvokable getOwningNepheleTask();
> {code}
> {code}
> flink\flink-runtime\src\main\java\org\apache\flink\runtime\operators\BatchTask.java:
>  1149  * @param message The main message for the log.
>  1150  * @param taskName The name of the task.
>  1151: * @param parent The nephele task that contains the code 
> producing the message.
>  1152  *
>  1153  * @return The string for logging.
>  
>  1254  */
>  1255 @SuppressWarnings("unchecked")
>  1256:public static  Collector initOutputs(AbstractInvokable 
> nepheleTask, ClassLoader cl, TaskConfig config,
>  1257 
> List chainedTasksTarget,
>  1258 
> List eventualOutputs,
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2241: [FLINK-3666] Remove all remaining Nephele references

2016-07-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2241
  
Good catch!

+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2246: [hotfix] [doc] fixed example

2016-07-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2246
  
Thanks, merging this!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3163) Configure Flink for NUMA systems

2016-07-14 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378192#comment-15378192
 ] 

Greg Hogan commented on FLINK-3163:
---

I think we can achieve "good enough" without changing the format of {{masters}} 
and {{slaves}}. Mesos and YARN provide cluster management, and it might be best 
to keep the Flink configuration simple.

What if we added
* a configuration parameter to enable NUMA which would result in a TaskManager 
started on each NUMA node for each IP in {{slaves}}
* a configuration parameter (one or two?) for the JobManager and 
ResourceManager to run in their own NUMA node, not shared with a TaskManager 
(would the JM and RM share a NUMA node if on the same IP?)

These could be {{taskmanager.compute.numa}}, {{jobmanager.compute.numa}}, and 
{{resourcemanager.compute.numa}}.

We could also add, as a related idea, {{taskmanager.compute.fraction}}. This 
would operate relative to {{taskmanager.numberOfTaskSlots}} as 
{{taskmanager.memory.fraction}} operates relative to 
{{taskmanager.memory.size}}. If set to {{1.0}} you would get one slot per 
(hyper-threaded) processor.

As [~saliya] noted, binding processes is quite easy. Since I have only dealt 
with single-socket systems I have temporarily hard-coded the following in my 
build:

{code}
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e579c0c..5f076d5 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -96,4 +96,10 @@ if [[ $STARTSTOP == "start" ]]; then
 args=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
-"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+else
+numactl --membind=0 --cpunodebind=0 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+numactl --membind=1 --cpunodebind=1 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+fi
{code}

> Configure Flink for NUMA systems
> 
>
> Key: FLINK-3163
> URL: https://issues.apache.org/jira/browse/FLINK-3163
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> On NUMA systems Flink can be pinned to a single physical processor ("node") 
> using {{numactl --membind=$node --cpunodebind=$node }}. Commonly 
> available NUMA systems include the largest AWS and Google Compute instances.
> For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could 
> configure a single TaskManager with 36 slots or have Flink create two 
> TaskManagers bound to each of the NUMA nodes, each with 18 slots.
> There may be some extra overhead in transferring network buffers between 
> TaskManagers on the same system, though the fraction of data shuffled in this 
> manner decreases with the size of the cluster. The performance improvement 
> from only accessing local memory looks to be significant though difficult to 
> benchmark.
> The JobManagers may fit into NUMA nodes rather than requiring full systems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4210) Move close()/isClosed() out of MetricGroup interface

2016-07-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378189#comment-15378189
 ] 

Stephan Ewen commented on FLINK-4210:
-

I don't think that is too bad. Worst thing that can happen is that a user 
closes the user-code-metricgroup.

> Move close()/isClosed() out of MetricGroup interface
> 
>
> Key: FLINK-4210
> URL: https://issues.apache.org/jira/browse/FLINK-4210
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The (user-facing) MetricGroup interface currently exposes a close() and 
> isClosed() method which generally users shouldn't need to call. They are an 
> internal thing, and thus should be moved into the AbstractMetricGroup class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting

2016-07-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378170#comment-15378170
 ] 

Stephan Ewen commented on FLINK-4218:
-

If I understand correctly, it is actually possible that some nodes see the 
files, and some do not?

We can try different options:

  - When a file state handle is closed, the file system is queried for the 
existence of that file. Only once the file is confirmed to exist is the 
checkpoint acknowledged. There needs to be a somewhat tight timeout on that.
  - On restore, we can have some retries, with a reasonable timeout.

What do you think about these options?
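
A sketch of the restore-side retry (names are illustrative, not the actual 
state backend code):

{code}
import java.util.concurrent.Callable;

class VisibilityRetry {
    // polls exists() (e.g. a fs.exists(checkpointPath) check against S3)
    // until it returns true or the timeout expires
    static boolean waitForFile(Callable<Boolean> exists, long timeoutMs, long pollMs) throws Exception {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (exists.call()) {
                return true;
            }
            Thread.sleep(pollMs); // back off before re-checking
        }
        return false; // caller decides whether to fail the restore
    }
}
{code}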

> Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." 
> causes task restarting
> --
>
> Key: FLINK-4218
> URL: https://issues.apache.org/jira/browse/FLINK-4218
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.1.0
>Reporter: Sergii Koshel
>
> Sporadically see exception as below. And restart of task because of it.
> {code:title=Exception|borderStyle=solid}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: No such file or directory: 
> s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
>   at 
> org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779)
>   ... 8 more
> {code}
> File actually exists on S3. 
> I suppose it is related to some race conditions with S3, but it would be good 
> to retry a few times before stopping task execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4214) JobExceptionsHandler will return all exceptions

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378166#comment-15378166
 ] 

ASF GitHub Bot commented on FLINK-4214:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2242
  
Merging this, thanks for the patch!


> JobExceptionsHandler will return all exceptions
> ---
>
> Key: FLINK-4214
> URL: https://issues.apache.org/jira/browse/FLINK-4214
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sumit Chawla
>Priority: Minor
>
> JobExceptionsHandler will return all exceptions and is not incrementing the 
> integer to track the exceptions being serialized



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2242: [FLINK-4214] ExceptionHandler keep count of exceptions

2016-07-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2242
  
Merging this, thanks for the patch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2248: [FLINK-4213] [gelly] Provide CombineHint in Gelly algorit...

2016-07-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2248
  
Looks good to me.
How much faster does it get with the hash combiner?
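
For reference, the hint in question is applied directly on the grouped reduce. 
A minimal standalone sketch (not the actual Gelly code; class locations may 
differ across versions), counting out-degrees on a toy edge list:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // toy edge list: (source, target)
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(
            Tuple2.of(0L, 1L), Tuple2.of(0L, 2L), Tuple2.of(1L, 2L));

        edges
            .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> edge) {
                    return Tuple2.of(edge.f0, 1L); // one count per out-edge
                }
            })
            .groupBy(0)
            .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            })
            .setCombineHint(CombineHint.HASH) // hash-combine instead of the default sort-combine
            .print();
    }
}
```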


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4213) Provide CombineHint in Gelly algorithms

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378165#comment-15378165
 ] 

ASF GitHub Bot commented on FLINK-4213:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2248
  
Looks good to me.
How much faster does it get with the hash combiner?


> Provide CombineHint in Gelly algorithms
> ---
>
> Key: FLINK-4213
> URL: https://issues.apache.org/jira/browse/FLINK-4213
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Many graph algorithms will see better {{reduce}} performance with the 
> hash-combine compared with the still default sort-combine, e.g. HITS and 
> LocalClusteringCoefficient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4217) Gelly drivers should read CSV values as strings

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378145#comment-15378145
 ] 

ASF GitHub Bot commented on FLINK-4217:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2250

[FLINK-4217] [gelly] Gelly drivers should read CSV values as strings



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4217_gelly_drivers_should_read_csv_values_as_strings

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2250


commit f7674ee7126d20f8627c4007592c41b7f8a2bc39
Author: Greg Hogan 
Date:   2016-07-14T14:02:23Z

[FLINK-4217] [gelly] Gelly drivers should read CSV values as strings

The user must now select "integer" or "string" when reading a graph from
a CSV file.




> Gelly drivers should read CSV values as strings
> ---
>
> Key: FLINK-4217
> URL: https://issues.apache.org/jira/browse/FLINK-4217
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Gelly drivers ClusteringCoefficient, HITS, JaccardIndex, and TriangleListing 
> parse CSV files as {{LongValue}}. This works for anonymized data sets such as 
> SNAP but should be configurable as {{StringValue}} to handle the general case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2250: [FLINK-4217] [gelly] Gelly drivers should read CSV...

2016-07-14 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2250

[FLINK-4217] [gelly] Gelly drivers should read CSV values as strings



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4217_gelly_drivers_should_read_csv_values_as_strings

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2250


commit f7674ee7126d20f8627c4007592c41b7f8a2bc39
Author: Greg Hogan 
Date:   2016-07-14T14:02:23Z

[FLINK-4217] [gelly] Gelly drivers should read CSV values as strings

The user must now select "integer" or "string" when reading a graph from
a CSV file.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-3630) Little mistake in documentation

2016-07-14 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan reassigned FLINK-3630:
-

Assignee: Greg Hogan  (was: Riccardo Diomedi)

> Little mistake in documentation
> ---
>
> Key: FLINK-3630
> URL: https://issues.apache.org/jira/browse/FLINK-3630
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Documentation
>Affects Versions: 1.0.0
>Reporter: Riccardo Diomedi
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: documentation
>
> in section "GroupCombine on a Grouped DataSet" of the following link: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#groupreduce-on-grouped-dataset
> there is a little mistake in the Java code in both the combine and reduce 
> methods (it's the same mistake). The variable "word" is defined in the scope 
> of the for loop, so it cannot be used in the collect method.
> A possible solution could be to initialise the variable before the for loop 
> and assign a value to it inside the loop.
> Something like:
> int count = 0;
> String word;
> for (String record : words) {
> word = record;
> count++;
> }
> out.collect(new Tuple2(word, count)); 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4219) Quote PDSH opts in start-cluster.sh

2016-07-14 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4219:
-

 Summary: Quote PDSH opts in start-cluster.sh
 Key: FLINK-4219
 URL: https://issues.apache.org/jira/browse/FLINK-4219
 Project: Flink
  Issue Type: Bug
  Components: Startup Shell Scripts
Affects Versions: 1.1.0
Reporter: Greg Hogan
Assignee: Greg Hogan
 Fix For: 1.1.0


Quote {{PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS}} in {{start-cluster.sh}} to 
prevent word splitting if the user configures multiple SSH options.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2239: [FLINK-4208] Support Running Flink processes in foregroun...

2016-07-14 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2239
  
What if, instead of changing how we start the daemon (so we continue to always 
start it as a background process), we instead add a `wait` after the PID file 
has been updated when starting a foreground process?

If FLINK-4212 is accepted we would also need to release the file lock 
before calling `wait`.
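
A sketch of how the two could compose (illustrative only; `flock` usage as per 
flock(1)):

```
exec 200>>"$pid"            # open the pid file on a dedicated fd
flock 200                   # FLINK-4212: serialize concurrent starts
"$command" > "$out" 2>&1 < /dev/null &
mypid=$!
echo $mypid >> "$pid"
flock -u 200                # release the lock before blocking

if [[ $STARTSTOP == "start-foreground" ]]; then
  wait $mypid               # the JVM is a child of this shell, so wait works
fi
```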


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2220: [FLINK-4184] [metrics] Replace invalid characters ...

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2220#discussion_r70845241
  
--- Diff: 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 ---
@@ -74,6 +75,15 @@ protected ScheduledDropwizardReporter() {
}
 
// 

+   //  Getters
+   // 

+
+   // used for testing purposes
+   Map getCounters() {
--- End diff --

could we move this into the TestingScheduledDropwizardReporter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377314#comment-15377314
 ] 

ASF GitHub Bot commented on FLINK-4184:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2220
  
only one comment left, otherwise +1


> Ganglia and GraphiteReporter report metric names with invalid characters
> 
>
> Key: FLINK-4184
> URL: https://issues.apache.org/jira/browse/FLINK-4184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with 
> names which contain invalid characters. For example, quotes are not filtered 
> out which can be problematic for Ganglia. Moreover, dots are not replaced 
> which causes Graphite to think that an IP address is actually a scoped metric 
> name.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2220: [FLINK-4184] [metrics] Replace invalid characters in Sche...

2016-07-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2220
  
only one comment left, otherwise +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377304#comment-15377304
 ] 

ASF GitHub Bot commented on FLINK-4184:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2220#discussion_r70845241
  
--- Diff: 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 ---
@@ -74,6 +75,15 @@ protected ScheduledDropwizardReporter() {
}
 
// 

+   //  Getters
+   // 

+
+   // used for testing purposes
+   Map getCounters() {
--- End diff --

could we move this into the TestingScheduledDropwizardReporter?


> Ganglia and GraphiteReporter report metric names with invalid characters
> 
>
> Key: FLINK-4184
> URL: https://issues.apache.org/jira/browse/FLINK-4184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with 
> names which contain invalid characters. For example, quotes are not filtered 
> out which can be problematic for Ganglia. Moreover, dots are not replaced 
> which causes Graphite to think that an IP address is actually a scoped metric 
> name.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377297#comment-15377297
 ] 

ASF GitHub Bot commented on FLINK-4208:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2239#discussion_r70844709
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---
@@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
 fi
 
 case $STARTSTOP in
+(start|start-foreground)
+  # Rotate log files
+  rotateLogFile $log
+  rotateLogFile $out
+
+  # Print a warning if daemons are already running on host
+  if [ -f $pid ]; then
+active=()
+while IFS='' read -r p || [[ -n "$p" ]]; do
+  kill -0 $p >/dev/null 2>&1
+  if [ $? -eq 0 ]; then
+active+=($p)
+  fi
+done < "${pid}"
 
-(start)
-# Rotate log files
-rotateLogFile $log
-rotateLogFile $out
-
-# Print a warning if daemons are already running on host
-if [ -f $pid ]; then
-  active=()
-  while IFS='' read -r p || [[ -n "$p" ]]; do
-kill -0 $p >/dev/null 2>&1
-if [ $? -eq 0 ]; then
-  active+=($p)
-fi
-  done < "${pid}"
-
-  count="${#active[@]}"
+count="${#active[@]}"
 
-  if [ ${count} -gt 0 ]; then
-echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
-  fi
+if [ ${count} -gt 0 ]; then
+  echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
 fi
+  fi
+
+  if [[ $STARTSTOP == "start-foreground" ]]; then
+echo "Starting $DAEMON as a foreground process on host $HOSTNAME."
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null
+  fi
 
+  if [[ $STARTSTOP == "start" ]]; then
 echo "Starting $DAEMON daemon on host $HOSTNAME."
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
--- End diff --

Under what circumstances is the `nohup` necessary?


> Support Running Flink processes in foreground mode
> --
>
> Key: FLINK-4208
> URL: https://issues.apache.org/jira/browse/FLINK-4208
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Flink clusters are started automatically in daemon mode, this is definitely 
> the default case, however if we want to start containers based on flinks, the 
> execution context gets lost. Running flink as foreground processes can fix 
> this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2239: [FLINK-4208] Support Running Flink processes in fo...

2016-07-14 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2239#discussion_r70844709
  
--- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ---
@@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
 fi
 
 case $STARTSTOP in
+(start|start-foreground)
+  # Rotate log files
+  rotateLogFile $log
+  rotateLogFile $out
+
+  # Print a warning if daemons are already running on host
+  if [ -f $pid ]; then
+active=()
+while IFS='' read -r p || [[ -n "$p" ]]; do
+  kill -0 $p >/dev/null 2>&1
+  if [ $? -eq 0 ]; then
+active+=($p)
+  fi
+done < "${pid}"
 
-(start)
-# Rotate log files
-rotateLogFile $log
-rotateLogFile $out
-
-# Print a warning if daemons are already running on host
-if [ -f $pid ]; then
-  active=()
-  while IFS='' read -r p || [[ -n "$p" ]]; do
-kill -0 $p >/dev/null 2>&1
-if [ $? -eq 0 ]; then
-  active+=($p)
-fi
-  done < "${pid}"
-
-  count="${#active[@]}"
+count="${#active[@]}"
 
-  if [ ${count} -gt 0 ]; then
-echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
-  fi
+if [ ${count} -gt 0 ]; then
+  echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
 fi
+  fi
+
+  if [[ $STARTSTOP == "start-foreground" ]]; then
+echo "Starting $DAEMON as a foreground process on host $HOSTNAME."
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null
+  fi
 
+  if [[ $STARTSTOP == "start" ]]; then
 echo "Starting $DAEMON daemon on host $HOSTNAME."
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
+nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
--- End diff --

Under what circumstances is the `nohup` necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377290#comment-15377290
 ] 

ASF GitHub Bot commented on FLINK-4186:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2236
  
couldn't find any problem, +1 from my side.


> Expose Kafka metrics through Flink metrics
> --
>
> Key: FLINK-4186
> URL: https://issues.apache.org/jira/browse/FLINK-4186
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> Currently, we expose the Kafka metrics through Flink's accumulators.
> We can now use the metrics system in Flink to report Kafka metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2236: [FLINK-4186] Use Flink metrics to report Kafka metrics

2016-07-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2236
  
couldn't find any problem, +1 from my side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377282#comment-15377282
 ] 

ASF GitHub Bot commented on FLINK-4186:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70843655
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,127 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+						error.f0 = t;
+					}
+				}
+

[GitHub] flink pull request #2236: [FLINK-4186] Use Flink metrics to report Kafka met...

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70843655
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,127 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+						error.f0 = t;
+					}
+				}
+			}
+		};
+		Thread jobThread = new Thread(job);
+		jobThread.start();
+
+		try {
+			// connect to JMX
+			MBeanServer 

[GitHub] flink pull request #2249: 4166 zookeeper namespaces

2016-07-14 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2249

4166 zookeeper namespaces

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 4166-zookeeper-namespaces

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2249.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2249


commit 6e418c9ee6004cd13d4bfde158ff0b56cb0136aa
Author: Stefan Richter 
Date:   2016-07-14T16:21:40Z

[FLINK-4166] [Distributed Coordination] zookeeper namespaces (cli parameter 
-z)

commit f343da4042a02aa86b5bde81f37d13200a081b41
Author: Stefan Richter 
Date:   2016-07-14T17:01:15Z

[FLINK-4166] [Distributed Coordination] docs: zookeeper namespaces (cli parameter -z)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3985) A metric with the name * was already registered

2016-07-14 Thread Joshua Griffith (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377240#comment-15377240
 ] 

Joshua Griffith commented on FLINK-3985:


I'm also seeing around 3000 lines of this error written repeatedly to the log
when running Flink 1.1-SNAPSHOT programs locally. Is there a way to turn off
JMX reporting?
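For context, the error itself is plain JMX behavior: registering a second MBean under an ObjectName that is already taken throws InstanceAlreadyExistsException, which the JMXReporter then logs. A minimal standalone sketch (the ObjectName and class names are illustrative):
{code}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {

	// standard MBean convention: class Sample exposes interface SampleMBean
	public interface SampleMBean {
		int getValue();
	}

	public static class Sample implements SampleMBean {
		@Override
		public int getValue() { return 42; }
	}

	public static void main(String[] args) throws Exception {
		MBeanServer server = ManagementFactory.getPlatformMBeanServer();
		// illustrative name; the real reporter derives it from the metric's scope
		ObjectName name = new ObjectName("org.apache.flink.metrics:key0=host,name=numBytesIn");
		server.registerMBean(new Sample(), name); // first registration succeeds
		server.registerMBean(new Sample(), name); // second one throws InstanceAlreadyExistsException
	}
}
{code}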

> A metric with the name * was already registered
> ---
>
> Key: FLINK-3985
> URL: https://issues.apache.org/jira/browse/FLINK-3985
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>  Labels: test-stability
>
> The YARN tests detected the following failure while running WordCount.
> {code}
> 2016-05-27 21:50:48,230 INFO  org.apache.flink.yarn.YarnTaskManager   
>   - Received task CHAIN DataSource (at main(WordCount.java:70) 
> (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at 
> main(WordCount.java:80)) -> Combine(SUM(1), at main(WordCount.java:83) (1/2)
> 2016-05-27 21:50:48,231 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1093)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:442)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:284)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at 

[GitHub] flink pull request #2248: [FLINK-4213] [gelly] Provide CombineHint in Gelly ...

2016-07-14 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2248

[FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms

`VertexDegrees` has changed from using a `ReduceFunction` to using a 
`GroupReduceFunction` as in directed `TriangleListing`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4213_provide_combinehint_in_gelly_algorithms

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2248


commit 2a8e5c708acefa881223bb3c8a6371bd5bb6ea9b
Author: Greg Hogan 
Date:   2016-07-14T13:39:02Z

[FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4213) Provide CombineHint in Gelly algorithms

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377224#comment-15377224
 ] 

ASF GitHub Bot commented on FLINK-4213:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2248

[FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms

`VertexDegrees` has changed from using a `ReduceFunction` to using a 
`GroupReduceFunction` as in directed `TriangleListing`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4213_provide_combinehint_in_gelly_algorithms

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2248


commit 2a8e5c708acefa881223bb3c8a6371bd5bb6ea9b
Author: Greg Hogan 
Date:   2016-07-14T13:39:02Z

[FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms




> Provide CombineHint in Gelly algorithms
> ---
>
> Key: FLINK-4213
> URL: https://issues.apache.org/jira/browse/FLINK-4213
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Many graph algorithms will see better {{reduce}} performance with the 
> hash-combine compared with the still default sort-combine, e.g. HITS and 
> LocalClusteringCoefficient.
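As a sketch of what providing the hint looks like, assuming the DataSet API's {{setCombineHint}} that came with the hash-based combiner (the degree-count data below is made up):
{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// (vertexId, 1) records, as a degree count would produce them
		DataSet<Tuple2<Long, Long>> vertexOnes = env.fromElements(
			Tuple2.of(1L, 1L), Tuple2.of(2L, 1L), Tuple2.of(1L, 1L));

		vertexOnes
			.groupBy(0)
			.reduce(new ReduceFunction<Tuple2<Long, Long>>() {
				@Override
				public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
					return Tuple2.of(a.f0, a.f1 + b.f1);
				}
			})
			.setCombineHint(CombineHint.HASH) // hash-combine instead of the default sort-combine
			.print();
	}
}
{code}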



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting

2016-07-14 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377211#comment-15377211
 ] 

Robert Metzger commented on FLINK-4218:
---

I think this an artifact of S3's consistency model. Enabling EMRFS on EMR will 
probably resolve this issue: 
https://www.infoq.com/news/2015/01/emrfs-s3-consistency

> Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." 
> causes task restarting
> --
>
> Key: FLINK-4218
> URL: https://issues.apache.org/jira/browse/FLINK-4218
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.1.0
>Reporter: Sergii Koshel
>
> Sporadically I see the exception below, and the task restarts because of it.
> {code:title=Exception|borderStyle=solid}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: No such file or directory: 
> s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
>   at 
> org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
>   at 
> org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779)
>   ... 8 more
> {code}
> The file actually exists on S3.
> I suppose it is related to some race condition with S3, but it would be good
> to retry a few times before stopping task execution.
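A minimal sketch of that retry idea in plain Java; the Retry helper, attempt count, and backoff are all made up for illustration:
{code}
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;

// Hypothetical helper: retry an S3 metadata lookup a few times before
// giving up, since read-after-write visibility can lag briefly.
public final class Retry {

	public static <T> T withRetries(Callable<T> action, int maxAttempts, long backoffMillis)
			throws Exception {
		FileNotFoundException lastFailure = null;
		for (int attempt = 1; attempt <= maxAttempts; attempt++) {
			try {
				return action.call();
			} catch (FileNotFoundException e) {
				lastFailure = e;
				Thread.sleep(backoffMillis * attempt); // simple linear backoff
			}
		}
		throw lastFailure;
	}
}
{code}
A call like {{Retry.withRetries(() -> fs.getFileStatus(path), 3, 500)}} would then absorb a transient miss instead of failing the checkpoint and restarting the task.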



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4212) Lock PID file when starting daemons

2016-07-14 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-4212:
--
Summary: Lock PID file when starting daemons  (was: Lock on pid file when 
starting daemons)

> Lock PID file when starting daemons
> ---
>
> Key: FLINK-4212
> URL: https://issues.apache.org/jira/browse/FLINK-4212
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> As noted on the mailing list (0), when multiple TaskManagers are started in 
> parallel (using pdsh) there is a race condition on updating the pid: 1) the 
> pid file is first read to parse the process' index, 2) the process is 
> started, and 3) on success the daemon pid is appended to the pid file.
> We could use a tool such as {{flock}} to lock on the pid file while starting 
> the Flink daemon.
> 0: 
> http://mail-archives.apache.org/mod_mbox/flink-user/201607.mbox/%3CCA%2BssbKXw954Bz_sBRwP6db0FntWyGWzTyP7wJZ5nhOeQnof3kg%40mail.gmail.com%3E
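The script change would presumably use flock(1) itself; purely for illustration, the same read-modify-write protection expressed with Java's file locking (the pid file path is made up):
{code}
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Illustrative only: take an exclusive lock on the pid file so that
// "read index, start daemon, append pid" happens atomically per starter.
public class PidFileLockDemo {

	public static void main(String[] args) throws Exception {
		try (FileChannel channel = FileChannel.open(
				Paths.get("/tmp/flink.pid"), // made-up path
				StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
			FileLock lock = channel.lock()) { // blocks until the exclusive lock is granted
			// 1) read existing pids and derive the next process index
			// 2) start the daemon
			// 3) append the new pid
			// no other starter can interleave with these steps while the lock is held
		}
	}
}
{code}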



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377134#comment-15377134
 ] 

ASF GitHub Bot commented on FLINK-4186:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2236
  
I fixed the failing build and addressed all comments so far ;) (I was
working on the code while you were reviewing it.)


> Expose Kafka metrics through Flink metrics
> --
>
> Key: FLINK-4186
> URL: https://issues.apache.org/jira/browse/FLINK-4186
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> Currently, we expose the Kafka metrics through Flink's accumulators.
> We can now use the metrics system in Flink to report Kafka metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4216) WordWithCount example with Java has wrong generics type

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377116#comment-15377116
 ] 

ASF GitHub Bot commented on FLINK-4216:
---

Github user serhiy commented on the issue:

https://github.com/apache/flink/pull/2247
  
Damn I was a little bit late :D! You guys are fast :D!


> WordWithCount example with Java has wrong generics type
> ---
>
> Key: FLINK-4216
> URL: https://issues.apache.org/jira/browse/FLINK-4216
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Serhiy Boychenko
>Priority: Trivial
>   Original Estimate: 10m
>  Remaining Estimate: 10m
>
> The Java example of the POJOs results in:
> {code}
> Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
> {code}
> due to the wrong generic type of the DataStream.
> Currently it is {code}DataStream<Tuple2<String, Integer>>{code}
> but should be {code}DataSource<WordWithCount>{code}.
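For readers without the docs open, a minimal sketch of the kind of mismatch being reported; the WordWithCount POJO here is made up to stand in for the one in the documentation:
{code}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenericsMismatchDemo {

	// illustrative POJO, standing in for the docs' WordWithCount
	public static class WordWithCount {
		public String word;
		public long count;

		public WordWithCount() {}

		public WordWithCount(String word, long count) {
			this.word = word;
			this.count = count;
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// compiles: the declared element type matches what the source produces
		DataStream<WordWithCount> counts = env.fromElements(new WordWithCount("hello", 1L));

		// would not compile: the generic type disagrees with the elements
		// DataStream<Tuple2<String, Integer>> wrong = env.fromElements(new WordWithCount("hello", 1L));

		counts.print();
		env.execute("generics demo");
	}
}
{code}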



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2236: [FLINK-4186] Use Flink metrics to report Kafka metrics

2016-07-14 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2236
  
I fixed the failing build and addressed all comments so far ;) (I was
working on the code while you were reviewing it.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377131#comment-15377131
 ] 

ASF GitHub Bot commented on FLINK-4186:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70828051
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+

[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377132#comment-15377132
 ] 

ASF GitHub Bot commented on FLINK-4186:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70828069
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+

[GitHub] flink pull request #2236: [FLINK-4186] Use Flink metrics to report Kafka met...

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70828017
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+						error.f0 = t;
+					}
+				}
+			}
+		};
+		Thread jobThread = new Thread(job);
+

[GitHub] flink pull request #2236: [FLINK-4186] Use Flink metrics to report Kafka met...

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70828051
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+						error.f0 = t;
+					}
+				}
+			}
+		};
+		Thread jobThread = new Thread(job);
+

[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377130#comment-15377130
 ] 

ASF GitHub Bot commented on FLINK-4186:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70828017
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+

[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics

2016-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377128#comment-15377128
 ] 

ASF GitHub Bot commented on FLINK-4186:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70827999
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+

[GitHub] flink pull request #2236: [FLINK-4186] Use Flink metrics to report Kafka met...

2016-07-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2236#discussion_r70828069
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws

 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");

-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for consumer
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a stream with 5 topics
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}

+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+						error.f0 = t;
+					}
+				}
+			}
+		};
+		Thread jobThread = new Thread(job);
+
