[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699254#comment-14699254
 ] 

Sachin Goel commented on FLINK-2448:


I'm not sure what you mean by running multiple programs in a single method. 
Do you mean, for example, running several jobs in one test? In that case too, 
as long as the environment is re-created, it should not fail.

 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.
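The failure mode described here can be illustrated with a plain map standing in for the environment's cache-file registry (an illustrative sketch with hypothetical names, not Flink's actual DistributedCache code): on a reused environment, the second registration of the same constant name hits the entry left over from the first run, unless the entries are cleared in between.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the reported bug: the cache-file registry lives on
// the environment object, so reusing one environment across test runs means
// a second registration of the same constant name sees the entry from the
// first run. Class and method names here are hypothetical.
public class CacheFileRegistry {
    private final Map<String, String> cacheFiles = new HashMap<>();

    public void registerCachedFile(String path, String name) {
        if (cacheFiles.containsKey(name)) {
            throw new IllegalStateException("File with name '" + name + "' is already cached.");
        }
        cacheFiles.put(name, path);
    }

    // The fix direction discussed in the ticket: clear entries between runs.
    public void clearCachedFiles() {
        cacheFiles.clear();
    }

    public static void main(String[] args) {
        CacheFileRegistry env = new CacheFileRegistry(); // same object for both "programs"
        env.registerCachedFile("file:///tmp/data", "words");
        boolean secondFailed = false;
        try {
            env.registerCachedFile("file:///tmp/data", "words"); // second run, same constant name
        } catch (IllegalStateException e) {
            secondFailed = true;
        }
        System.out.println(secondFailed ? "second registration failed" : "ok");
        env.clearCachedFiles();
        env.registerCachedFile("file:///tmp/data", "words"); // succeeds after clearing
    }
}
```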



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2530) optimize equal() of AcknowledgeCheckpoint

2015-08-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699268#comment-14699268
 ] 

Aljoscha Krettek commented on FLINK-2530:
-

Can this issue be closed then?

 optimize equal() of AcknowledgeCheckpoint
 -

 Key: FLINK-2530
 URL: https://issues.apache.org/jira/browse/FLINK-2530
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2531) combining the if branch to improve the performance

2015-08-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699270#comment-14699270
 ] 

Aljoscha Krettek commented on FLINK-2531:
-

Can this be closed again then?

 combining the if branch to improve the performance
 

 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699283#comment-14699283
 ] 

ASF GitHub Bot commented on FLINK-2527:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1027#issuecomment-131753704
  
Looks good, will merge this...


 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?
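Option 2 above can be sketched as follows (an illustrative stand-in with hypothetical names, not Gelly's actual VertexCentricIteration classes): setNewVertexValue only records the value, and a single emission of the last recorded value happens after updateVertex returns.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of option 2 from the ticket, with hypothetical names: buffer the
// value passed to setNewVertexValue and emit only the last one after the
// update function returns, so repeated calls cannot flood the workset.
public class VertexUpdateRunner {
    private Long outVal;          // last value recorded by setNewVertexValue
    private boolean valueSet;     // whether setNewVertexValue was called at all
    private final List<Long> collector = new ArrayList<>(); // stand-in for the out Collector

    public void setNewVertexValue(long newValue) {
        outVal = newValue;        // record only; do not collect yet
        valueSet = true;
    }

    // Drives one updateVertex invocation and emits the final value once.
    public void runUpdate(Runnable updateVertex) {
        valueSet = false;
        updateVertex.run();
        if (valueSet) {
            collector.add(outVal); // single emission, even after many set calls
        }
    }

    public List<Long> collected() {
        return collector;
    }

    public static void main(String[] args) {
        VertexUpdateRunner runner = new VertexUpdateRunner();
        runner.runUpdate(() -> {
            runner.setNewVertexValue(1L);
            runner.setNewVertexValue(2L); // only this last value reaches the collector
        });
        System.out.println(runner.collected());
    }
}
```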



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699295#comment-14699295
 ] 

Sachin Goel commented on FLINK-2448:


Aha. It makes sense. The `TestEnvironment` context factory returns the same 
object always. The second `getEnvironment` call doesn't actually do anything.

 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37174502
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;
 
     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 
-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();
 
-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 
-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

I would like to address that in a followup, as this creates conflicts with 
#988 otherwise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2015-08-17 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699336#comment-14699336
 ] 

Márton Balassi commented on FLINK-2491:
---

That should be good enough. I was testing with a weaker version of that, with a 
buffer timeout of 0. With this setup your test passed most of the time. Can you do 
the fix, or would you like me to do it?

 Operators are not participating in state checkpointing in some cases
 

 Key: FLINK-2491
 URL: https://issues.apache.org/jira/browse/FLINK-2491
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Márton Balassi
Priority: Critical
 Fix For: 0.10


 While implementing a test case for the Kafka Consumer, I came across the 
 following bug:
 Consider the following topology, with the operator parallelism in parentheses:
 Source (2) -- Sink (1).
 In this setup, the {{snapshotState()}} method is called on the source, but 
 not on the Sink.
 The sink receives the generated data.
 Only one of the two sources is generating data.
 I've implemented a test case for this, you can find it here: 
 https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699335#comment-14699335
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37174502
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;
 
     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 
-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();
 
-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 
-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

I would like to address that in a followup, as this creates conflicts with 
#988 otherwise.


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported and follow-up exceptions are not. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...

2015-08-17 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-131772983
  
Ah yes. You're right. Lemme see if I can write a workaround for this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699353#comment-14699353
 ] 

ASF GitHub Bot commented on FLINK-2448:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-131772983
  
Ah yes. You're right. Lemme see if I can write a workaround for this.


 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.

2015-08-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699242#comment-14699242
 ] 

Aljoscha Krettek commented on FLINK-2514:
-

I have a fix for this as part of FLINK-2398. There I always clean the 
sinks/operators when executing.

 Local and Remote environment behave differently when re-triggering execution.
 -

 Key: FLINK-2514
 URL: https://issues.apache.org/jira/browse/FLINK-2514
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Márton Balassi
Priority: Critical
 Fix For: 0.10


 The following code behaves differently on the {{LocalStreamEnvironment}} and 
 the {{RemoteStreamEnvironment}}.
 {code}
 StreamExecutionEnvironment env = ...;
 env.addSource(someSource).addSink(someSink);
 env.execute();
 env.addSource(anotherSource).addSink(anotherSink);
 env.execute();
 {code}
 Locally, only the second source/sink pair is executed.
 Remotely, both are re-executed.
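The fix Aljoscha describes, always cleaning the sinks/operators when executing, can be sketched like this (a minimal illustration with hypothetical names, not the real StreamExecutionEnvironment code): each execute() runs only the topology registered since the previous call, which makes local and remote behavior agree.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the fix discussed in the comment: clear the registered sinks on
// each execute(), so re-triggering execution only runs topology added since
// the previous call. Names are hypothetical, not the real environment API.
public class MiniStreamEnvironment {
    private final List<String> sinks = new ArrayList<>();
    private final List<List<String>> executedPlans = new ArrayList<>();

    public void addSink(String sink) {
        sinks.add(sink);
    }

    public void execute() {
        executedPlans.add(new ArrayList<>(sinks));
        sinks.clear(); // the fix: do not carry sinks over to the next execution
    }

    public List<List<String>> plans() {
        return executedPlans;
    }

    public static void main(String[] args) {
        MiniStreamEnvironment env = new MiniStreamEnvironment();
        env.addSink("someSink");
        env.execute();               // runs someSink only
        env.addSink("anotherSink");
        env.execute();               // runs anotherSink only, locally and remotely
        System.out.println(env.plans());
    }
}
```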



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-131767636
  
Looks mostly good.

The `TaskRuntimeInfo` is now a bit strange. It contains some runtime info 
for tasks (attempt number) but not all of it (subtasks, etc.). The attempt number is 
in the `TaskDeploymentDescriptor` anyway, so why copy it in addition to the 
`TaskRuntimeInfo`? Before, the `TaskManagerInfo` was clearly the context info 
of the TaskManager that was the same for all tasks and there was no duplicate 
information.

Looks like the motivation was to minimize the number of objects passed to 
the `RuntimeContext`. In that case, why not create a `RuntimeInfo` for the task 
(keep the `TaskManagerInfo`), put all the task-specific information in there, 
pass it to the `RuntimeEnvironment` and `RuntimeContext` and let them return 
all info like `getTaskName` and `getIndexOfThisSubtask` from there?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699349#comment-14699349
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-131771732
  
Yes, let's have a `TaskRuntimeInfo` and a `TaskManagerContext`. Both are 
available in the `RuntimeEnvironment` and passed to the `RuntimeContext`.

The `TaskRuntimeInfo` has all task-specific information, the 
`TaskManagerContext` all cross-task constant parts. It could also hold the I/O 
manager, memory manager, ...
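The split Stephan proposes can be sketched as two plain holder classes (hypothetical shapes for illustration; the actual Flink classes differ): one object with per-task values, and one per-TaskManager context shared by every task on that TaskManager.

```java
// Sketch of the proposed split, with hypothetical shapes: TaskRuntimeInfo
// holds per-task values, TaskManagerContext holds the parts that are
// constant across all tasks on one TaskManager.
public class RuntimeInfoSketch {
    static class TaskRuntimeInfo {
        final String taskName;
        final int indexOfSubtask;
        final int attemptNumber;

        TaskRuntimeInfo(String taskName, int indexOfSubtask, int attemptNumber) {
            this.taskName = taskName;
            this.indexOfSubtask = indexOfSubtask;
            this.attemptNumber = attemptNumber;
        }
    }

    static class TaskManagerContext {
        final String taskManagerHostname;
        // could also hold the I/O manager, memory manager, ...

        TaskManagerContext(String taskManagerHostname) {
            this.taskManagerHostname = taskManagerHostname;
        }
    }

    public static void main(String[] args) {
        // One shared context object, passed to every task's RuntimeContext:
        TaskManagerContext tmContext = new TaskManagerContext("worker-1");
        TaskRuntimeInfo task0 = new TaskRuntimeInfo("Flat Map", 0, 1);
        TaskRuntimeInfo task1 = new TaskRuntimeInfo("Flat Map", 1, 1);
        System.out.println(task0.taskName + " subtask " + task0.indexOfSubtask
                + " on " + tmContext.taskManagerHostname);
        System.out.println(task1.taskName + " subtask " + task1.indexOfSubtask
                + " on " + tmContext.taskManagerHostname);
    }
}
```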


 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-131771732
  
Yes, let's have a `TaskRuntimeInfo` and a `TaskManagerContext`. Both are 
available in the `RuntimeEnvironment` and passed to the `RuntimeContext`.

The `TaskRuntimeInfo` has all task-specific information, the 
`TaskManagerContext` all cross-task constant parts. It could also hold the I/O 
manager, memory manager, ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131772364
  
Alright, if there are no further comments, I'll merge this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699437#comment-14699437
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-131796852
  
Hi @ankurcha,
I've started a Mesos cluster on Google Compute Engine to try out your pull 
request.

I've used this configuration:
```
flink.mesos.master: zk://127.0.0.1:2181/mesos
flink.uberjar.location: hdfs:///user/jclouds/flink-dist-0.10-SNAPSHOT.jar
flink.mesos.taskmanagers.mem: 512
flink.mesos.taskmanagers.cpu: 0.5
taskmanager.logging.level: INFO
streamingMode: streaming

jobmanager.web.port: 8081
webclient.port: 8080
```

But I'm getting this error
```
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.flink.mesos.scheduler.SchedulerUtils$class.createFrameworkInfoAndCredentials(SchedulerUtils.scala:255)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.createFrameworkInfoAndCredentials(FlinkScheduler.scala:31)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.main(FlinkScheduler.scala:183)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler.main(FlinkScheduler.scala)
```

I'll further investigate the issue.

*Why did you decide to start the JobManager alongside the Scheduler?*
For Flink on YARN, we are starting the JobManager in a separate container. 
There is a lot of communication going on between the JobManager and 
TaskManagers; also, we need to ensure that the TaskManagers are able to reach 
the JM.
I think we can safely assume that containers can always communicate with 
each other ... I'm not so sure about Mesos clients and cluster containers.

 The mesos scheduler is not HA and should be used with marathon or similar 
service to ensure that there is always one instance running. This may be 
addressed in future patches.

Would you start the Mesos scheduler on the client machine or inside the 
cluster, using a container?
What's the typical deployment model for Mesos?



 Integrate Flink with Apache Mesos
 -

 Key: FLINK-1984
 URL: https://issues.apache.org/jira/browse/FLINK-1984
 Project: Flink
  Issue Type: New Feature
  Components: New Components
Reporter: Robert Metzger
Priority: Minor
 Attachments: 251.patch


 There are some users asking for an integration of Flink into Mesos.
 There also is a pending pull request for adding Mesos support for Flink: 
 https://github.com/apache/flink/pull/251
 But the PR is insufficiently tested. I'll add the code of the pull request to 
 this JIRA in case somebody wants to pick it up in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-131796852
  
Hi @ankurcha,
I've started a Mesos cluster on Google Compute Engine to try out your pull 
request.

I've used this configuration:
```
flink.mesos.master: zk://127.0.0.1:2181/mesos
flink.uberjar.location: hdfs:///user/jclouds/flink-dist-0.10-SNAPSHOT.jar
flink.mesos.taskmanagers.mem: 512
flink.mesos.taskmanagers.cpu: 0.5
taskmanager.logging.level: INFO
streamingMode: streaming

jobmanager.web.port: 8081
webclient.port: 8080
```

But I'm getting this error
```
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.flink.mesos.scheduler.SchedulerUtils$class.createFrameworkInfoAndCredentials(SchedulerUtils.scala:255)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.createFrameworkInfoAndCredentials(FlinkScheduler.scala:31)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.main(FlinkScheduler.scala:183)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler.main(FlinkScheduler.scala)
```

I'll further investigate the issue.

*Why did you decide to start the JobManager alongside the Scheduler?*
For Flink on YARN, we are starting the JobManager in a separate container. 
There is a lot of communication going on between the JobManager and 
TaskManagers; also, we need to ensure that the TaskManagers are able to reach 
the JM.
I think we can safely assume that containers can always communicate with 
each other ... I'm not so sure about Mesos clients and cluster containers.

 The mesos scheduler is not HA and should be used with marathon or similar 
service to ensure that there is always one instance running. This may be 
addressed in future patches.

Would you start the Mesos scheduler on the client machine or inside the 
cluster, using a container?
What's the typical deployment model for Mesos?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1017


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.

2015-08-17 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699252#comment-14699252
 ] 

Márton Balassi commented on FLINK-2514:
---

Cool, thanks.

 Local and Remote environment behave differently when re-triggering execution.
 -

 Key: FLINK-2514
 URL: https://issues.apache.org/jira/browse/FLINK-2514
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Aljoscha Krettek
Priority: Critical
 Fix For: 0.10


 The following code behaves differently on the {{LocalStreamEnvironment}} and 
 the {{RemoteStreamEnvironment}}.
 {code}
 StreamExecutionEnvironment env = ...;
 env.addSource(someSource).addSink(someSink);
 env.execute();
 env.addSource(anotherSource).addSink(anotherSink);
 env.execute();
 {code}
 Locally, only the second source/sink pair is executed.
 Remotely, both are re-executed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2536) Add a retry for SocketClientSink

2015-08-17 Thread Huang Wei (JIRA)
Huang Wei created FLINK-2536:


 Summary: Add a retry for SocketClientSink
 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10


I found that SocketClientSink doesn't re-connect when it is disconnected from the 
socket server or gets an exception.
I'd like to add a re-connect to the socket sink, like the one in the socket source.
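A re-connect of the kind proposed here can be sketched as a bounded retry loop around the connect step. This is an illustration only: the retry count, pause, and helper names are assumptions, not values from the ticket or from SocketClientSink itself.

```java
import java.util.concurrent.Callable;

// Sketch of a bounded reconnect loop of the kind proposed for the sink:
// retry the connect step up to maxRetries times with a fixed pause between
// attempts. The retry count and pause are assumptions, not ticket values.
public class Reconnector {
    public static <T> T connectWithRetry(Callable<T> connect, int maxRetries, long pauseMillis)
            throws Exception {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return connect.call(); // e.g. () -> new Socket(host, port)
            } catch (Exception e) {
                lastFailure = e;       // remember the failure and pause before retrying
                Thread.sleep(pauseMillis);
            }
        }
        throw new Exception("Could not connect after " + maxRetries + " attempts", lastFailure);
    }

    public static void main(String[] args) throws Exception {
        // Simulated flaky connect: fails twice, then succeeds on the third attempt.
        int[] calls = {0};
        String conn = connectWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("connection refused");
            }
            return "connected";
        }, 5, 1L);
        System.out.println(conn + " after " + calls[0] + " attempts");
    }
}
```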



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2514) Local and Remote environment behave differently when re-triggering execution.

2015-08-17 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-2514:
--
Assignee: Aljoscha Krettek  (was: Márton Balassi)

 Local and Remote environment behave differently when re-triggering execution.
 -

 Key: FLINK-2514
 URL: https://issues.apache.org/jira/browse/FLINK-2514
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Aljoscha Krettek
Priority: Critical
 Fix For: 0.10


 The following code behaves differently on the {{LocalStreamEnvironment}} and 
 the {{RemoteStreamEnvironment}}.
 {code}
 StreamExecutionEnvironment env = ...;
 env.addSource(someSource).addSink(someSink);
 env.execute();
 env.addSource(anotherSource).addSink(anotherSink);
 env.execute();
 {code}
 Locally, only the second source/sink pair is executed.
 Remotely, both are re-executed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2530) optimize equal() of AcknowledgeCheckpoint

2015-08-17 Thread Henry Saputra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Saputra closed FLINK-2530.

Resolution: Won't Fix

 optimize equal() of AcknowledgeCheckpoint
 -

 Key: FLINK-2530
 URL: https://issues.apache.org/jira/browse/FLINK-2530
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699295#comment-14699295
 ] 

Sachin Goel edited comment on FLINK-2448 at 8/17/15 9:56 AM:
-

Aha. It makes sense. The {{TestEnvironment}} context factory returns the same 
object always. The second {{getEnvironment}} call doesn't actually do anything.


was (Author: sachingoel0101):
Aha. It makes sense. The `TestEnvironment` context factory returns the same 
object always. The second `getEnvironment` call doesn't actually do anything.

 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1029#issuecomment-131759047
  
This seems correct, will merge this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2534) Improve execution code in CompactingHashTable.java

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699298#comment-14699298
 ] 

ASF GitHub Bot commented on FLINK-2534:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1029#issuecomment-131759047
  
This seems correct, will merge this...


 Improve execution code in CompactingHashTable.java
 --

 Key: FLINK-2534
 URL: https://issues.apache.org/jira/browse/FLINK-2534
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found some code in CompactingHashTable.java that can be improved, since this 
 code executes many times when Flink runs.
 In my opinion, some code in for and while loops can be optimized to reduce the 
 number of executions, which is effective for increasing performance.
 For example, the following code:
 'while (numBuckets % numPartitions != 0) {
   numBuckets++;
   }'
 can be replaced by a formula:
 numBuckets += numPartitions - (numBuckets % numPartitions);
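One hedged aside on the proposed formula: the original loop leaves numBuckets unchanged when it is already a multiple of numPartitions, while the one-line formula as quoted would add a full numPartitions in that case, so a remainder guard is needed to keep the behavior identical. A minimal sketch (the class and method names here are illustrative, not Flink code):

```java
// Illustrative sketch, not actual Flink code: round numBuckets up to a
// multiple of numPartitions without looping. The remainder check keeps
// the behavior identical to the original while-loop, which leaves the
// value unchanged when it is already an exact multiple.
public class BucketRounding {
    static int roundUpToMultiple(int numBuckets, int numPartitions) {
        int remainder = numBuckets % numPartitions;
        return remainder == 0 ? numBuckets : numBuckets + numPartitions - remainder;
    }

    public static void main(String[] args) {
        System.out.println(roundUpToMultiple(10, 4)); // 12
        System.out.println(roundUpToMultiple(12, 4)); // 12 (already a multiple)
    }
}
```

The branch-free variant trades one comparison for the data-dependent loop, which is the performance point the issue is making.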



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699314#comment-14699314
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173574
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 ---
@@ -93,6 +93,15 @@ public int getIndexOfThisSubtask() {
}
 
@Override
+   public String getTaskNameWithSubtasks() {
--- End diff --

Since this method may be called rather often, I would create this string 
once and then return it.
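A minimal sketch of the precomputation being suggested (class and field names are hypothetical, not the actual AbstractRuntimeUDFContext code): build the string once at construction time and have the getter just return it.

```java
// Hypothetical sketch only: precompute the "name (subtask/total)" string
// once in the constructor so repeated getter calls do no concatenation.
public class SubtaskNameExample {
    private final String taskNameWithSubtasks;

    public SubtaskNameExample(String taskName, int subtaskIndex, int numberOfSubtasks) {
        // Subtask indices are typically displayed 1-based.
        this.taskNameWithSubtasks =
                taskName + " (" + (subtaskIndex + 1) + "/" + numberOfSubtasks + ")";
    }

    public String getTaskNameWithSubtasks() {
        return taskNameWithSubtasks; // no per-call string building
    }

    public static void main(String[] args) {
        System.out.println(new SubtaskNameExample("Map", 0, 4).getTaskNameWithSubtasks());
    }
}
```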


 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2015-08-17 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699315#comment-14699315
 ] 

Robert Metzger commented on FLINK-2491:
---

I quickly talked with [~StephanEwen] about this. Currently, the checkpoints are 
aborted if not all tasks are online, so no checkpointing is happening if some 
of the tasks have finished.
A simple workaround is keeping the source running even though it's not producing 
anything.


 Operators are not participating in state checkpointing in some cases
 

 Key: FLINK-2491
 URL: https://issues.apache.org/jira/browse/FLINK-2491
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Márton Balassi
Priority: Critical
 Fix For: 0.10


 While implementing a test case for the Kafka Consumer, I came across the 
 following bug:
 Consider the following topology, with the operator parallelism in parentheses:
 Source (2) -- Sink (1).
 In this setup, the {{snapshotState()}} method is called on the source, but 
 not on the sink.
 The sink receives the generated data.
 Only one of the two sources is generating data.
 I've implemented a test case for this, you can find it here: 
 https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2534) Improve execution code in CompactingHashTable.java

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2534.
---

 Improve execution code in CompactingHashTable.java
 --

 Key: FLINK-2534
 URL: https://issues.apache.org/jira/browse/FLINK-2534
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found some code in CompactingHashTable.java that can be improved, since this 
 code executes many times when Flink runs.
 In my opinion, some code in for and while loops can be optimized to reduce the 
 number of executions, which is effective for increasing performance.
 For example, the following code:
 'while (numBuckets % numPartitions != 0) {
   numBuckets++;
   }'
 can be replaced by a formula:
 numBuckets += numPartitions - (numBuckets % numPartitions);



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2462.
---

 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not reported. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699220#comment-14699220
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131735854
  
This looks like a very nice continuation of the cleanup work. I'd suggest 
merging it sooner rather than later.


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not reported. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2015-08-17 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699292#comment-14699292
 ] 

Robert Metzger commented on FLINK-2491:
---

I suspect FLINK-2519 resolves this bug.
I'll re-run my test case to see whether it is fixed now.

 Operators are not participating in state checkpointing in some cases
 

 Key: FLINK-2491
 URL: https://issues.apache.org/jira/browse/FLINK-2491
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Márton Balassi
Priority: Critical
 Fix For: 0.10


 While implementing a test case for the Kafka Consumer, I came across the 
 following bug:
 Consider the following topology, with the operator parallelism in parentheses:
 Source (2) -- Sink (1).
 In this setup, the {{snapshotState()}} method is called on the source, but 
 not on the sink.
 The sink receives the generated data.
 Only one of the two sources is generating data.
 I've implemented a test case for this, you can find it here: 
 https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2015-08-17 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-2491:
--
Comment: was deleted

(was: Yes, it fixes it - that is why I removed my previous comment that was 
wrong anyway. But I was slow with commenting here. :))

 Operators are not participating in state checkpointing in some cases
 

 Key: FLINK-2491
 URL: https://issues.apache.org/jira/browse/FLINK-2491
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Márton Balassi
Priority: Critical
 Fix For: 0.10


 While implementing a test case for the Kafka Consumer, I came across the 
 following bug:
 Consider the following topology, with the operator parallelism in parentheses:
 Source (2) -- Sink (1).
 In this setup, the {{snapshotState()}} method is called on the source, but 
 not on the sink.
 The sink receives the generated data.
 Only one of the two sources is generating data.
 I've implemented a test case for this, you can find it here: 
 https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-17 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173822
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 ---
@@ -93,6 +93,15 @@ public int getIndexOfThisSubtask() {
}
 
@Override
+   public String getTaskNameWithSubtasks() {
--- End diff --

Will fix this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...

2015-08-17 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-131781343
  
Should there be a unit test to verify this functionality?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699373#comment-14699373
 ] 

ASF GitHub Bot commented on FLINK-2448:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-131781343
  
Should there be a unit test to verify this functionality?


 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2415) Link nodes in plan to vertices

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-2415:
---

Assignee: Stephan Ewen

 Link nodes in plan to vertices
 --

 Key: FLINK-2415
 URL: https://issues.apache.org/jira/browse/FLINK-2415
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Reporter: Piotr Godek
Assignee: Stephan Ewen

 The plan API function (/jobs/jobid/plan) lacks vertex identifiers that would 
 allow the plan to be linked to the execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2415) Link nodes in plan to vertices

2015-08-17 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699374#comment-14699374
 ] 

Stephan Ewen commented on FLINK-2415:
-

The plan is currently created from the Optimizer Plan, which has different 
vertices than the actual runtime graph (due to iterations and chaining).

I would change the generation of the runtime plan as follows:
  - The JSON plan is generated for / by the JobGraph.
  - Extra JSON information may be attached to each JobVertex and each input, 
such that the optimizer can make its information available to the runtime dashboard.

That way, each node in the JSON graph is naturally linked to the execution 
statistics by the {{JobVertexID}}.
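To illustrate the idea, a hypothetical shape for such a JSON plan (every field name here is made up for illustration, not an actual Flink format): the JobGraph emits the structure, and the optimizer's details ride along per vertex, keyed by the JobVertexID.

```json
{
  "nodes": [
    {
      "id": "<JobVertexID of this vertex>",
      "description": "Reduce (example)",
      "optimizer_info": {
        "driver_strategy": "SORTED_GROUP_REDUCE",
        "estimated_output_size": "unknown"
      },
      "inputs": [
        { "source_id": "<JobVertexID of the input>", "ship_strategy": "HASH_PARTITION" }
      ]
    }
  ]
}
```

With the vertex "id" equal to the runtime JobVertexID, the dashboard can join this plan against execution statistics directly.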

 Link nodes in plan to vertices
 --

 Key: FLINK-2415
 URL: https://issues.apache.org/jira/browse/FLINK-2415
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Reporter: Piotr Godek
Assignee: Stephan Ewen

 The plan API function (/jobs/jobid/plan) lacks vertex identifiers that would 
 allow the plan to be linked to the execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2531) combining the if branch to improve the performance

2015-08-17 Thread Henry Saputra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Saputra closed FLINK-2531.

Resolution: Won't Fix

 combining the if branch to improve the performance
 

 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Some updates for programming_guide.md

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1019#issuecomment-131756217
  
I merged this in 3c1b5f0e7f28d18868e941712d2ca42b140ef3a0 , but forgot the 
closing message.

Can you manually close this pull request?

Thank you!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: some small changes for easier understanding.

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1018#issuecomment-131756341
  
Will merge this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...

2015-08-17 Thread sachingoel0101
GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/1031

[FLINK-2448]Clear cache file list in ExecutionEnvironment after program 
plan creation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1031


commit 199fa54cec1c6a9749fbbebf009267a997a7b275
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-17T10:00:52Z

[FLINK-2448][hotfix]Clear cache file list in ExecutionEnvironment after 
registering with Plan




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173778
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -120,7 +125,7 @@ public TaskDeploymentDescriptor(
List<InputGateDeploymentDescriptor> inputGates,
List<BlobKey> requiredJarFiles, int targetSlotNumber) {
 
-   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks,
+   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks, 0,
--- End diff --

I think such constructors with default values are dangerous, as people tend 
to call the wrong constructors.
Someone who creates the deployment descriptor should think about providing 
the attempt number.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699318#comment-14699318
 ] 

ASF GitHub Bot commented on FLINK-2448:
---

GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/1031

[FLINK-2448]Clear cache file list in ExecutionEnvironment after program 
plan creation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1031


commit 199fa54cec1c6a9749fbbebf009267a997a7b275
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-17T10:00:52Z

[FLINK-2448][hotfix]Clear cache file list in ExecutionEnvironment after 
registering with Plan




 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699319#comment-14699319
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173778
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -120,7 +125,7 @@ public TaskDeploymentDescriptor(
List<InputGateDeploymentDescriptor> inputGates,
List<BlobKey> requiredJarFiles, int targetSlotNumber) {
 
-   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks,
+   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks, 0,
--- End diff --

I think such constructors with default values are dangerous, as people tend 
to call the wrong constructors.
Someone who creates the deployment descriptor should think about providing 
the attempt number.


 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699321#comment-14699321
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173822
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 ---
@@ -93,6 +93,15 @@ public int getIndexOfThisSubtask() {
}
 
@Override
+   public String getTaskNameWithSubtasks() {
--- End diff --

Will fix this.


 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699333#comment-14699333
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-131767636
  
Looks mostly good.

The `TaskRuntimeInfo` is now a bit strange. It contains some runtime info 
for tasks (attempt number) but not all of it (subtasks, etc.). The attempt number 
is in the `TaskDeploymentDescriptor` anyway, so why copy it into the 
`TaskRuntimeInfo` as well? Before, the `TaskManagerInfo` was clearly the context 
info of the TaskManager that was the same for all tasks, and there was no 
duplicate information.

Looks like the motivation was to minimize the number of objects passed to 
the `RuntimeContext`. In that case, why not create a `RuntimeInfo` for the task 
(keep the `TaskManagerInfo`), put all the task-specific information in there, 
pass it to the `RuntimeEnvironment` and `RuntimeContext`, and let them return 
all info like `getTaskName` and `getIndexOfThisSubtask` from there?
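A rough sketch of what that restructuring could look like (all names here are illustrative, not Flink's actual API): one immutable task-scoped object carrying the task-specific fields, which the RuntimeEnvironment and RuntimeContext delegate their getters to.

```java
// Illustrative sketch only: group the task-specific context fields into
// a single immutable object that RuntimeEnvironment / RuntimeContext
// can delegate to, avoiding duplicated fields across classes.
public class TaskRuntimeInfoSketch {
    private final String taskName;
    private final int indexOfSubtask;
    private final int numberOfSubtasks;
    private final int attemptNumber;

    public TaskRuntimeInfoSketch(String taskName, int indexOfSubtask,
                                 int numberOfSubtasks, int attemptNumber) {
        this.taskName = taskName;
        this.indexOfSubtask = indexOfSubtask;
        this.numberOfSubtasks = numberOfSubtasks;
        this.attemptNumber = attemptNumber;
    }

    public String getTaskName()        { return taskName; }
    public int getIndexOfThisSubtask() { return indexOfSubtask; }
    public int getNumberOfSubtasks()   { return numberOfSubtasks; }
    public int getAttemptNumber()      { return attemptNumber; }

    public static void main(String[] args) {
        TaskRuntimeInfoSketch info = new TaskRuntimeInfoSketch("Map", 2, 4, 1);
        System.out.println(info.getTaskName() + ", attempt " + info.getAttemptNumber());
    }
}
```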


 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699351#comment-14699351
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131772364
  
Allright, if there are no further comments, I'll merge this...


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not reported. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699324#comment-14699324
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -120,7 +125,7 @@ public TaskDeploymentDescriptor(
List<InputGateDeploymentDescriptor> inputGates,
List<BlobKey> requiredJarFiles, int targetSlotNumber) {
 
-   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks,
+   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks, 0,
--- End diff --

This constructor was only being used in test classes, and the attempt 
number would have been kept at 1 for them all.
But yes, you're right: it leads to more consistency. Will fix this too.


 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-17 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -120,7 +125,7 @@ public TaskDeploymentDescriptor(
List<InputGateDeploymentDescriptor> inputGates,
List<BlobKey> requiredJarFiles, int targetSlotNumber) {
 
-   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks,
+   this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks, 0,
--- End diff --

This constructor was only being used in test classes, and the attempt 
number would have been kept at 1 for them all.
But yes, you're right: it leads to more consistency. Will fix this too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...

2015-08-17 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-131769530
  
Does this work properly with collect() calls? As in, would the following 
plan still work?

    env = ..
    env.registerCacheFile()
    ...
    someSet.collect()
    doSomethingThatUsesTheCacheFile
    env.execute()

If we wipe all cache entries in the collect() call, the files will not be 
registered in the execute(), right? The plans these methods create are 
separate, I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-855) Web interface: flow vs stack layout

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-855.
--

 Web interface: flow vs stack layout
 ---

 Key: FLINK-855
 URL: https://issues.apache.org/jira/browse/FLINK-855
 Project: Flink
  Issue Type: Improvement
Reporter: GitHub Import
Priority: Trivial
  Labels: github-import, starter
 Fix For: pre-apache


 I don't understand the real difference between the flow and stack layout in 
 the web interface. Let's stick with one of the two.
 ![screen shot 2014-05-24 at 18 11 
 37|https://cloud.githubusercontent.com/assets/1756620/3075006/3336865c-e35e-11e3-98ca-ea1feeaf7415.png]
 ![screen shot 2014-05-24 at 18 11 
 48|https://cloud.githubusercontent.com/assets/1756620/3075007/333c1a04-e35e-11e3-8be8-6c8c0185b809.png]
  Imported from GitHub 
 Url: https://github.com/stratosphere/stratosphere/issues/855
 Created by: [uce|https://github.com/uce]
 Labels: enhancement, gui, user satisfaction, 
 Created at: Sat May 24 18:13:23 CEST 2014
 State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-855) Web interface: flow vs stack layout

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-855.

Resolution: Won't Fix

Will be subsumed by the new web dashboard

 Web interface: flow vs stack layout
 ---

 Key: FLINK-855
 URL: https://issues.apache.org/jira/browse/FLINK-855
 Project: Flink
  Issue Type: Improvement
Reporter: GitHub Import
Priority: Trivial
  Labels: github-import, starter
 Fix For: pre-apache


 I don't understand the real difference between the flow and stack layout in 
 the web interface. Let's stick with one of the two.
 ![screen shot 2014-05-24 at 18 11 
 37|https://cloud.githubusercontent.com/assets/1756620/3075006/3336865c-e35e-11e3-98ca-ea1feeaf7415.png]
 ![screen shot 2014-05-24 at 18 11 
 48|https://cloud.githubusercontent.com/assets/1756620/3075007/333c1a04-e35e-11e3-8be8-6c8c0185b809.png]
  Imported from GitHub 
 Url: https://github.com/stratosphere/stratosphere/issues/855
 Created by: [uce|https://github.com/uce]
 Labels: enhancement, gui, user satisfaction, 
 Created at: Sat May 24 18:13:23 CEST 2014
 State: open





[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699439#comment-14699439
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1017


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[GitHub] flink pull request: some small changes for easier understanding.

2015-08-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1018




[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...

2015-08-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1029




[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699441#comment-14699441
 ] 

ASF GitHub Bot commented on FLINK-2527:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1027


 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?
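A minimal, framework-free sketch of option 2 — record only the latest value in setNewVertexValue and emit it once after updateVertex returns. The names here (VertexUpdater, flush, emitted) are illustrative, not Gelly's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of option 2: setNewVertexValue only records the latest value;
// the framework emits it once after updateVertex returns.
public class VertexUpdater<VV> {

    private final List<VV> collected = new ArrayList<>(); // stand-in for the out Collector
    private VV outVal;
    private boolean valueSet;

    // user-facing call: may be invoked any number of times per vertex
    public void setNewVertexValue(VV newValue) {
        outVal = newValue;   // only remember the value, do not emit yet
        valueSet = true;
    }

    // framework call after updateVertex returns: emit the last recorded value once
    public void flush() {
        if (valueSet) {
            collected.add(outVal);
            valueSet = false;
        }
    }

    public List<VV> emitted() {
        return collected;
    }

    public static void main(String[] args) {
        VertexUpdater<Integer> updater = new VertexUpdater<>();
        updater.setNewVertexValue(1);
        updater.setNewVertexValue(2);
        updater.setNewVertexValue(3); // only this value should reach the workset
        updater.flush();
        System.out.println(updater.emitted()); // prints [3]
    }
}
```

With this shape, at most one value per vertex ever reaches the workset, so the downstream coGroups can safely use the first (and only) element of the state Iterable.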





[GitHub] flink pull request: [FLINK-2527] [gelly] Ensure that VertexUpdateF...

2015-08-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1027




[jira] [Commented] (FLINK-2534) Improve execution code in CompactingHashTable.java

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699440#comment-14699440
 ] 

ASF GitHub Bot commented on FLINK-2534:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1029


 Improve execution code in CompactingHashTable.java
 --

 Key: FLINK-2534
 URL: https://issues.apache.org/jira/browse/FLINK-2534
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found some code in CompactingHashTable.java that can be improved, since this 
 code executes many times when Flink runs.
 In my opinion, some code inside for and while loops can be optimized to reduce 
 the number of executions, which is an effective way to increase performance.
 For example, the following code:
 'while (numBuckets % numPartitions != 0) {
   numBuckets++;
 }'
 can be replaced by a formula:
 numBuckets += numPartitions - (numBuckets % numPartitions);
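Note that the single-formula replacement overshoots when numBuckets is already an exact multiple of numPartitions: the loop would leave it unchanged, while the formula adds a full numPartitions. A guarded variant, sketched here with illustrative names rather than Flink's actual code, stays loop-free and matches the original loop for all inputs:

```java
// Sketch of a loop-free round-up that matches the original while loop exactly.
// Class and method names are illustrative, not Flink's actual code.
public class BucketRounding {

    // original behavior: increment numBuckets until it is a multiple of numPartitions
    static int roundUpWithLoop(int numBuckets, int numPartitions) {
        while (numBuckets % numPartitions != 0) {
            numBuckets++;
        }
        return numBuckets;
    }

    // loop-free version: guard the already-a-multiple case, where the
    // unconditional formula would overshoot by a full numPartitions
    static int roundUpWithFormula(int numBuckets, int numPartitions) {
        int remainder = numBuckets % numPartitions;
        return remainder == 0 ? numBuckets : numBuckets + (numPartitions - remainder);
    }

    public static void main(String[] args) {
        // exhaustively compare both versions on a small input range
        for (int buckets = 0; buckets <= 100; buckets++) {
            for (int partitions = 1; partitions <= 16; partitions++) {
                if (roundUpWithLoop(buckets, partitions) != roundUpWithFormula(buckets, partitions)) {
                    throw new AssertionError("mismatch at " + buckets + "/" + partitions);
                }
            }
        }
        System.out.println("loop and formula agree"); // prints "loop and formula agree"
    }
}
```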





[jira] [Resolved] (FLINK-2534) Improve execution code in CompactingHashTable.java

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2534.
-
Resolution: Fixed

Fixed via 63ee34c5b894e2795e74a3c2aa3d5dc9ac2d5b88

Thank you for the contribution!

 Improve execution code in CompactingHashTable.java
 --

 Key: FLINK-2534
 URL: https://issues.apache.org/jira/browse/FLINK-2534
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found some code in CompactingHashTable.java that can be improved, since this 
 code executes many times when Flink runs.
 In my opinion, some code inside for and while loops can be optimized to reduce 
 the number of executions, which is an effective way to increase performance.
 For example, the following code:
 'while (numBuckets % numPartitions != 0) {
   numBuckets++;
 }'
 can be replaced by a formula:
 numBuckets += numPartitions - (numBuckets % numPartitions);





[jira] [Resolved] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2462.
-
Resolution: Fixed

Fixed via 92b1e471d4762545637817c74f9396765984b39a

 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699218#comment-14699218
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37168797
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java ---
@@ -39,61 +37,40 @@
  */
 public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class);
-
 	@Override
-	public void invoke() throws Exception {
-		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
-
-		boolean operatorOpen = false;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Task {} invoked", getName());
-		}
-
-		try {
-			openOperator();
-			operatorOpen = true;
-
-			streamOperator.run(checkpointLock, output);
-
-			closeOperator();
-			operatorOpen = false;
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Task {} invocation finished", getName());
-			}
-
-		}
-		catch (Exception e) {
-			LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-
-			if (operatorOpen) {
-				try {
-					closeOperator();
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while closing operator.", t);
-				}
-			}
-			throw e;
-		}
-		finally {
-			this.isRunning = false;
-			// Cleanup
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
+	protected void init() {
+		// does not hold any resources, so no initialization needed
+	}
 
+	@Override
+	protected void cleanup() {
+		// does not hold any resources, so no cleanup needed
 	}
+	
 
 	@Override
-	public void cancel() {
-		super.cancel();
+	protected void run() throws Exception {
+		final Object checkpointLock = getCheckpointLock();
+		
+		final SourceOutput<StreamRecord<OUT>> output =
+			new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
+		
+		streamOperator.run(checkpointLock, output);
+	}
+	
+	@Override
+	protected void cancelTask() throws Exception {
 		streamOperator.cancel();
 	}
 
+	// ------------------------------------------------------------------------
+	
+	// TODO:
+	// does this help with anything? The lock should be already held by the source function that
+	// emits. If that one does not hold the lock, then this does not help either.
+	
--- End diff --

This is for the case where the source does not actually acquire the lock 
(because it is not participating in checkpointing). This might be a degenerate 
case, however, and not really necessary.


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37168797
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java ---
@@ -39,61 +37,40 @@
  */
 public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class);
-
 	@Override
-	public void invoke() throws Exception {
-		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
-
-		boolean operatorOpen = false;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Task {} invoked", getName());
-		}
-
-		try {
-			openOperator();
-			operatorOpen = true;
-
-			streamOperator.run(checkpointLock, output);
-
-			closeOperator();
-			operatorOpen = false;
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Task {} invocation finished", getName());
-			}
-
-		}
-		catch (Exception e) {
-			LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-
-			if (operatorOpen) {
-				try {
-					closeOperator();
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while closing operator.", t);
-				}
-			}
-			throw e;
-		}
-		finally {
-			this.isRunning = false;
-			// Cleanup
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
+	protected void init() {
+		// does not hold any resources, so no initialization needed
+	}
 
+	@Override
+	protected void cleanup() {
+		// does not hold any resources, so no cleanup needed
 	}
+	
 
 	@Override
-	public void cancel() {
-		super.cancel();
+	protected void run() throws Exception {
+		final Object checkpointLock = getCheckpointLock();
+		
+		final SourceOutput<StreamRecord<OUT>> output =
+			new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
+		
+		streamOperator.run(checkpointLock, output);
+	}
+	
+	@Override
+	protected void cancelTask() throws Exception {
		streamOperator.cancel();
 	}
 
+	// ------------------------------------------------------------------------
+	
+	// TODO:
+	// does this help with anything? The lock should be already held by the source function that
+	// emits. If that one does not hold the lock, then this does not help either.
+	
--- End diff --

This is for the case where the source does not actually acquire the lock 
(because it is not participating in checkpointing). This might be a degenerate 
case, however, and not really necessary.




[GitHub] flink pull request: [FLINK-2527] [gelly] Ensure that VertexUpdateF...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1027#issuecomment-131753704
  
Looks good, will merge this...




[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2015-08-17 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699299#comment-14699299
 ] 

Robert Metzger commented on FLINK-2491:
---

The issue still persists

{code}
1750 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 9 @ 1439805353030
1750 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
1800 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 10 @ 1439805353080
1800 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
1850 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 11 @ 1439805353130
1851 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
1900 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 12 @ 1439805353180
1900 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
1950 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 13 @ 1439805353230
1951 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
2000 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 14 @ 1439805353280
2000 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
2050 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 15 @ 1439805353330
2051 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
2100 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 16 @ 1439805353380
2100 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
2150 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 17 @ 1439805353430
2150 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Custom Source (2/4) is not being executed at the moment. 
Aborting checkpoint.
{code}


 Operators are not participating in state checkpointing in some cases
 

 Key: FLINK-2491
 URL: https://issues.apache.org/jira/browse/FLINK-2491
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Márton Balassi
Priority: Critical
 Fix For: 0.10


 While implementing a test case for the Kafka Consumer, I came across the 
 following bug:
 Consider the following topology, with the operator parallelism in parentheses:
 Source (2) -> Sink (1).
 In this setup, the {{snapshotState()}} method is called on the source, but 
 not on the sink.
 The sink receives the generated data.
 Only one of the two sources is generating data.
 I've implemented a test case for this, you can find it here: 
 https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java





[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-17 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-131768999
  
Yes. Minimizing the arguments being passed to `RuntimeContext` was the 
motivation. I thought about putting every task-specific field into the TaskInfo 
object but, since it isn't a pressing problem, I hesitated. 
Should I do that then? I certainly like the idea. The constructors for 
`RuntimeContext` are somewhat messy, and doing this would make any future 
changes to contexts a lot easier.




[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699342#comment-14699342
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-131768999
  
Yes. Minimizing the arguments being passed to `RuntimeContext` was the 
motivation. I thought about putting every task-specific field into the TaskInfo 
object but, since it isn't a pressing problem, I hesitated. 
Should I do that then? I certainly like the idea. The constructors for 
`RuntimeContext` are somewhat messy, and doing this would make any future 
changes to contexts a lot easier.


 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.





[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699340#comment-14699340
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37174684
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java ---
@@ -33,9 +33,8 @@
 	 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
 	 * operators.
 	 *
-	 * <p>
-	 * A watermark specifies that no element with a timestamp older or equal to the watermark
-	 * timestamp will be emitted in the future.
+	 * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
--- End diff --

You are right. I think I saw it differently in some of Sun's classes, and 
copied the style.

It seems the changes do not hurt (JavaDocs interpret the HTML properly), 
but I'll stick with the official style in the future. Thanks for pointing that 
out.


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37174684
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java ---
@@ -33,9 +33,8 @@
 	 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
 	 * operators.
 	 *
-	 * <p>
-	 * A watermark specifies that no element with a timestamp older or equal to the watermark
-	 * timestamp will be emitted in the future.
+	 * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
--- End diff --

You are right. I think I saw it differently in some of Sun's classes, and 
copied the style.

It seems the changes do not hurt (JavaDocs interpret the HTML properly), 
but I'll stick with the official style in the future. Thanks for pointing that 
out.




[jira] [Updated] (FLINK-2415) Link nodes in plan to vertices

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-2415:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-2357

 Link nodes in plan to vertices
 --

 Key: FLINK-2415
 URL: https://issues.apache.org/jira/browse/FLINK-2415
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Reporter: Piotr Godek

 The plan API endpoint (/jobs/<jobid>/plan) lacks vertex identifiers, so the 
 plan cannot be linked to the execution.





[jira] [Resolved] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2527.
-
Resolution: Fixed

Fixed
  - 0.10 in 0ea0bc12b3f8a8a82b6fca563340af547c0a02ab
  - 0.9.1 in e8802f90a4d38dbd4f3fc12b973639dbf50b61bb

 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?





[jira] [Closed] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2527.
---

 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131735854
  
This looks like a very nice continuation of the cleanup work. I'd suggest 
to merge it rather sooner than later.




[jira] [Issue Comment Deleted] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2015-08-17 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-2491:
--
Comment: was deleted

(was: Here is the root cause. [1]

[1] 
https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L415

The same parallelism case works because of chaining.)

 Operators are not participating in state checkpointing in some cases
 

 Key: FLINK-2491
 URL: https://issues.apache.org/jira/browse/FLINK-2491
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Márton Balassi
Priority: Critical
 Fix For: 0.10


 While implementing a test case for the Kafka Consumer, I came across the 
 following bug:
 Consider the following topology, with the operator parallelism in parentheses:
 Source (2) -> Sink (1).
 In this setup, the {{snapshotState()}} method is called on the source, but 
 not on the sink.
 The sink receives the generated data.
 Only one of the two sources is generating data.
 I've implemented a test case for this, you can find it here: 
 https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1026#discussion_r37173574
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 ---
@@ -93,6 +93,15 @@ public int getIndexOfThisSubtask() {
}
 
@Override
+   public String getTaskNameWithSubtasks() {
--- End diff --

Since this method may be called rather often, I would create this string 
once and then return it.
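
A minimal sketch of that suggestion, assuming illustrative field names (`taskName`, `subtaskIndex`, `numSubtasks`) and a `"name (i/n)"` format rather than the actual Flink code: build the string once, lazily, and reuse it on subsequent calls.

```java
// Illustrative sketch only: field names and the "name (i/n)" format are
// assumptions, not the actual AbstractRuntimeUDFContext implementation.
class TaskNameCache {
    private final String taskName;
    private final int subtaskIndex;   // zero-based
    private final int numSubtasks;
    private String nameWithSubtasks;  // built once, then reused

    TaskNameCache(String taskName, int subtaskIndex, int numSubtasks) {
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
        this.numSubtasks = numSubtasks;
    }

    String getTaskNameWithSubtasks() {
        if (nameWithSubtasks == null) {
            // format once; every later call returns the cached instance
            nameWithSubtasks = taskName + " (" + (subtaskIndex + 1) + "/" + numSubtasks + ")";
        }
        return nameWithSubtasks;
    }
}
```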




[GitHub] flink pull request: [CLEANUP] Add space between quotes and plus si...

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1010#issuecomment-131770992
  
+1 for adding a rule (otherwise, I will not learn it ;) )




[jira] [Closed] (FLINK-1321) New web interface, contains parts from WebInfoServer and WebClient

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-1321.
---
Resolution: Fixed

 New web interface, contains parts from WebInfoServer and WebClient
 --

 Key: FLINK-1321
 URL: https://issues.apache.org/jira/browse/FLINK-1321
 Project: Flink
  Issue Type: New Feature
  Components: JobManager, Webfrontend
Reporter: Matthias Schumacher
Priority: Minor

 The new webserver is based on the data from Runtime WebInfoServer and is 
 extended with the functionality and the graph from WebClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-1321) New web interface, contains parts from WebInfoServer and WebClient

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-1321.
-
   Resolution: Incomplete
Fix Version/s: (was: 0.7.0-incubating)

No progress on that issue.


 New web interface, contains parts from WebInfoServer and WebClient
 --

 Key: FLINK-1321
 URL: https://issues.apache.org/jira/browse/FLINK-1321
 Project: Flink
  Issue Type: New Feature
  Components: JobManager, Webfrontend
Reporter: Matthias Schumacher
Priority: Minor

 The new webserver is based on the data from Runtime WebInfoServer and is 
 extended with the functionality and the graph from WebClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2365) Review of How to contribute page

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2365.
---

 Review of How to contribute page
 

 Key: FLINK-2365
 URL: https://issues.apache.org/jira/browse/FLINK-2365
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Enrique Bautista Barahona
Priority: Minor

 While reading the [How to contribute 
 page|https://flink.apache.org/how-to-contribute.html] on the website I have 
 noticed some typos, broken links, inconsistent formatting, etc.
 I plan to submit a PR with some improvements soon.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-1321) New web interface, contains parts from WebInfoServer and WebClient

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reopened FLINK-1321:
-

 New web interface, contains parts from WebInfoServer and WebClient
 --

 Key: FLINK-1321
 URL: https://issues.apache.org/jira/browse/FLINK-1321
 Project: Flink
  Issue Type: New Feature
  Components: JobManager, Webfrontend
Reporter: Matthias Schumacher
Priority: Minor

 The new webserver is based on the data from Runtime WebInfoServer and is 
 extended with the functionality and the graph from WebClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2365) Review of How to contribute page

2015-08-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2365.
-
Resolution: Fixed

Fixed as part of 1f917e3b40fc74431e06834826cc556e4e78c48b

 Review of How to contribute page
 

 Key: FLINK-2365
 URL: https://issues.apache.org/jira/browse/FLINK-2365
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Enrique Bautista Barahona
Priority: Minor

 While reading the [How to contribute 
 page|https://flink.apache.org/how-to-contribute.html] on the website I have 
 noticed some typos, broken links, inconsistent formatting, etc.
 I plan to submit a PR with some improvements soon.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-08-17 Thread HuangWHWHW
GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/1030

[FLINK-2536][streaming]add a re-connect for socket sink

Add a re-connect in the invoke() function when it throws an exception.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2536

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1030


commit 85d5bb50419d6b803a9fc966dd4f95fcd042e21c
Author: HuangWHWHW 404823...@qq.com
Date:   2015-08-17T09:32:04Z

[FLINK-2536][streaming]add a re-connect for socket sink






[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699265#comment-14699265
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/1030

[FLINK-2536][streaming]add a re-connect for socket sink

Add a re-connect in the invoke() function when it throws an exception.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2536

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1030


commit 85d5bb50419d6b803a9fc966dd4f95fcd042e21c
Author: HuangWHWHW 404823...@qq.com
Date:   2015-08-17T09:32:04Z

[FLINK-2536][streaming]add a re-connect for socket sink




 Add a retry for SocketClientSink
 

 Key: FLINK-2536
 URL: https://issues.apache.org/jira/browse/FLINK-2536
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 I found the SocketClientSink doesn't re-connect when it is disconnected from 
 the socket server or gets an exception.
 I'd like to add a re-connect for the socket sink, like the one in the socket source.
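
The idea can be sketched as a simple bounded retry loop; the method name and retry policy below are assumptions for illustration, not the actual code in the pull request.

```java
import java.io.IOException;
import java.net.Socket;

// Illustrative sketch: try to (re)open the socket a bounded number of
// times before giving up, as a socket sink might do on a failed invoke().
public class RetryConnect {
    static Socket connectWithRetry(String host, int port, int maxRetries) throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return new Socket(host, port);
            } catch (IOException e) {
                last = e; // remember the failure and retry
            }
        }
        throw last; // all attempts failed
    }
}
```

A production version would likely add a back-off delay between attempts and a way to give up cleanly when the sink is being cancelled.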



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699275#comment-14699275
 ] 

Chesnay Schepler commented on FLINK-2448:
-

Yep, that's what I mean. The environment is retrieved using 
ExecutionEnvironment.getEnvironment() for both jobs.
Essentially, this is what runs:
{code}
@Test
public void MyTest() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getEnvironment();
    env.registerCacheFile("X");
    // doSomeStuff
    env.execute();

    env = ExecutionEnvironment.getEnvironment();
    env.registerCacheFile("X");
    // doSomeStuff
    env.execute();
}
{code}
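
The failure mode, and the fix being discussed (clearing per-run state when the environment is reused), can be modeled with a toy environment. The class and method names below are illustrative, not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the bug: registering the same cache-file name twice fails
// unless the entries are cleared between runs of a reused environment.
public class CacheFileEnv {
    private final Map<String, String> cacheFiles = new HashMap<>();

    void registerCacheFile(String path, String name) {
        if (cacheFiles.containsKey(name)) {
            throw new IllegalStateException("File with name '" + name + "' was already cached.");
        }
        cacheFiles.put(name, path);
    }

    void execute() {
        // ... run the job, then reset per-run state so the env can be reused
        cacheFiles.clear();
    }
}
```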

 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699345#comment-14699345
 ] 

ASF GitHub Bot commented on FLINK-2448:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-131769530
  
Does this work properly with collect() calls? As in, would the following 
plan still work?

`env = ..
env.registerCacheFile()
...
someSet.collect()
doSomethingThatUsesTheCacheFile
env.execute()
`

If we wipe all cache entries in the collect() call, the files will not be 
registered in the execute(), right? The plans these methods create are separate, 
I think.


 registerCacheFile fails with MultipleProgramsTestbase
 -

 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When trying to register a file using a constant name, an exception is thrown 
 saying the file was already cached.
 This is probably because the same environment is reused, and the cacheFile 
 entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2538) Potential resource leak in ClassLoaderUtil#getUserCodeClassLoaderInfo()

2015-08-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-2538:
-

 Summary: Potential resource leak in 
ClassLoaderUtil#getUserCodeClassLoaderInfo()
 Key: FLINK-2538
 URL: https://issues.apache.org/jira/browse/FLINK-2538
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


In ClassLoaderUtil#getUserCodeClassLoaderInfo() around line 76:
{code}
  else {
    try {
      new JarFile(filePath);
      bld.append(" (valid JAR)");
    }
    catch (Exception e) {
      bld.append(" (invalid JAR: ").append(e.getMessage()).append(')');
    }
  }
{code}
The JarFile isn't closed before returning, leading to a potential resource leak.
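
One way to avoid the leak is try-with-resources, so the JarFile is closed on both the valid and invalid paths. This is a sketch of the idea, not the actual ClassLoaderUtil code; the method and class names are illustrative.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarCheck {
    // Append " (valid JAR)" or " (invalid JAR: <reason>)" without leaking
    // the handle: try-with-resources closes the JarFile in every case.
    static String describe(String filePath) {
        StringBuilder bld = new StringBuilder();
        try (JarFile jar = new JarFile(filePath)) {
            bld.append(" (valid JAR)");
        } catch (IOException e) {
            bld.append(" (invalid JAR: ").append(e.getMessage()).append(')');
        }
        return bld.toString();
    }
}
```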



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [CLEANUP] Add space between quotes and plus si...

2015-08-17 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1010#issuecomment-132005060
  
Thanks all, will do




[jira] [Closed] (FLINK-2526) Add catch{} for task when it stop running

2015-08-17 Thread fangfengbin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fangfengbin closed FLINK-2526.
--
Resolution: Fixed

 Add catch{} for task when it stop running 
 --

 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-131812796
  
It seems that you are ignoring methods such as `error(driver: 
SchedulerDriver, message: String)` or `frameworkMessage()`.
Are they application-specific (e.g. sent by our scheduler), or do they 
receive events from Mesos?
I think we should not ignore these events.




[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699503#comment-14699503
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-131812796
  
It seems that you are ignoring methods such as `error(driver: 
SchedulerDriver, message: String)` or `frameworkMessage()`.
Are they application-specific (e.g. sent by our scheduler), or do they 
receive events from Mesos?
I think we should not ignore these events.


 Integrate Flink with Apache Mesos
 -

 Key: FLINK-1984
 URL: https://issues.apache.org/jira/browse/FLINK-1984
 Project: Flink
  Issue Type: New Feature
  Components: New Components
Reporter: Robert Metzger
Priority: Minor
 Attachments: 251.patch


 There are some users asking for an integration of Flink into Mesos.
 There also is a pending pull request for adding Mesos support for Flink: 
 https://github.com/apache/flink/pull/251
 But the PR is insufficiently tested. I'll add the code of the pull request to 
 this JIRA in case somebody wants to pick it up in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2386) Implement Kafka connector using the new Kafka Consumer API

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699594#comment-14699594
 ] 

ASF GitHub Bot commented on FLINK-2386:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1028#issuecomment-131843222
  
How about dropping the backported Kafka code and relying completely on our 
own implementation against the SimpleConsumer API?
We would need to implement the `KafkaConsumer.partitionsFor()` method 
ourselves, but I think that's doable.


 Implement Kafka connector using the new Kafka Consumer API
 --

 Key: FLINK-2386
 URL: https://issues.apache.org/jira/browse/FLINK-2386
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: Robert Metzger
Assignee: Robert Metzger

 Once Kafka has released its new consumer API, we should provide a connector 
 for that version.
 The release will probably be called 0.9 or 0.8.3.
 The connector will be mostly compatible with Kafka 0.8.2.x, except for 
 committing offsets to the broker (the new connector expects a coordinator to 
 be available on Kafka). To work around that, we can provide a configuration 
 option to commit offsets to zookeeper (managed by flink code).
 For 0.9/0.8.3 it will be fully compatible.
 It will not be compatible with 0.8.1 because of mismatching Kafka messages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699493#comment-14699493
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r37184472
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala 
---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.executor
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.log4j.{ConsoleAppender, Level, Logger => ApacheLogger, 
PatternLayout}
+import org.apache.mesos.{Executor, ExecutorDriver}
+import org.apache.mesos.Protos._
+
+trait FlinkExecutor extends Executor {
+  // logger to use
+  def LOG: org.slf4j.Logger
+
+  var currentRunningTaskId: Option[TaskID] = None
+  val TASK_MANAGER_LOGGING_LEVEL_KEY = "taskmanager.logging.level"
+  val DEFAULT_TASK_MANAGER_LOGGING_LEVEL = "INFO"
+
+
+  // methods that defines how the task is started when a launchTask is sent
+  def startTask(streamingMode: StreamingMode): Try[Unit]
+
+  var thread: Option[Thread] = None
+  var slaveId: Option[SlaveID] = None
+
+  override def shutdown(driver: ExecutorDriver): Unit = {
+LOG.info("Killing taskManager thread")
+// kill task manager thread
+for (t <- thread) {
+  t.stop()
+}
+
+// exit
+sys.exit(0)
+  }
+
+  override def disconnected(driver: ExecutorDriver): Unit = {}
+
+  override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {
+for (t <- thread) {
+  LOG.info(s"Killing task : ${taskId.getValue}")
+  thread = None
+  currentRunningTaskId = None
+
+  // stop running thread
+  t.stop()
+
+  // Send the TASK_FINISHED status
+  driver.sendStatusUpdate(TaskStatus.newBuilder()
+.setTaskId(taskId)
+.setState(TaskState.TASK_FINISHED)
+.build())
+}
+  }
+
+
+  override def error(driver: ExecutorDriver, message: String): Unit = {}
+
+  override def frameworkMessage(driver: ExecutorDriver, data: 
Array[Byte]): Unit = {}
+
+  override def registered(driver: ExecutorDriver, executorInfo: 
ExecutorInfo,
+  frameworkInfo: FrameworkInfo, slaveInfo: 
SlaveInfo): Unit = {
+LOG.info(s"${executorInfo.getName} was registered on slave: 
${slaveInfo.getHostname}")
+slaveId = Some(slaveInfo.getId)
+// get the configuration passed to it
+if (executorInfo.hasData) {
+  val newConfig: Configuration = 
Utils.deserialize(executorInfo.getData.toByteArray)
+  GlobalConfiguration.includeConfiguration(newConfig)
+}
+LOG.debug("Loaded configuration: {}", 
GlobalConfiguration.getConfiguration)
+  }
+
+
+  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): 
Unit = {
+slaveId = Some(slaveInfo.getId)
+  }
+
+
+  override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
+// overlay the new config over this one
+val taskConf: Configuration = 
Utils.deserialize(task.getData.toByteArray)
+GlobalConfiguration.includeConfiguration(taskConf)
+
+// reconfigure log4j
+val logLevel = GlobalConfiguration.getString(
+  TASK_MANAGER_LOGGING_LEVEL_KEY, DEFAULT_TASK_MANAGER_LOGGING_LEVEL)
+
+initializeLog4j(Level.toLevel(logLevel, Level.DEBUG))
+
+// get streaming mode
+val streamingMode = getStreamingMode()
+
+// create the thread
+val t = createThread(driver, 

[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r37184472
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala 
---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.executor
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.log4j.{ConsoleAppender, Level, Logger => ApacheLogger, 
PatternLayout}
+import org.apache.mesos.{Executor, ExecutorDriver}
+import org.apache.mesos.Protos._
+
+trait FlinkExecutor extends Executor {
+  // logger to use
+  def LOG: org.slf4j.Logger
+
+  var currentRunningTaskId: Option[TaskID] = None
+  val TASK_MANAGER_LOGGING_LEVEL_KEY = "taskmanager.logging.level"
+  val DEFAULT_TASK_MANAGER_LOGGING_LEVEL = "INFO"
+
+
+  // methods that defines how the task is started when a launchTask is sent
+  def startTask(streamingMode: StreamingMode): Try[Unit]
+
+  var thread: Option[Thread] = None
+  var slaveId: Option[SlaveID] = None
+
+  override def shutdown(driver: ExecutorDriver): Unit = {
+LOG.info("Killing taskManager thread")
+// kill task manager thread
+for (t <- thread) {
+  t.stop()
+}
+
+// exit
+sys.exit(0)
+  }
+
+  override def disconnected(driver: ExecutorDriver): Unit = {}
+
+  override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {
+for (t <- thread) {
+  LOG.info(s"Killing task : ${taskId.getValue}")
+  thread = None
+  currentRunningTaskId = None
+
+  // stop running thread
+  t.stop()
+
+  // Send the TASK_FINISHED status
+  driver.sendStatusUpdate(TaskStatus.newBuilder()
+.setTaskId(taskId)
+.setState(TaskState.TASK_FINISHED)
+.build())
+}
+  }
+
+
+  override def error(driver: ExecutorDriver, message: String): Unit = {}
+
+  override def frameworkMessage(driver: ExecutorDriver, data: 
Array[Byte]): Unit = {}
+
+  override def registered(driver: ExecutorDriver, executorInfo: 
ExecutorInfo,
+  frameworkInfo: FrameworkInfo, slaveInfo: 
SlaveInfo): Unit = {
+LOG.info(s"${executorInfo.getName} was registered on slave: 
${slaveInfo.getHostname}")
+slaveId = Some(slaveInfo.getId)
+// get the configuration passed to it
+if (executorInfo.hasData) {
+  val newConfig: Configuration = 
Utils.deserialize(executorInfo.getData.toByteArray)
+  GlobalConfiguration.includeConfiguration(newConfig)
+}
+LOG.debug("Loaded configuration: {}", 
GlobalConfiguration.getConfiguration)
+  }
+
+
+  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): 
Unit = {
+slaveId = Some(slaveInfo.getId)
+  }
+
+
+  override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
+// overlay the new config over this one
+val taskConf: Configuration = 
Utils.deserialize(task.getData.toByteArray)
+GlobalConfiguration.includeConfiguration(taskConf)
+
+// reconfigure log4j
+val logLevel = GlobalConfiguration.getString(
+  TASK_MANAGER_LOGGING_LEVEL_KEY, DEFAULT_TASK_MANAGER_LOGGING_LEVEL)
+
+initializeLog4j(Level.toLevel(logLevel, Level.DEBUG))
+
+// get streaming mode
+val streamingMode = getStreamingMode()
+
+// create the thread
+val t = createThread(driver, task.getTaskId, streamingMode)
+thread = Some(t)
+t.start()
+
+// send message
+driver.sendStatusUpdate(TaskStatus.newBuilder()
+  .setTaskId(task.getTaskId)
+  .setState(TaskState.TASK_RUNNING)
+  

[jira] [Commented] (FLINK-2521) Add automatic test name logging for tests

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699567#comment-14699567
 ] 

ASF GitHub Bot commented on FLINK-2521:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1015#issuecomment-131829785
  
Very useful change. +1 to merge.


 Add automatic test name logging for tests
 -

 Key: FLINK-2521
 URL: https://issues.apache.org/jira/browse/FLINK-2521
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor

 When running tests on travis the Flink components log to a file. This is 
 helpful in case of a failed test to retrieve the error. However, the log does 
 not contain the test name and the reason for the failure. Therefore it is 
 difficult to find the log output which corresponds to the failed test.
 It would be nice to automatically add the test case information to the log. 
 This would ease the debugging process big time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2517) Wrong KafkaSink arguments in streaming guide

2015-08-17 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-2517:
--
Assignee: Nezih Yigitbasi

 Wrong KafkaSink arguments in streaming guide
 

 Key: FLINK-2517
 URL: https://issues.apache.org/jira/browse/FLINK-2517
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Nezih Yigitbasi
Assignee: Nezih Yigitbasi
Priority: Trivial
 Fix For: 0.10


 For the {{KafkaSink}} example in the streaming guide, the doc says zookeeper 
 host/port should be specified in the constructor. But it should be the list 
 of brokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2386] Rework Kafka consumer for Flink

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1028#issuecomment-131847425
  
I like this idea a lot. The backported code is not very stable anyways...




[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699507#comment-14699507
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r37185622
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
 ---
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import com.google.protobuf.{ByteString, GeneratedMessage}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
+import org.apache.flink.configuration.ConfigConstants._
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler.FlinkScheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Value.Ranges
+import org.apache.mesos.Protos.Value.Type._
+
+trait SchedulerUtils {
--- End diff --

Some code in this trait has been copied from the Spark sources, right?
We should state this at least in the scaladocs of the trait.


 Integrate Flink with Apache Mesos
 -

 Key: FLINK-1984
 URL: https://issues.apache.org/jira/browse/FLINK-1984
 Project: Flink
  Issue Type: New Feature
  Components: New Components
Reporter: Robert Metzger
Priority: Minor
 Attachments: 251.patch


 There are some users asking for an integration of Flink into Mesos.
 There also is a pending pull request for adding Mesos support for Flink: 
 https://github.com/apache/flink/pull/251
 But the PR is insufficiently tested. I'll add the code of the pull request to 
 this JIRA in case somebody wants to pick it up in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r37185622
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
 ---
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import com.google.protobuf.{ByteString, GeneratedMessage}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
+import org.apache.flink.configuration.ConfigConstants._
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler.FlinkScheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Value.Ranges
+import org.apache.mesos.Protos.Value.Type._
+
+trait SchedulerUtils {
--- End diff --

Some code in this trait has been copied from the Spark sources, right?
We should state this at least in the scaladocs of the trait.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2386) Implement Kafka connector using the new Kafka Consumer API

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699605#comment-14699605
 ] 

ASF GitHub Bot commented on FLINK-2386:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1028#issuecomment-131847425
  
I like this idea a lot. The backported code is not very stable anyways...


 Implement Kafka connector using the new Kafka Consumer API
 ----------------------------------------------------------

 Key: FLINK-2386
 URL: https://issues.apache.org/jira/browse/FLINK-2386
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: Robert Metzger
Assignee: Robert Metzger

 Once Kafka has released its new consumer API, we should provide a connector 
 for that version.
 The release will probably be called 0.9 or 0.8.3.
 The connector will be mostly compatible with Kafka 0.8.2.x, except for 
 committing offsets to the broker (the new connector expects a coordinator to 
 be available on Kafka). To work around that, we can provide a configuration 
 option to commit offsets to zookeeper (managed by flink code).
 For 0.9/0.8.3 it will be fully compatible.
 It will not be compatible with 0.8.1 because of mismatching Kafka messages.
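The workaround described above (committing to ZooKeeper when no broker-side coordinator is available, and to Kafka otherwise) amounts to selecting a commit path from configuration. A minimal sketch of that selection is shown below; note that the class name `OffsetCommitModeSketch` and the key `offset.commit.mode` are illustrative placeholders, not actual Flink connector options:

```java
import java.util.Properties;

// Hedged sketch of choosing where consumed offsets get committed.
// All names here are invented for illustration, not Flink's real API.
public class OffsetCommitModeSketch {

    enum OffsetCommitMode { KAFKA_COORDINATOR, ZOOKEEPER }

    // Pick the commit path from user configuration; default to the
    // broker coordinator, which the new consumer API expects.
    static OffsetCommitMode resolve(Properties props) {
        String mode = props.getProperty("offset.commit.mode", "kafka");
        return "zookeeper".equalsIgnoreCase(mode)
                ? OffsetCommitMode.ZOOKEEPER
                : OffsetCommitMode.KAFKA_COORDINATOR;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("offset.commit.mode", "zookeeper");
        System.out.println(resolve(p)); // prints ZOOKEEPER
    }
}
```

The real connector would additionally need ZooKeeper client wiring managed by Flink code; this only illustrates the mode selection the description talks about.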



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2521] [tests] Adds automatic test name ...

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1015#issuecomment-131829785
  
Very useful change. +1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699581#comment-14699581
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r37192039
  
--- Diff: flink-contrib/flink-operator-stats/pom.xml ---
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+<modelVersion>4.0.0</modelVersion>
+
+<parent>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-contrib-parent</artifactId>
+<version>0.10-SNAPSHOT</version>
+<relativePath>..</relativePath>
+</parent>
+
+<artifactId>flink-operator-stats</artifactId>
+<name>flink-operator-stats</name>
+
+<packaging>jar</packaging>
+
+<dependencies>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-java</artifactId>
+<version>${project.version}</version>
+</dependency>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-core</artifactId>
+<version>${project.version}</version>
+</dependency>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-test-utils</artifactId>
+<version>${project.version}</version>
+<scope>test</scope>
+</dependency>
+
+<dependency>
+<groupId>com.clearspring.analytics</groupId>
--- End diff --

ASL 2.0 license, that's good.


 Add support for tracking statistics of intermediate results
 -----------------------------------------------------------

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback form the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.
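 A lightweight accumulator of the kind discussed above (exact min/max/count plus an approximate distinct count) could look roughly like the following. This is a hedged sketch: it uses linear counting for the distinct estimate, and the class name and methods are invented for illustration; they are not the proposed Flink interface.

```java
import java.util.BitSet;

// Sketch of a per-operator statistics accumulator: exact min, max, and
// count, plus an approximate distinct count via linear counting.
// Names are illustrative, not part of any actual Flink API.
public class OperatorStatsSketch {

    private long count = 0;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;
    private final BitSet bitmap;
    private final int m; // bitmap size; accuracy improves as m grows

    public OperatorStatsSketch(int bitmapSize) {
        this.m = bitmapSize;
        this.bitmap = new BitSet(bitmapSize);
    }

    public void add(long value) {
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
        // Hash the value into one of m buckets and mark that bucket.
        int bucket = Math.floorMod(Long.hashCode(value * 0x9E3779B97F4A7C15L), m);
        bitmap.set(bucket);
    }

    public long getCount() { return count; }
    public long getMin()   { return min; }
    public long getMax()   { return max; }

    // Linear counting estimate: n ~ -m * ln(V), where V is the
    // fraction of buckets still unset.
    public long estimateDistinct() {
        int unset = m - bitmap.cardinality();
        if (unset == 0) return m; // bitmap saturated; estimate unreliable
        return Math.round(-m * Math.log((double) unset / m));
    }

    public static void main(String[] args) {
        OperatorStatsSketch stats = new OperatorStatsSketch(1 << 16);
        for (int i = 0; i < 10_000; i++) {
            stats.add(i % 1000); // 1000 distinct keys, heavily repeated
        }
        System.out.println(stats.getCount() + " " + stats.getMin() + " "
                + stats.getMax() + " ~" + stats.estimateDistinct());
    }
}
```

 For skewed key distributions, a single bitmap like this gives only cardinality, not the shape of the distribution; a histogram or a heavier sketch would be needed for that, which is exactly the open question raised above.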



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

