[GitHub] flink issue #2550: [FLINK-4657] Implement HighAvailabilityServices based on ...

2016-09-27 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2550
  
For "cluster-id" thing, actually there is some similar functionality in 
current Flink, configed by "high-availability.zookeeper.path.namespace". It 
will work with another config "high-availability.zookeeper.path.root" which is 
"/flink" by default. 
The real root in zk will be "/flink/some_namespace", we can change the 
namespace when we have a new cluster. Maybe we should do it explicitly in Cli 
or some shell script. Do you think that will work?
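
For illustration, the two options combine like this (a minimal flink-conf.yaml 
sketch; the namespace value is just an example):

```
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.path.namespace: /my_cluster

# effective real root in ZooKeeper: /flink/my_cluster
```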




[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528274#comment-15528274
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2550
  
For "cluster-id" thing, actually there is some similar functionality in 
current Flink, configed by "high-availability.zookeeper.path.namespace". It 
will work with another config "high-availability.zookeeper.path.root" which is 
"/flink" by default. 
The real root in zk will be "/flink/some_namespace", we can change the 
namespace when we have a new cluster. Maybe we should do it explicitly in Cli 
or some shell script. Do you think that will work?


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM, and for each 
> JM, the path could be /leader/job-managers/JobID
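
For a concrete illustration of the layout described above, a hypothetical 
helper could build the paths like this (names are illustrative, not Flink's 
actual API):

```java
// Hypothetical sketch of the per-component leader path layout described above.
final class LeaderPaths {
    static final String ROOT = "/leader";

    // Single leader path for the ResourceManager.
    static String forResourceManager() {
        return ROOT + "/resource-manager";
    }

    // One leader path per job, keyed by the job's ID.
    static String forJobManager(String jobId) {
        return ROOT + "/job-managers/" + jobId;
    }
}
```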





[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528245#comment-15528245
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80837572
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
--- End diff --

I'm not sure that throwing an exception from an RpcMethod is a good way to do 
error handling. On the caller's side, once the rpc method returns a Future from 
the gateway, the caller can do error handling with handleAsync or 
exceptionallyAsync. But exceptions from user logic will be mixed in with all the 
exceptions from the rpc framework, such as RpcTimeout or other exceptions that 
tell you the rpc system itself may not be working. So typically you need to 
figure out what went wrong by distinguishing the exception type, which is not 
very elegant, I think. 
One thing we can do is never throw exceptions from an RpcMethod and instead 
handle errors in the "ErrorCode" style, returning the error explicitly in the 
return value. Every exception thrown during an rpc call would then be due to the 
rpc framework. In this scheme, returning null indicates that something went 
wrong with the request. (If we need more detail about the error, we can enrich 
it with a returned message; for now, null will do.) 
And in the normal case, such as when there are no further splits, we would still 
return a NextInputSplit with empty content. 
What do you think about all this? Let me know.
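
To illustrate the "ErrorCode" style sketched above, here is a minimal 
caller-side example. It uses java.util.concurrent.CompletableFuture as a 
stand-in for the flip-6 Future type, and NextInputSplit is a simplified 
stand-in as well; none of this is the actual rpc framework API.

```java
import java.util.concurrent.CompletableFuture;

// A minimal sketch of the "ErrorCode" style: application-level failures travel
// in the return value (null / empty content), so any exception reaching the
// caller must come from the rpc framework itself.
public class ErrorCodeStyleCaller {

    // Simplified stand-in; an empty payload means "no further splits".
    static class NextInputSplit {
        final byte[] payload;
        NextInputSplit(byte[] payload) { this.payload = payload; }
    }

    public static void main(String[] args) {
        // In the real code this Future would come from the rpc gateway.
        CompletableFuture<NextInputSplit> future =
                CompletableFuture.completedFuture(new NextInputSplit(new byte[0]));

        future.handleAsync((split, failure) -> {
            if (failure != null) {
                // Only framework-level problems (e.g. timeouts) surface here.
                System.err.println("rpc framework failure: " + failure);
            } else if (split == null) {
                // Application-level error, signalled by the return value.
                System.err.println("requesting the next input split failed");
            } else if (split.payload.length == 0) {
                System.out.println("no further input splits");
            } else {
                System.out.println("received an input split");
            }
            return null;
        }).join();
    }
}
```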


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM, and for each 
> JM, the path could be /leader/job-managers/JobID





[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80837572
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
--- End diff --

I'm not sure that throwing an exception from an RpcMethod is a good way to do 
error handling. On the caller's side, once the rpc method returns a Future from 
the gateway, the caller can do error handling with handleAsync or 
exceptionallyAsync. But exceptions from user logic will be mixed in with all the 
exceptions from the rpc framework, such as RpcTimeout or other exceptions that 
tell you the rpc system itself may not be working. So typically you need to 
figure out what went wrong by distinguishing the exception type, which is not 
very elegant, I think. 
One thing we can do is never throw exceptions from an RpcMethod and instead 
handle errors in the "ErrorCode" style, returning the error explicitly in the 
return value. Every exception thrown during an rpc call would then be due to the 
rpc framework. In this scheme, returning null indicates that something went 
wrong with the request. (If we need more detail about the error, we can enrich 
it with a returned message; for now, null will do.) 
And in the normal case, such as when there are no further splits, we would still 
return a NextInputSplit with empty content. 
What do you think about all this? Let me know.




[jira] [Commented] (FLINK-4670) Add watch mechanism on current RPC framework

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528226#comment-15528226
 ] 

ASF GitHub Bot commented on FLINK-4670:
---

Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2543


> Add watch mechanism on current RPC framework
> 
>
> Key: FLINK-4670
> URL: https://issues.apache.org/jira/browse/FLINK-4670
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
> Fix For: 1.2.0
>
>
> Add a watch mechanism to the current RPC framework so that an RPC gateway can 
> be watched to make sure the rpc server is still running, just like the previous 
> DeathWatch in Akka.
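
For reference, the Akka DeathWatch pattern mentioned above looks roughly as 
follows (a minimal sketch using Akka's classic UntypedActor API; this is plain 
Akka, not Flink's rpc framework code):

```java
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

// Watches another actor; Akka delivers a Terminated message when it dies.
public class WatchingActor extends UntypedActor {

    public WatchingActor(ActorRef watched) {
        // Register the watch; from now on we are notified of the actor's death.
        getContext().watch(watched);
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof Terminated) {
            Terminated t = (Terminated) message;
            System.err.println("Watched rpc server terminated: " + t.getActor());
        } else {
            unhandled(message);
        }
    }
}
```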





[GitHub] flink pull request #2543: [FLINK-4670] [cluster management] Add watch mechan...

2016-09-27 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2543




[jira] [Commented] (FLINK-4606) Integrate the new ResourceManager with the existing FlinkResourceManager

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528211#comment-15528211
 ] 

ASF GitHub Bot commented on FLINK-4606:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80836325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
shutDown();
}
 
+   /**
+* Registers an infoMessage listener
+*
+* @param infoMessageListenerAddress address of infoMessage listener to 
register to this resource manager
+*/
+   @RpcMethod
+   public void registerInfoMessageListener(final String 
infoMessageListenerAddress) {
+   
if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+   log.warn("Receive a duplicate registration from info 
message listener on ({})", infoMessageListenerAddress);
+   } else {
+   Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = 
+   getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+   infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
+   @Override
+   public void 
accept(InfoMessageListenerRpcGateway gateway) {
+   log.info("Receive a registration from 
info message listener on ({})", infoMessageListenerAddress);
+   
infoMessageListeners.put(infoMessageListenerAddress, gateway);
+   }
+   }, getMainThreadExecutor());
+
+   infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+   @Override
+   public Void apply(Throwable failure) {
+   log.warn("Receive a registration from 
unreachable info message listener on ({})", infoMessageListenerAddress);
+   return null;
+   }
+   }, getMainThreadExecutor());
+   }
+   }
+
+   /**
+* Unregisters an infoMessage listener
+*
+* @param infoMessageListenerAddress address of infoMessage listener to 
unregister from this resource manager
+*
+*/
+   @RpcMethod
+   public void unRegisterInfoMessageListener(final String 
infoMessageListenerAddress) {
+   infoMessageListeners.remove(infoMessageListenerAddress);
+   }
+
+   /**
+* Shutdowns cluster
+*
+* @param finalStatus
+* @param optionalDiagnostics
+*/
+   @RpcMethod
+   public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
+   log.info("shut down cluster because application is in {}, 
diagnostics {}", finalStatus, optionalDiagnostics);
+   shutDownApplication(finalStatus, optionalDiagnostics);
+   }
+
+   /**
+* This method should be called by the framework once it detects that a 
currently registered task executor has failed.
+*
+* @param resourceID Id of the worker that has failed.
+* @param message An informational message that explains why the worker 
failed.
+*/
+   public void notifyWorkerFailed(final ResourceID resourceID, String 
message) {
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   WorkerType worker = 
taskExecutorGateways.remove(resourceID);
+   if (worker != null) {
+   // TODO :: suggest failed task executor 
to stop itself
+   
slotManager.notifyTaskManagerFailure(resourceID);
+   }
+   }
+   });
+   }
+
+   /**
+* Gets the number of currently started TaskManagers.
+*
+* @return The number of currently started TaskManagers.
+*/
+   public int getNumberOfStartedTaskManagers() {
+   return taskExecutorGateways.size();
+   }
+
+   /**
+* Notifies the resource manager of a fatal error.
+*
+* IMPORTANT: This should not cleanly shut down this master, 
but exit it in
+* such a way that a high-availability setting would restart this or 
fail over
+* to another master.
+*/
+   public void onFatalError(final String message, final Throwable error) {
+   runAsync(new Runnable() 

[jira] [Assigned] (FLINK-4679) Add TumbleRow row-windows for streaming tables

2016-09-27 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-4679:
--

Assignee: Jark Wu

> Add TumbleRow row-windows for streaming tables
> --
>
> Key: FLINK-4679
> URL: https://issues.apache.org/jira/browse/FLINK-4679
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> This task requires implementing a custom stream operator and integrating it 
> with checkpointing and timestamp / watermark logic.





[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

2016-09-27 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80836325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
shutDown();
}
 
+   /**
+* Registers an infoMessage listener
+*
+* @param infoMessageListenerAddress address of infoMessage listener to 
register to this resource manager
+*/
+   @RpcMethod
+   public void registerInfoMessageListener(final String 
infoMessageListenerAddress) {
+   
if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+   log.warn("Receive a duplicate registration from info 
message listener on ({})", infoMessageListenerAddress);
+   } else {
+   Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = 
+   getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+   infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
+   @Override
+   public void 
accept(InfoMessageListenerRpcGateway gateway) {
+   log.info("Receive a registration from 
info message listener on ({})", infoMessageListenerAddress);
+   
infoMessageListeners.put(infoMessageListenerAddress, gateway);
+   }
+   }, getMainThreadExecutor());
+
+   infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+   @Override
+   public Void apply(Throwable failure) {
+   log.warn("Receive a registration from 
unreachable info message listener on ({})", infoMessageListenerAddress);
+   return null;
+   }
+   }, getMainThreadExecutor());
+   }
+   }
+
+   /**
+* Unregisters an infoMessage listener
+*
+* @param infoMessageListenerAddress address of infoMessage listener to 
unregister from this resource manager
+*
+*/
+   @RpcMethod
+   public void unRegisterInfoMessageListener(final String 
infoMessageListenerAddress) {
+   infoMessageListeners.remove(infoMessageListenerAddress);
+   }
+
+   /**
+* Shutdowns cluster
+*
+* @param finalStatus
+* @param optionalDiagnostics
+*/
+   @RpcMethod
+   public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
+   log.info("shut down cluster because application is in {}, 
diagnostics {}", finalStatus, optionalDiagnostics);
+   shutDownApplication(finalStatus, optionalDiagnostics);
+   }
+
+   /**
+* This method should be called by the framework once it detects that a 
currently registered task executor has failed.
+*
+* @param resourceID Id of the worker that has failed.
+* @param message An informational message that explains why the worker 
failed.
+*/
+   public void notifyWorkerFailed(final ResourceID resourceID, String 
message) {
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   WorkerType worker = 
taskExecutorGateways.remove(resourceID);
+   if (worker != null) {
+   // TODO :: suggest failed task executor 
to stop itself
+   
slotManager.notifyTaskManagerFailure(resourceID);
+   }
+   }
+   });
+   }
+
+   /**
+* Gets the number of currently started TaskManagers.
+*
+* @return The number of currently started TaskManagers.
+*/
+   public int getNumberOfStartedTaskManagers() {
+   return taskExecutorGateways.size();
+   }
+
+   /**
+* Notifies the resource manager of a fatal error.
+*
+* IMPORTANT: This should not cleanly shut down this master, 
but exit it in
+* such a way that a high-availability setting would restart this or 
fail over
+* to another master.
+*/
+   public void onFatalError(final String message, final Throwable error) {
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   fatalError(message, error);
+   }
+   });
+   }
+
+   // 

[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528198#comment-15528198
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835773
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
--- End diff --

OK, I will change it to "debug" and leave some comments explaining this.


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM, and for each 
> JM, the path could be /leader/job-managers/JobID





[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835773
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
--- End diff --

OK, I will change it to "debug" and leave some comments explaining this.




[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528193#comment-15528193
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835559
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
+   } else {
+   final Slot slot = execution.getAssignedResource();
+   final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+
+   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+   if (vertex != null) {
+   final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+   if (splitAssigner != null) {
+   final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+   log.debug("Send next input split {}.", 
nextInputSplit);
+   try {
+   serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+   } catch (Exception ex) {
+   log.error("Could not serialize 
the next input split of class {}.", nextInputSplit.getClass(), ex);
+   vertex.fail(new 
RuntimeException("Could not serialize the next input split of class " +
+   
nextInputSplit.getClass() + ".", ex));
+   return null;
+   }
+   } else {
+   log.error("No InputSplitAssigner for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   } else {
+   log.error("Cannot find execution vertex for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   }
+   return new NextInputSplit(serializedInputSplit);
+   }
+
+   @RpcMethod
+   public PartitionState requestPartitionState(
+   final ResultPartitionID partitionId,
+   final ExecutionAttemptID taskExecutionId,
+   final IntermediateDataSetID taskResultId)
+   {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+   final ExecutionState state = execution != null ? 
execution.getState() : null;
+   return new PartitionState(taskExecutionId, taskResultId, 
partitionId.getPartitionId(), state);
+   }
+
+   @RpcMethod
+   public void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
--- End diff --

Yes, you're right. I will change this.


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM, and for each 
> JM, the path could be /leader/job-managers/JobID





[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528189#comment-15528189
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
+   } else {
+   final Slot slot = execution.getAssignedResource();
+   final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+
+   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+   if (vertex != null) {
+   final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+   if (splitAssigner != null) {
+   final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+   log.debug("Send next input split {}.", 
nextInputSplit);
+   try {
+   serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+   } catch (Exception ex) {
+   log.error("Could not serialize 
the next input split of class {}.", nextInputSplit.getClass(), ex);
+   vertex.fail(new 
RuntimeException("Could not serialize the next input split of class " +
+   
nextInputSplit.getClass() + ".", ex));
+   return null;
+   }
+   } else {
+   log.error("No InputSplitAssigner for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   } else {
+   log.error("Cannot find execution vertex for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   }
+   return new NextInputSplit(serializedInputSplit);
+   }
+
+   @RpcMethod
+   public PartitionState requestPartitionState(
+   final ResultPartitionID partitionId,
+   final ExecutionAttemptID taskExecutionId,
+   final IntermediateDataSetID taskResultId)
+   {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+   final ExecutionState state = execution != null ? 
execution.getState() : null;
+   return new PartitionState(taskExecutionId, taskResultId, 
partitionId.getPartitionId(), state);
+   }
+
+   @RpcMethod
+   public void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
+   final JobID jobID = executionGraph.getJobID();
+   final String jobName = executionGraph.getJobName();
+   log.info("Status of job {} ({}) changed to {}.", jobID, 
jobName, newJobStatus, error);
+
+   if (newJobStatus.isGloballyTerminalState()) {
+   // TODO set job end time in JobInfo
+
+   /*
+ TODO
+ if (jobInfo.sessionAlive) {
+jobInfo.setLastActive()
+val lastActivity = jobInfo.lastActive
+
context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+  // remove only if no activity occurred in the meantime
+  if (lastActivity == jobInfo.lastActive) {
+self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+  }
+}(context.dispatcher)
+  } else {
+self ! 

[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835559
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
+   } else {
+   final Slot slot = execution.getAssignedResource();
+   final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+
+   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+   if (vertex != null) {
+   final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+   if (splitAssigner != null) {
+   final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+   log.debug("Send next input split {}.", 
nextInputSplit);
+   try {
+   serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+   } catch (Exception ex) {
+   log.error("Could not serialize 
the next input split of class {}.", nextInputSplit.getClass(), ex);
+   vertex.fail(new 
RuntimeException("Could not serialize the next input split of class " +
+   
nextInputSplit.getClass() + ".", ex));
+   return null;
+   }
+   } else {
+   log.error("No InputSplitAssigner for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   } else {
+   log.error("Cannot find execution vertex for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   }
+   return new NextInputSplit(serializedInputSplit);
+   }
+
+   @RpcMethod
+   public PartitionState requestPartitionState(
+   final ResultPartitionID partitionId,
+   final ExecutionAttemptID taskExecutionId,
+   final IntermediateDataSetID taskResultId)
+   {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+   final ExecutionState state = execution != null ? 
execution.getState() : null;
+   return new PartitionState(taskExecutionId, taskResultId, 
partitionId.getPartitionId(), state);
+   }
+
+   @RpcMethod
+   public void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
--- End diff --

Yes, you're right. I will change this.




[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
+   } else {
+   final Slot slot = execution.getAssignedResource();
+   final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+
+   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+   if (vertex != null) {
+   final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+   if (splitAssigner != null) {
+   final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+   log.debug("Send next input split {}.", 
nextInputSplit);
+   try {
+   serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+   } catch (Exception ex) {
+   log.error("Could not serialize 
the next input split of class {}.", nextInputSplit.getClass(), ex);
+   vertex.fail(new 
RuntimeException("Could not serialize the next input split of class " +
+   
nextInputSplit.getClass() + ".", ex));
+   return null;
+   }
+   } else {
+   log.error("No InputSplitAssigner for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   } else {
+   log.error("Cannot find execution vertex for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   }
+   return new NextInputSplit(serializedInputSplit);
+   }
+
+   @RpcMethod
+   public PartitionState requestPartitionState(
+   final ResultPartitionID partitionId,
+   final ExecutionAttemptID taskExecutionId,
+   final IntermediateDataSetID taskResultId)
+   {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+   final ExecutionState state = execution != null ? 
execution.getState() : null;
+   return new PartitionState(taskExecutionId, taskResultId, 
partitionId.getPartitionId(), state);
+   }
+
+   @RpcMethod
+   public void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
+   final JobID jobID = executionGraph.getJobID();
+   final String jobName = executionGraph.getJobName();
+   log.info("Status of job {} ({}) changed to {}.", jobID, 
jobName, newJobStatus, error);
+
+   if (newJobStatus.isGloballyTerminalState()) {
+   // TODO set job end time in JobInfo
+
+   /*
+ TODO
+ if (jobInfo.sessionAlive) {
+jobInfo.setLastActive()
+val lastActivity = jobInfo.lastActive
+
context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+  // remove only if no activity occurred in the meantime
+  if (lastActivity == jobInfo.lastActive) {
+self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+  }
+}(context.dispatcher)
+  } else {
+self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+  }
+*/
+
+   if (newJobStatus == JobStatus.FINISHED) {
+   try {
+   

[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80834516
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
+   } else {
+   final Slot slot = execution.getAssignedResource();
+   final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+
+   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+   if (vertex != null) {
+   final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+   if (splitAssigner != null) {
+   final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+   log.debug("Send next input split {}.", 
nextInputSplit);
+   try {
+   serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+   } catch (Exception ex) {
+   log.error("Could not serialize 
the next input split of class {}.", nextInputSplit.getClass(), ex);
+   vertex.fail(new 
RuntimeException("Could not serialize the next input split of class " +
+   
nextInputSplit.getClass() + ".", ex));
+   return null;
+   }
+   } else {
+   log.error("No InputSplitAssigner for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   } else {
+   log.error("Cannot find execution vertex for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   }
+   return new NextInputSplit(serializedInputSplit);
+   }
+
+   @RpcMethod
+   public PartitionState requestPartitionState(
+   final ResultPartitionID partitionId,
+   final ExecutionAttemptID taskExecutionId,
+   final IntermediateDataSetID taskResultId)
+   {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+   final ExecutionState state = execution != null ? 
execution.getState() : null;
+   return new PartitionState(taskExecutionId, taskResultId, 
partitionId.getPartitionId(), state);
+   }
+
+   @RpcMethod
+   public void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
+   final JobID jobID = executionGraph.getJobID();
+   final String jobName = executionGraph.getJobName();
+   log.info("Status of job {} ({}) changed to {}.", jobID, 
jobName, newJobStatus, error);
+
+   if (newJobStatus.isGloballyTerminalState()) {
+   // TODO set job end time in JobInfo
+
+   /*
+ TODO
+ if (jobInfo.sessionAlive) {
+jobInfo.setLastActive()
+val lastActivity = jobInfo.lastActive
+
context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+  // remove only if no activity occurred in the meantime
+  if (lastActivity == jobInfo.lastActive) {
+self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+  }
+}(context.dispatcher)
+  } else {
+self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+  }
+*/
+
+   if (newJobStatus == JobStatus.FINISHED) {
+   try {
+   

[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528167#comment-15528167
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80834516
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
+   } else {
+   final Slot slot = execution.getAssignedResource();
+   final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+
+   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+   if (vertex != null) {
+   final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+   if (splitAssigner != null) {
+   final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+   log.debug("Send next input split {}.", 
nextInputSplit);
+   try {
+   serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+   } catch (Exception ex) {
+   log.error("Could not serialize 
the next input split of class {}.", nextInputSplit.getClass(), ex);
+   vertex.fail(new 
RuntimeException("Could not serialize the next input split of class " +
+   
nextInputSplit.getClass() + ".", ex));
+   return null;
+   }
+   } else {
+   log.error("No InputSplitAssigner for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   } else {
+   log.error("Cannot find execution vertex for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   }
+   return new NextInputSplit(serializedInputSplit);
+   }
+
+   @RpcMethod
+   public PartitionState requestPartitionState(
+   final ResultPartitionID partitionId,
+   final ExecutionAttemptID taskExecutionId,
+   final IntermediateDataSetID taskResultId)
+   {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+   final ExecutionState state = execution != null ? 
execution.getState() : null;
+   return new PartitionState(taskExecutionId, taskResultId, 
partitionId.getPartitionId(), state);
+   }
+
+   @RpcMethod
+   public void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
+   final JobID jobID = executionGraph.getJobID();
+   final String jobName = executionGraph.getJobName();
+   log.info("Status of job {} ({}) changed to {}.", jobID, 
jobName, newJobStatus, error);
+
+   if (newJobStatus.isGloballyTerminalState()) {
+   // TODO set job end time in JobInfo
+
+   /*
+ TODO
+ if (jobInfo.sessionAlive) {
+jobInfo.setLastActive()
+val lastActivity = jobInfo.lastActive
+
context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+  // remove only if no activity occurred in the meantime
+  if (lastActivity == jobInfo.lastActive) {
+self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+  }
+}(context.dispatcher)
+  } else {
+self ! 

[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528157#comment-15528157
 ] 

ASF GitHub Bot commented on FLINK-4068:
---

GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2560

[FLINK-4068] [table] Move constant computations out of code-generated

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


The original PR is #2102. This PR makes `ReduceExpressionsRules` take effect. 
The rule reduces constant expressions and replaces them with the corresponding 
constant values.

It also adds some tests.
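
For example, for the predicate `myInt < (10 + 20)` from the issue description, 
the generated code after reduction would look roughly like this (a sketch, not 
the actual generated code):

```java
public void flatMap(Row in, Collector<Row> out) {
  Integer in1 = in.productElement(1);
  // 10 + 20 has been reduced to the constant 30 at planning time
  if (in1 < 30) {
    out.collect(in);
  }
}
```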

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink reduce-expression-FLINK-4068

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2560


commit 22a0f2e949e524e20151c11cc83bd6e1e4f2a1b2
Author: Jark Wu 
Date:   2016-09-28T02:31:59Z

[FLINK-4068] [table] Move constant computations out of code-generated




> Move constant computations out of code-generated `flatMap` functions.
> -
>
> Key: FLINK-4068
> URL: https://issues.apache.org/jira/browse/FLINK-4068
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The generated functions for expressions of the Table API or SQL include 
> constant computations.
> For instance the code generated for a predicate like:
> {code}
> myInt < (10 + 20)
> {code}
> looks roughly like:
> {code}
> public void flatMap(Row in, Collector<Row> out) {
>   Integer in1 = in.productElement(1);
>   int temp = 10 + 20;  
>   if (in1 < temp) {
> out.collect(in)
>   }
> }
> {code}
> In this example the computation of {{temp}} is constant and could be moved 
> out of the {{flatMap()}} method.
> The same might apply to generated functions other than {{FlatMap}} as well.





[GitHub] flink pull request #2560: [FLINK-4068] [table] Move constant computations ou...

2016-09-27 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2560

[FLINK-4068] [table] Move constant computations out of code-generated

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


The original PR is #2102. This PR makes `ReduceExpressionsRules` take effect. 
The rule reduces constant expressions and replaces them with the corresponding 
constant values.

It also adds some tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink reduce-expression-FLINK-4068

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2560


commit 22a0f2e949e524e20151c11cc83bd6e1e4f2a1b2
Author: Jark Wu 
Date:   2016-09-28T02:31:59Z

[FLINK-4068] [table] Move constant computations out of code-generated






[jira] [Commented] (FLINK-4450) update storm version to 1.0.0

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528031#comment-15528031
 ] 

ASF GitHub Bot commented on FLINK-4450:
---

Github user liuyuzhong commented on the issue:

https://github.com/apache/flink/pull/2439
  
The Jenkins run was successful, but CI failed.


![image](https://cloud.githubusercontent.com/assets/12843176/18897594/2c9e40e2-855e-11e6-8d3c-0397b33d5d8f.png)



> update storm version to 1.0.0
> -
>
> Key: FLINK-4450
> URL: https://issues.apache.org/jira/browse/FLINK-4450
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: yuzhongliu
> Fix For: 2.0.0
>
>
> The storm package path was changed in the new version.
> Storm old version package:
> backtype.storm.*
> Storm new version package:
> org.apache.storm.*
> Shall we update the flink/flink-storm code to the new storm version?
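
For example, an import would change like this between the two versions 
(TopologyBuilder is one of the relocated classes):

```java
// Storm < 1.0
import backtype.storm.topology.TopologyBuilder;

// Storm >= 1.0
import org.apache.storm.topology.TopologyBuilder;
```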





[GitHub] flink issue #2439: [FLINK-4450]update storm verion to 1.0.0 in flink-storm a...

2016-09-27 Thread liuyuzhong
Github user liuyuzhong commented on the issue:

https://github.com/apache/flink/pull/2439
  
The Jenkins run was successful, but CI failed.


![image](https://cloud.githubusercontent.com/assets/12843176/18897594/2c9e40e2-855e-11e6-8d3c-0397b33d5d8f.png)





[jira] [Assigned] (FLINK-4646) Add BipartiteGraph class

2016-09-27 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-4646:
-

Assignee: Ivan Mushketyk

> Add BipartiteGraph class
> 
>
> Key: FLINK-4646
> URL: https://issues.apache.org/jira/browse/FLINK-4646
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement a class to represent a bipartite graph in Flink Gelly. Design 
> discussions can be found in the parent task.
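
For illustration only, such a class could have roughly the following shape (a 
hypothetical sketch; the actual design is what the parent task discusses, and 
`Tuple3` stands in for a dedicated edge type):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;

// KT/KB are the key types of the two vertex sets, VVT/VVB their value types,
// EV the edge value type; edges only connect top vertices to bottom vertices.
public class BipartiteGraph<KT, KB, VVT, VVB, EV> {

    private final DataSet<Vertex<KT, VVT>> topVertices;
    private final DataSet<Vertex<KB, VVB>> bottomVertices;
    private final DataSet<Tuple3<KT, KB, EV>> edges;

    public BipartiteGraph(
            DataSet<Vertex<KT, VVT>> topVertices,
            DataSet<Vertex<KB, VVB>> bottomVertices,
            DataSet<Tuple3<KT, KB, EV>> edges) {
        this.topVertices = topVertices;
        this.bottomVertices = bottomVertices;
        this.edges = edges;
    }

    public DataSet<Vertex<KT, VVT>> getTopVertices() { return topVertices; }
    public DataSet<Vertex<KB, VVB>> getBottomVertices() { return bottomVertices; }
    public DataSet<Tuple3<KT, KB, EV>> getEdges() { return edges; }
}
```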





[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

2016-09-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2430#discussion_r80784331
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 ---
@@ -17,123 +17,79 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase 
implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
 
-   protected final static String TOPIC = "customPartitioningTestTopic";
-   protected final static int PARALLELISM = 1;
+   protected final static String TOPIC = "testTopic";
protected final static String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
protected final static TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
 
+   protected FlinkKafkaProducerBase kafkaProducer = 
mock(FlinkKafkaProducerBase.class);
+
@Test
public void testKafkaTableSink() throws Exception {
-   LOG.info("Starting 
KafkaTableSinkTestBase.testKafkaTableSink()");
-
-   createTestTopic(TOPIC, PARALLELISM, 1);
-   StreamExecutionEnvironment env = createEnvironment();
-
-   createProducingTopology(env);
-   createConsumingTopology(env);
-
-   tryExecute(env, "custom partitioning test");
-   deleteTestTopic(TOPIC);
-   LOG.info("Finished 
KafkaTableSinkTestBase.testKafkaTableSink()");
-   }
+   DataStream dataStream = mock(DataStream.class);
+   KafkaTableSink kafkaTableSink = createTableSink();
+   kafkaTableSink.emitDataStream(dataStream);
 
-   private StreamExecutionEnvironment createEnvironment() {
-   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-   env.setRestartStrategy(RestartStrategies.noRestart());
-   env.getConfig().disableSysoutLogging();
-   return env;
+   verify(dataStream).addSink(kafkaProducer);
}
 
-   private void createProducingTopology(StreamExecutionEnvironment env) {
-   DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
-   private boolean running = true;
-
-   @Override
-   public void run(SourceContext<Row> ctx) throws Exception {
-   long cnt = 0;
-   while (running) {
-   Row row = new Row(2);
-   row.setField(0, cnt);
-   row.setField(1, "kafka-" + cnt);
-   ctx.collect(row);
-   cnt++;
-   }
-   }
-
-   @Override
-   public void cancel() {
-   running = false;
-   }
-   })
-   .setParallelism(1);
-
-   KafkaTableSink kafkaTableSinkBase = createTableSink();
-
-   

[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527303#comment-15527303
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2430#discussion_r80783113
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 ---
@@ -17,123 +17,79 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase 
implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
 
-   protected final static String TOPIC = "customPartitioningTestTopic";
-   protected final static int PARALLELISM = 1;
+   protected final static String TOPIC = "testTopic";
protected final static String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
protected final static TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
 
+   protected FlinkKafkaProducerBase kafkaProducer = 
mock(FlinkKafkaProducerBase.class);
+
@Test
public void testKafkaTableSink() throws Exception {
-   LOG.info("Starting 
KafkaTableSinkTestBase.testKafkaTableSink()");
-
-   createTestTopic(TOPIC, PARALLELISM, 1);
-   StreamExecutionEnvironment env = createEnvironment();
-
-   createProducingTopology(env);
-   createConsumingTopology(env);
-
-   tryExecute(env, "custom partitioning test");
-   deleteTestTopic(TOPIC);
-   LOG.info("Finished 
KafkaTableSinkTestBase.testKafkaTableSink()");
-   }
+   DataStream dataStream = mock(DataStream.class);
+   KafkaTableSink kafkaTableSink = createTableSink();
+   kafkaTableSink.emitDataStream(dataStream);
 
-   private StreamExecutionEnvironment createEnvironment() {
-   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-   env.setRestartStrategy(RestartStrategies.noRestart());
-   env.getConfig().disableSysoutLogging();
-   return env;
+   verify(dataStream).addSink(kafkaProducer);
}
 
-   private void createProducingTopology(StreamExecutionEnvironment env) {
-   DataStream stream = env.addSource(new 
SourceFunction() {
-   private boolean running = true;
-
-   @Override
-   public void run(SourceContext ctx) throws 
Exception {
-   long cnt = 0;
-   while (running) {
-   Row row = new Row(2);
-   row.setField(0, cnt);
-   row.setField(1, "kafka-" + cnt);
-   ctx.collect(row);
-   cnt++;
-   }
-   }
-
-   @Override
-   public void cancel() {
-  


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527304#comment-15527304
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2430#discussion_r80783750
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 ---
@@ -17,123 +17,79 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase 
implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
 
-   protected final static String TOPIC = "customPartitioningTestTopic";
-   protected final static int PARALLELISM = 1;
+   protected final static String TOPIC = "testTopic";
protected final static String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
protected final static TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
 
+   protected FlinkKafkaProducerBase kafkaProducer = 
mock(FlinkKafkaProducerBase.class);
+
@Test
public void testKafkaTableSink() throws Exception {
-   LOG.info("Starting 
KafkaTableSinkTestBase.testKafkaTableSink()");
-
-   createTestTopic(TOPIC, PARALLELISM, 1);
-   StreamExecutionEnvironment env = createEnvironment();
-
-   createProducingTopology(env);
-   createConsumingTopology(env);
-
-   tryExecute(env, "custom partitioning test");
-   deleteTestTopic(TOPIC);
-   LOG.info("Finished 
KafkaTableSinkTestBase.testKafkaTableSink()");
-   }
+   DataStream dataStream = mock(DataStream.class);
+   KafkaTableSink kafkaTableSink = createTableSink();
+   kafkaTableSink.emitDataStream(dataStream);
 
-   private StreamExecutionEnvironment createEnvironment() {
-   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-   env.setRestartStrategy(RestartStrategies.noRestart());
-   env.getConfig().disableSysoutLogging();
-   return env;
+   verify(dataStream).addSink(kafkaProducer);
}
 
-   private void createProducingTopology(StreamExecutionEnvironment env) {
-   DataStream stream = env.addSource(new 
SourceFunction() {
-   private boolean running = true;
-
-   @Override
-   public void run(SourceContext ctx) throws 
Exception {
-   long cnt = 0;
-   while (running) {
-   Row row = new Row(2);
-   row.setField(0, cnt);
-   row.setField(1, "kafka-" + cnt);
-   ctx.collect(row);
-   cnt++;
-   }
-   }
-
-   @Override
-   public void cancel() {
-  


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527305#comment-15527305
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2430#discussion_r80784331
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 ---
@@ -17,123 +17,79 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase 
implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
 
-   protected final static String TOPIC = "customPartitioningTestTopic";
-   protected final static int PARALLELISM = 1;
+   protected final static String TOPIC = "testTopic";
protected final static String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
protected final static TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
 
+   protected FlinkKafkaProducerBase kafkaProducer = 
mock(FlinkKafkaProducerBase.class);
+
@Test
public void testKafkaTableSink() throws Exception {
-   LOG.info("Starting 
KafkaTableSinkTestBase.testKafkaTableSink()");
-
-   createTestTopic(TOPIC, PARALLELISM, 1);
-   StreamExecutionEnvironment env = createEnvironment();
-
-   createProducingTopology(env);
-   createConsumingTopology(env);
-
-   tryExecute(env, "custom partitioning test");
-   deleteTestTopic(TOPIC);
-   LOG.info("Finished 
KafkaTableSinkTestBase.testKafkaTableSink()");
-   }
+   DataStream dataStream = mock(DataStream.class);
+   KafkaTableSink kafkaTableSink = createTableSink();
+   kafkaTableSink.emitDataStream(dataStream);
 
-   private StreamExecutionEnvironment createEnvironment() {
-   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-   env.setRestartStrategy(RestartStrategies.noRestart());
-   env.getConfig().disableSysoutLogging();
-   return env;
+   verify(dataStream).addSink(kafkaProducer);
}
 
-   private void createProducingTopology(StreamExecutionEnvironment env) {
-   DataStream stream = env.addSource(new 
SourceFunction() {
-   private boolean running = true;
-
-   @Override
-   public void run(SourceContext ctx) throws 
Exception {
-   long cnt = 0;
-   while (running) {
-   Row row = new Row(2);
-   row.setField(0, cnt);
-   row.setField(1, "kafka-" + cnt);
-   ctx.collect(row);
-   cnt++;
-   }
-   }
-
-   @Override
-   public void cancel() {
-  


[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527146#comment-15527146
 ] 

ASF GitHub Bot commented on FLINK-4702:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2559

[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously

The offset commit calls to Kafka may occasionally take very long. In that 
case, the notifyCheckpointComplete() method blocks for a long time and the 
KafkaConsumer cannot make progress and cannot perform checkpoints.

This pull request changes the offset committing to use Kafka's 
`commitAsync()` method.
It also makes sure that no more than one commit is concurrently in 
progress, so that commit requests do not pile up.
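
For illustration, the "at most one in-flight commit" pattern described above 
could look like the following hand-written sketch (not the code of this PR; 
the class and method names are made up):

```
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

class AsyncOffsetCommitter {

    // tracks whether an async commit is currently in flight
    private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

    // called from the consumer thread when a checkpoint completes
    void commitOffsets(KafkaConsumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsets) {
        // start a new commit only if the previous one has finished; skipping
        // is safe because a later commit carries newer offsets anyway
        if (commitInProgress.compareAndSet(false, true)) {
            consumer.commitAsync(offsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
                    commitInProgress.set(false);
                    // a failed async commit is not fatal: the next completed
                    // checkpoint will commit newer offsets
                }
            });
        }
    }
}
```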

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_commit_async

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2559


commit eafba8600863c18e09397366485bcfc6ff44960f
Author: Stephan Ewen 
Date:   2016-09-27T18:59:35Z

[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously




> Kafka consumer must commit offsets asynchronously
> -
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time 
> and the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure no more than one commit is concurrently in 
> progress, so that commit requests do not pile up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4702) Kafka consumer must commit offsets asynchronously

2016-09-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4702:
---

 Summary: Kafka consumer must commit offsets asynchronously
 Key: FLINK-4702
 URL: https://issues.apache.org/jira/browse/FLINK-4702
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.1.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 1.2.0, 1.1.3


The offset commit calls to Kafka may occasionally take very long.
In that case, the {{notifyCheckpointComplete()}} method blocks for a long time 
and the KafkaConsumer cannot make progress and cannot perform checkpoints.

Kafka 0.9+ has methods to commit asynchronously.
We should use those and make sure no more than one commit is concurrently in 
progress, so that commit requests do not pile up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4675) Remove Parameter from WindowAssigner.getDefaultTrigger()

2016-09-27 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527122#comment-15527122
 ] 

Stephan Ewen commented on FLINK-4675:
-

It would be okay to break it now (given what we defined as stability 
guarantees) - but it is still a tough call.

Do we have a hunch how many people write their own window assigners? If only 
a few people write custom window assigners, it is probably fine.
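
For reference, the change under discussion would look roughly like this (a 
sketch of the current {{WindowAssigner}} signature; only the parameter is 
removed, the semantics stay the same):

{code}
// inside WindowAssigner<T, W extends Window>:

// current, legacy signature:
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

// proposed signature, with the unused parameter dropped:
public abstract Trigger<T, W> getDefaultTrigger();
{code}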

> Remove Parameter from WindowAssigner.getDefaultTrigger()
> 
>
> Key: FLINK-4675
> URL: https://issues.apache.org/jira/browse/FLINK-4675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Windowing Operators
>Reporter: Aljoscha Krettek
> Fix For: 2.0.0
>
>
> For legacy reasons the method has {{StreamExecutionEnvironment}} as a 
> parameter. This is not needed anymore.
> [~StephanEwen] do you think we should break this now? {{WindowAssigner}} is 
> {{PublicEvolving}} but I wanted to play it conservative for now.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4699) Convert Kafka TableSource/TableSink tests to unit tests

2016-09-27 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527013#comment-15527013
 ] 

Fabian Hueske commented on FLINK-4699:
--

There is a pull request to convert the KafkaJSONSinkITCases into unit tests: 
https://github.com/apache/flink/pull/2430
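
Condensed from the diffs of that PR (quoted earlier in this digest), the new 
unit-test style is roughly the following; {{kafkaProducer}} is the mocked 
producer field of the test base:

{code}
@Test
public void testKafkaTableSink() throws Exception {
    DataStream dataStream = mock(DataStream.class);     // no Kafka cluster needed
    KafkaTableSink kafkaTableSink = createTableSink();  // the sink under test
    kafkaTableSink.emitDataStream(dataStream);

    // the sink must attach the (mocked) Kafka producer to the stream
    verify(dataStream).addSink(kafkaProducer);
}
{code}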

> Convert Kafka TableSource/TableSink tests to unit tests
> ---
>
> Key: FLINK-4699
> URL: https://issues.apache.org/jira/browse/FLINK-4699
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> The Kafka tests are extremely heavy, and the Table Sources and Sinks are 
> only thin wrappers on top of the Kafka Sources / Sinks. Testing them should 
> not require bringing up Kafka clusters.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526962#comment-15526962
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2550
  
Can we try and unify the structure / paths under which all this information 
is stored in ZooKeeper?

```
/flink
 +/cluster_id_1/resource_manager_lock
 ||
 |+/job-id-1/job_manager_lock
 || /checkpoints/latest
 || /latest-1
 || /latest-2
 ||
 |+/job-id-2/job_manager_lock
 |  
 +/cluster_id_2/resource_manager_lock
  |
  +/job-id-1/job_manager_lock
   |/checkpoints/latest
   |/latest-1
   |/persisted_job_graph
```
The "cluster-id" should be a generated UUID in the case of YARN/Mesos, and 
should be a config value in the standalone case. In Yarn / Mesos, the UUID 
should be passed via an environment variable to the Java processes with the 
entry points for TaskManager / JobManager / ResourceManager.

In the Constructor, the ZooKeeper HA Services should get the "cluster-id".
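
A minimal sketch of wiring the cluster-id into the ZooKeeper client, assuming 
Curator (which Flink already uses for ZooKeeper access); how the cluster-id 
reaches the constructor is left open here:

```
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

class ZooKeeperHaServicesSketch {

    static CuratorFramework createClient(String zkQuorum, String clusterId) {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zkQuorum)
                // all paths below (leader locks, checkpoints, job graphs)
                // then live under /flink/<cluster-id>, matching the layout above
                .namespace("flink/" + clusterId)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        return client;
    }
}
```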


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For FLIP-6, we will have the ResourceManager and every JobManager as potential 
> leader contender and retriever. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for RM. And for each 
> JM, the path could be /leader/job-managers/JobID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4701) Unprotected access to cancelables in StreamTask

2016-09-27 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4701:
-

 Summary: Unprotected access to cancelables in StreamTask
 Key: FLINK-4701
 URL: https://issues.apache.org/jira/browse/FLINK-4701
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


In performCheckpoint():
{code}
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
        "checkpoint-" + checkpointId + "-" + timestamp,
        this,
        cancelables,
        chainedStateHandles,
        keyGroupsStateHandleFuture,
        checkpointId,
        bytesBufferedAlignment,
        alignmentDurationNanos,
        syncDurationMillis,
        endOfSyncPart);

synchronized (cancelables) {
    cancelables.add(asyncCheckpointRunnable);
}
{code}
The construction of AsyncCheckpointRunnable should be moved inside the 
synchronized block on cancelables.
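
A sketch of the fix the report suggests (same arguments as above; only the 
construction moves inside the lock, so the runnable is created and registered 
atomically):

{code}
synchronized (cancelables) {
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            "checkpoint-" + checkpointId + "-" + timestamp,
            this,
            cancelables,
            chainedStateHandles,
            keyGroupsStateHandleFuture,
            checkpointId,
            bytesBufferedAlignment,
            alignmentDurationNanos,
            syncDurationMillis,
            endOfSyncPart);

    cancelables.add(asyncCheckpointRunnable);
}
{code}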



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4543) Race Deadlock in SpilledSubpartitionViewTest

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4543.
---

> Race Deadlock in SpilledSubpartitionViewTest
> 
>
> Key: FLINK-4543
> URL: https://issues.apache.org/jira/browse/FLINK-4543
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The test deadlocked (Java level deadlock) with the following stack traces:
> {code}
> Found one Java-level deadlock:
> =
> "pool-1-thread-2":
>   waiting to lock monitor 0x7fec2c006168 (object 0xef661c20, a 
> java.lang.Object),
>   which is held by "IOManager reader thread #1"
> "IOManager reader thread #1":
>   waiting to lock monitor 0x7fec2c005ea8 (object 0xef62c8a8, a 
> java.lang.Object),
>   which is held by "pool-1-thread-2"
> Java stack information for the threads listed above:
> ===
> "pool-1-thread-2":
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:309)
> - waiting to lock <0xef661c20> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:261)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:380)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:366)
> at 
> org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135)
> - locked <0xef62c8a8> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:118)
> - locked <0xef9597c0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.util.TestConsumerCallback$RecyclingCallback.onBuffer(TestConsumerCallback.java:72)
> at 
> org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:87)
> at 
> org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> "IOManager reader thread #1":
> at 
> org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:126)
> - waiting to lock <0xef62c8a8> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:118)
> - locked <0xefa016f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:275)
> - locked <0xef661c20> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:343)
> at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:333)
> at 
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199)
> at 
> org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:435)
> at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:408)
> Found 1 deadlock.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4543) Race Deadlock in SpilledSubpartitionViewTest

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4543.
-
Resolution: Fixed

Fixed via 90902914ac4b11f9554b67ad49e0d697a0d02f93

> Race Deadlock in SpilledSubpartitionViewTest
> 
>
> Key: FLINK-4543
> URL: https://issues.apache.org/jira/browse/FLINK-4543
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4543) Race Deadlock in SpilledSubpartitionViewTest

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526905#comment-15526905
 ] 

ASF GitHub Bot commented on FLINK-4543:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2444


> Race Deadlock in SpilledSubpartitionViewTest
> 
>
> Key: FLINK-4543
> URL: https://issues.apache.org/jira/browse/FLINK-4543
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4560) enforcer java version as 1.7

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4560.
---

> enforcer java version as 1.7
> 
>
> Key: FLINK-4560
> URL: https://issues.apache.org/jira/browse/FLINK-4560
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
> Fix For: 1.2.0
>
>
> 1. Add a Java version enforcement rule to the maven-enforcer-plugin.
> 2. Upgrade the maven-enforcer-plugin to version 1.4.1.
> Explicitly require the Java version.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4560) enforcer java version as 1.7

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4560.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via b928935b8c5be02b23dd2cb87144ae1ea001278c

Thank you for the contribution!

> enforcer java version as 1.7
> 
>
> Key: FLINK-4560
> URL: https://issues.apache.org/jira/browse/FLINK-4560
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
> Fix For: 1.2.0
>
>
> 1. Add a Java version enforcement rule to the maven-enforcer-plugin.
> 2. Upgrade the maven-enforcer-plugin to version 1.4.1.
> Explicitly require the Java version.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4560) enforcer java version as 1.7

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526904#comment-15526904
 ] 

ASF GitHub Bot commented on FLINK-4560:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2458


> enforcer java version as 1.7
> 
>
> Key: FLINK-4560
> URL: https://issues.apache.org/jira/browse/FLINK-4560
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
> Fix For: 1.2.0
>
>
> 1. Add a Java version enforcement rule to the maven-enforcer-plugin.
> 2. Upgrade the maven-enforcer-plugin to version 1.4.1.
> Explicitly require the Java version.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4695) Separate configuration parsing from MetricRegistry

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526820#comment-15526820
 ] 

ASF GitHub Bot commented on FLINK-4695:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2555


> Separate configuration parsing from MetricRegistry
> --
>
> Key: FLINK-4695
> URL: https://issues.apache.org/jira/browse/FLINK-4695
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to decouple the {{MetricRegistry}} object instantiation from the 
> global configuration, we could introduce a {{MetricRegistryConfiguration}} 
> object which encapsulates all necessary information for the 
> {{MetricRegistry}}. The {{MetricRegistryConfiguration}} could have a static 
> method that generates it from a {{Configuration}}. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4695) Separate configuration parsing from MetricRegistry

2016-09-27 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4695.

Resolution: Fixed

Added via f1b5b35f595e7ae53001a4c46edbf0c9b78ee376

> Separate configuration parsing from MetricRegistry
> --
>
> Key: FLINK-4695
> URL: https://issues.apache.org/jira/browse/FLINK-4695
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to decouple the {{MetricRegistry}} object instantiation from the 
> global configuration, we could introduce a {{MetricRegistryConfiguration}} 
> object which encapsulates all necessary information for the 
> {{MetricRegistry}}. The {{MetricRegistryConfiguration}} could have a static 
> method that generates it from a {{Configuration}}. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4695) Separate configuration parsing from MetricRegistry

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526805#comment-15526805
 ] 

ASF GitHub Bot commented on FLINK-4695:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2555
  
Thanks for the review @StephanEwen. Build passed locally. Will merge the PR.


> Separate configuration parsing from MetricRegistry
> --
>
> Key: FLINK-4695
> URL: https://issues.apache.org/jira/browse/FLINK-4695
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to decouple the {{MetricRegistry}} object instantiation from the 
> global configuration, we could introduce a {{MetricRegistryConfiguration}} 
> object which encapsulates all necessary information for the 
> {{MetricRegistry}}. The {{MetricRegistryConfiguration}} could have a static 
> method that generates it from a {{Configuration}}. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526787#comment-15526787
 ] 

ASF GitHub Bot commented on FLINK-3322:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2510
  
Sounds good @StephanEwen . Thank you.


> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526751#comment-15526751
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80740850
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
+   } else {
+   final Slot slot = execution.getAssignedResource();
+   final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+   final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+
+   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+   if (vertex != null) {
+   final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+   if (splitAssigner != null) {
+   final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+   log.debug("Send next input split {}.", 
nextInputSplit);
+   try {
+   serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+   } catch (Exception ex) {
+   log.error("Could not serialize 
the next input split of class {}.", nextInputSplit.getClass(), ex);
+   vertex.fail(new 
RuntimeException("Could not serialize the next input split of class " +
+   
nextInputSplit.getClass() + ".", ex));
+   return null;
+   }
+   } else {
+   log.error("No InputSplitAssigner for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   } else {
+   log.error("Cannot find execution vertex for 
vertex ID {}.", vertexID);
+   return null;
+   }
+   }
+   return new NextInputSplit(serializedInputSplit);
+   }
+
+   @RpcMethod
+   public PartitionState requestPartitionState(
+   final ResultPartitionID partitionId,
+   final ExecutionAttemptID taskExecutionId,
+   final IntermediateDataSetID taskResultId)
+   {
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+   final ExecutionState state = execution != null ? 
execution.getState() : null;
+   return new PartitionState(taskExecutionId, taskResultId, 
partitionId.getPartitionId(), state);
+   }
+
+   @RpcMethod
+   public void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
+   final JobID jobID = executionGraph.getJobID();
+   final String jobName = executionGraph.getJobName();
+   log.info("Status of job {} ({}) changed to {}.", jobID, 
jobName, newJobStatus, error);
+
+   if (newJobStatus.isGloballyTerminalState()) {
+   // TODO set job end time in JobInfo
+
+   /*
+ TODO
+ if (jobInfo.sessionAlive) {
+jobInfo.setLastActive()
+val lastActivity = jobInfo.lastActive
+
context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+  // remove only if no activity occurred in the meantime
+  if (lastActivity == jobInfo.lastActive) {
+self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+  }
+}(context.dispatcher)
+  } else {
+self ! 

[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526750#comment-15526750
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80734121
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String 
address) {
//TODO:: register at the RM
}
 
+   @RpcMethod
+   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, 
final ExecutionAttemptID executionAttempt) {
+   final byte[] serializedInputSplit;
+
+   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+   if (execution == null) {
+   log.error("Can not find Execution for attempt {}.", 
executionAttempt);
+   return null;
--- End diff --

Let's let this throw an exception - after all, the caller should react to the 
fact that the JobManager is no longer aware of its execution.


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have ResourceManager and every JobManager as potential 
> leader contender and retriever. We should separate them by using different 
> zookeeper paths. 
> For example, the path could be /leader/resource-manager for RM. And for each 
> JM, the path could be /leader/job-managers/JobID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
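
(For illustration, the per-component leader paths described in the issue above could
be derived from the cluster root plus namespace roughly as follows. This is a minimal
sketch with hypothetical helper names, not Flink's actual code.)

    public final class LeaderPaths {

        // e.g. resourceManagerPath("/flink/some_namespace")
        //   -> "/flink/some_namespace/leader/resource-manager"
        public static String resourceManagerPath(String rootWithNamespace) {
            return rootWithNamespace + "/leader/resource-manager";
        }

        // e.g. jobManagerPath("/flink/some_namespace", jobId)
        //   -> "/flink/some_namespace/leader/job-managers/<jobId>"
        public static String jobManagerPath(String rootWithNamespace, String jobId) {
            return rootWithNamespace + "/leader/job-managers/" + jobId;
        }
    }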


[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526753#comment-15526753
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80733924
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
--- End diff --

The log could be on "debug" level, as this situation can occur when the 
JobManager failed tasks and unregistered them, but the cancel() messages have 
not reached the TaskManager.

Since this is a valid situation, it should not cause a "WARN" message. In 
general, all situations that are valid race conditions and that need not 
indicate a corrupted state or lead to a failure / recovery should probably not 
have a "WARN" level.


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have ResourceManager and every JobManager as potential 
> leader contender and retriever. We should separate them by using different 
> zookeeper paths. 
> For example, the path could be /leader/resource-manager for RM. And for each 
> JM, the path could be /leader/job-managers/JobID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526754#comment-15526754
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80733402
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
--- End diff --

Let's let this method throw an exception. Then the calling TaskManager can 
see the difference between `null` (= no further split) and `Exception` (= 
something went wrong / is inconsistent).


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have ResourceManager and every JobManager as potential 
> leader contender and retriever. We should separate them by using different 
> zookeeper paths. 
> For example, the path could be /leader/resource-manager for RM. And for each 
> JM, the path could be /leader/job-managers/JobID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526752#comment-15526752
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80742192
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
+            return null;
+        } else {
+            final Slot slot = execution.getAssignedResource();
+            final int taskId = execution.getVertex().getParallelSubtaskIndex();
+            final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+            final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+            if (vertex != null) {
+                final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+                if (splitAssigner != null) {
+                    final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+                    log.debug("Send next input split {}.", nextInputSplit);
+                    try {
+                        serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+                    } catch (Exception ex) {
+                        log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+                        vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+                                nextInputSplit.getClass() + ".", ex));
+                        return null;
+                    }
+                } else {
+                    log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+                    return null;
+                }
+            } else {
+                log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+                return null;
+            }
+        }
+        return new NextInputSplit(serializedInputSplit);
+    }
+
+    @RpcMethod
+    public PartitionState requestPartitionState(
+            final ResultPartitionID partitionId,
+            final ExecutionAttemptID taskExecutionId,
+            final IntermediateDataSetID taskResultId)
+    {
+        final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+        final ExecutionState state = execution != null ? execution.getState() : null;
+        return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+    }
+
+    @RpcMethod
+    public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
--- End diff --

Is there a way we can get this method out of the public interface? It could 
do state changes in `callAsync()`. That way, it would not be callable via RPC, 
because it seems like this is not a method anyone should invoke remotely.


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have ResourceManager and every JobManager as potential 
> leader contender and retriever. We should separate them by using different 
> zookeeper paths. 
> For example, the path could be /leader/resource-manager for RM. And for each 
> JM, the path could be /leader/job-managers/JobID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526749#comment-15526749
 ] 

ASF GitHub Bot commented on FLINK-4657:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80740727
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
+            return null;
+        } else {
+            final Slot slot = execution.getAssignedResource();
+            final int taskId = execution.getVertex().getParallelSubtaskIndex();
+            final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+            final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+            if (vertex != null) {
+                final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+                if (splitAssigner != null) {
+                    final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+                    log.debug("Send next input split {}.", nextInputSplit);
+                    try {
+                        serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+                    } catch (Exception ex) {
+                        log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+                        vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+                                nextInputSplit.getClass() + ".", ex));
+                        return null;
+                    }
+                } else {
+                    log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+                    return null;
+                }
+            } else {
+                log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+                return null;
+            }
+        }
+        return new NextInputSplit(serializedInputSplit);
+    }
+
+    @RpcMethod
+    public PartitionState requestPartitionState(
+            final ResultPartitionID partitionId,
+            final ExecutionAttemptID taskExecutionId,
+            final IntermediateDataSetID taskResultId)
+    {
+        final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+        final ExecutionState state = execution != null ? execution.getState() : null;
+        return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+    }
+
+    @RpcMethod
+    public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+        final JobID jobID = executionGraph.getJobID();
+        final String jobName = executionGraph.getJobName();
+        log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
+
+        if (newJobStatus.isGloballyTerminalState()) {
+            // TODO set job end time in JobInfo
+
+            /*
+              TODO
+              if (jobInfo.sessionAlive) {
+                jobInfo.setLastActive()
+                val lastActivity = jobInfo.lastActive
+                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+                  // remove only if no activity occurred in the meantime
+                  if (lastActivity == jobInfo.lastActive) {
+                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+                  }
+                }(context.dispatcher)
+              } else {
+                self !

[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80734121
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
+            return null;
--- End diff --

Let's let this throw an exception - after all, the caller should 
react to the fact that the JobManager is not aware of its execution any more.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80733402
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
--- End diff --

Let's let this method throw an exception. Then the calling TaskManager can 
see the difference between `null` (= no further split) and `Exception` (= 
something went wrong / is inconsistent).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
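
(A standalone model of the null-vs-exception contract proposed above; all names are
illustrative, not Flink's actual classes.)

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class SplitProtocolSketch {

        private final Map<String, Iterator<String>> splitsPerAttempt = new ConcurrentHashMap<>();

        /** Next split, null once the source is exhausted, exception if the attempt is unknown. */
        String requestNextInputSplit(String attemptId) {
            Iterator<String> splits = splitsPerAttempt.get(attemptId);
            if (splits == null) {
                // unknown attempt: throw instead of returning null, so the caller can
                // distinguish "no further split" from "the JobManager no longer knows me"
                throw new IllegalStateException("No execution registered for attempt " + attemptId);
            }
            return splits.hasNext() ? splits.next() : null; // null = cleanly exhausted
        }
    }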


[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80740850
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
+            return null;
+        } else {
+            final Slot slot = execution.getAssignedResource();
+            final int taskId = execution.getVertex().getParallelSubtaskIndex();
+            final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+            final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+            if (vertex != null) {
+                final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+                if (splitAssigner != null) {
+                    final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+                    log.debug("Send next input split {}.", nextInputSplit);
+                    try {
+                        serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+                    } catch (Exception ex) {
+                        log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+                        vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+                                nextInputSplit.getClass() + ".", ex));
+                        return null;
+                    }
+                } else {
+                    log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+                    return null;
+                }
+            } else {
+                log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+                return null;
+            }
+        }
+        return new NextInputSplit(serializedInputSplit);
+    }
+
+    @RpcMethod
+    public PartitionState requestPartitionState(
+            final ResultPartitionID partitionId,
+            final ExecutionAttemptID taskExecutionId,
+            final IntermediateDataSetID taskResultId)
+    {
+        final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+        final ExecutionState state = execution != null ? execution.getState() : null;
+        return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+    }
+
+    @RpcMethod
+    public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+        final JobID jobID = executionGraph.getJobID();
+        final String jobName = executionGraph.getJobName();
+        log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
+
+        if (newJobStatus.isGloballyTerminalState()) {
+            // TODO set job end time in JobInfo
+
+            /*
+              TODO
+              if (jobInfo.sessionAlive) {
+                jobInfo.setLastActive()
+                val lastActivity = jobInfo.lastActive
+                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+                  // remove only if no activity occurred in the meantime
+                  if (lastActivity == jobInfo.lastActive) {
+                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+                  }
+                }(context.dispatcher)
+              } else {
+                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+              }
+            */
+
+            if (newJobStatus == JobStatus.FINISHED) {
+                try {

[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80740727
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
+            return null;
+        } else {
+            final Slot slot = execution.getAssignedResource();
+            final int taskId = execution.getVertex().getParallelSubtaskIndex();
+            final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+            final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+            if (vertex != null) {
+                final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+                if (splitAssigner != null) {
+                    final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+                    log.debug("Send next input split {}.", nextInputSplit);
+                    try {
+                        serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+                    } catch (Exception ex) {
+                        log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+                        vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+                                nextInputSplit.getClass() + ".", ex));
+                        return null;
+                    }
+                } else {
+                    log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+                    return null;
+                }
+            } else {
+                log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+                return null;
+            }
+        }
+        return new NextInputSplit(serializedInputSplit);
+    }
+
+    @RpcMethod
+    public PartitionState requestPartitionState(
+            final ResultPartitionID partitionId,
+            final ExecutionAttemptID taskExecutionId,
+            final IntermediateDataSetID taskResultId)
+    {
+        final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+        final ExecutionState state = execution != null ? execution.getState() : null;
+        return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+    }
+
+    @RpcMethod
+    public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+        final JobID jobID = executionGraph.getJobID();
+        final String jobName = executionGraph.getJobName();
+        log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
+
+        if (newJobStatus.isGloballyTerminalState()) {
+            // TODO set job end time in JobInfo
+
+            /*
+              TODO
+              if (jobInfo.sessionAlive) {
+                jobInfo.setLastActive()
+                val lastActivity = jobInfo.lastActive
+                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+                  // remove only if no activity occurred in the meantime
+                  if (lastActivity == jobInfo.lastActive) {
+                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+                  }
+                }(context.dispatcher)
+              } else {
+                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+              }
+            */
+
+            if (newJobStatus == JobStatus.FINISHED) {
+                try {

[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80733924
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
--- End diff --

The log could be on "debug" level, as this situation can occur when the 
JobManager failed tasks and unregistered them, but the cancel() messages have 
not reached the TaskManager.

Since this is a valid situation, it should not cause a "WARN" message. In 
general, all situations that are valid race conditions and that need not 
indicate a corrupted state or lead to a failure / recovery should probably not 
have a "WARN" level.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
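
(The suggested convention in code form, as a sketch with made-up names: benign,
expected races log at DEBUG; only conditions that indicate corrupted state or trigger
failure and recovery get WARN or ERROR.)

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LogLevelSketch {

        private static final Logger LOG = LoggerFactory.getLogger(LogLevelSketch.class);

        void onUnknownAttempt(String attemptId) {
            // valid race: the tasks may have been failed and unregistered while
            // the cancel() message was still on the wire to the TaskManager
            LOG.debug("Cannot find execution for attempt {}; it may already be unregistered.", attemptId);
        }

        void onCorruptState(String detail) {
            // a genuinely inconsistent state does deserve an ERROR
            LOG.error("Execution graph is in an inconsistent state: {}", detail);
        }
    }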


[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80742192
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
        //TODO:: register at the RM
    }

+    @RpcMethod
+    public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+        final byte[] serializedInputSplit;
+
+        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+        if (execution == null) {
+            log.error("Can not find Execution for attempt {}.", executionAttempt);
+            return null;
+        } else {
+            final Slot slot = execution.getAssignedResource();
+            final int taskId = execution.getVertex().getParallelSubtaskIndex();
+            final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+            final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+            if (vertex != null) {
+                final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+                if (splitAssigner != null) {
+                    final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+                    log.debug("Send next input split {}.", nextInputSplit);
+                    try {
+                        serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+                    } catch (Exception ex) {
+                        log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+                        vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+                                nextInputSplit.getClass() + ".", ex));
+                        return null;
+                    }
+                } else {
+                    log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+                    return null;
+                }
+            } else {
+                log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+                return null;
+            }
+        }
+        return new NextInputSplit(serializedInputSplit);
+    }
+
+    @RpcMethod
+    public PartitionState requestPartitionState(
+            final ResultPartitionID partitionId,
+            final ExecutionAttemptID taskExecutionId,
+            final IntermediateDataSetID taskResultId)
+    {
+        final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+        final ExecutionState state = execution != null ? execution.getState() : null;
+        return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+    }
+
+    @RpcMethod
+    public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
--- End diff --

Is there a way we can get this method out of the public interface? It could 
do state changes in `callAsync()`. That way, it would not be callable via RPC, 
because it seems like this is not a method anyone should invoke remotely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
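
(A standalone model of that suggestion: rather than exposing the callback as an RPC
method, the endpoint hands the execution graph an internal listener that hops onto
the endpoint's single main-thread executor. All names here are illustrative.)

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    class StatusListenerSketch {

        interface JobStatusListener {
            void jobStatusChanged(String newStatus, long timestamp, Throwable error);
        }

        private final Executor mainThreadExecutor = Executors.newSingleThreadExecutor();

        // never part of the RPC gateway; only the execution graph sees this object
        JobStatusListener internalListener() {
            return (status, ts, err) ->
                    mainThreadExecutor.execute(() -> handleStatusChange(status, ts, err));
        }

        private void handleStatusChange(String status, long ts, Throwable err) {
            // all state changes run serialized on the endpoint's own thread
            System.out.printf("job status -> %s at %d%n", status, ts);
        }
    }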


[GitHub] flink pull request #2552: [FLINK-4690] Replace SlotAllocationFuture with fli...

2016-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2552


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4690) Replace SlotAllocationFuture with flink's own future

2016-09-27 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4690.
--
Resolution: Fixed

Added via 84672c22f8088a70caf35b54d74eee458bf600dd

> Replace SlotAllocationFuture with flink's own future
> 
>
> Key: FLINK-4690
> URL: https://issues.apache.org/jira/browse/FLINK-4690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4361) Introduce Flink's own future abstraction

2016-09-27 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526671#comment-15526671
 ] 

Till Rohrmann commented on FLINK-4361:
--

Added to the master via f8138f4b74332ecb4ef0d28a09e8549708118ca6

> Introduce Flink's own future abstraction
> 
>
> Key: FLINK-4361
> URL: https://issues.apache.org/jira/browse/FLINK-4361
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>
> In order to keep the abstraction Scala-independent, we should not rely on 
> Scala Futures



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4361) Introduce Flink's own future abstraction

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526669#comment-15526669
 ] 

ASF GitHub Bot commented on FLINK-4361:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2554


> Introduce Flink's own future abstraction
> 
>
> Key: FLINK-4361
> URL: https://issues.apache.org/jira/browse/FLINK-4361
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>
> In order to keep the abstraction Scala-independent, we should not rely on 
> Scala Futures



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
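
(What such a Scala-independent abstraction can look like, reduced to a sketch; this
is only an illustration of the shape of the idea, not Flink's actual Future interface.)

    import java.util.function.BiFunction;
    import java.util.function.Function;

    interface MiniFuture<T> {

        /** True iff the future is completed with a value or an error. */
        boolean isDone();

        /** Blocks until the value (or the error) is available. */
        T get() throws Exception;

        /** Transforms the value once it arrives. */
        <R> MiniFuture<R> thenApply(Function<? super T, ? extends R> fn);

        /** Receives either the value or the exception, whichever occurs. */
        <R> MiniFuture<R> handle(BiFunction<? super T, Throwable, ? extends R> fn);
    }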


[jira] [Commented] (FLINK-4690) Replace SlotAllocationFuture with flink's own future

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526668#comment-15526668
 ] 

ASF GitHub Bot commented on FLINK-4690:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2552


> Replace SlotAllocationFuture with flink's own future
> 
>
> Key: FLINK-4690
> URL: https://issues.apache.org/jira/browse/FLINK-4690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2554: [FLINK-4361] Introduce Flink's own future abstract...

2016-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2554


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4690) Replace SlotAllocationFuture with flink's own future

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1552#comment-1552
 ] 

ASF GitHub Bot commented on FLINK-4690:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2552
  
I've merged it into the master branch. You can close the PR.


> Replace SlotAllocationFuture with flink's own future
> 
>
> Key: FLINK-4690
> URL: https://issues.apache.org/jira/browse/FLINK-4690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2552: [FLINK-4690] Replace SlotAllocationFuture with flink's ow...

2016-09-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2552
  
I've merged it into the master branch. You can close the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4669) scala api createLocalEnvironment() function add default Configuration parameter

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526621#comment-15526621
 ] 

ASF GitHub Bot commented on FLINK-4669:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2541#discussion_r80730795
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
@@ -124,7 +125,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * Gets the checkpoint config, which defines values like checkpoint interval, delay between
   * checkpoints, etc.
   */
-  def getCheckpointConfig = javaEnv.getCheckpointConfig()
--- End diff --

Calling Java methods with or without parentheses is actually somewhat subject 
to debate.

For Scala methods, the writer decides whether it is a method with or 
without parentheses.
For Java, the caller can only "guess" what the method does (is returning a 
mutable field considered a side effect?), so in most places we call Java 
methods as Java methods.


> scala api createLocalEnvironment() function add default Configuration 
> parameter
> ---
>
> Key: FLINK-4669
> URL: https://issues.apache.org/jira/browse/FLINK-4669
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: shijinkui
>
> A scala program can't directly use createLocalEnvironment with a custom 
> Configuration object.
> For example, if I want to start the web server in local mode, I have to do:
> ```
> // set up execution environment
> val conf = new Configuration
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
> val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(
>   org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2, conf)
> )
> ```
> so we need the createLocalEnvironment function to have a config parameter 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2541: [FLINK-4669] scala api createLocalEnvironment() fu...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2541#discussion_r80730795
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
@@ -124,7 +125,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * Gets the checkpoint config, which defines values like checkpoint interval, delay between
   * checkpoints, etc.
   */
-  def getCheckpointConfig = javaEnv.getCheckpointConfig()
--- End diff --

Calling Java methods with or without parentheses is actually somewhat subject 
to debate.

For Scala methods, the writer decides whether it is a method with or 
without parentheses.
For Java, the caller can only "guess" what the method does (is returning a 
mutable field considered a side effect?), so in most places we call Java 
methods as Java methods.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4583) NullPointerException in CliFrontend

2016-09-27 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-4583.
-
Resolution: Duplicate

> NullPointerException in CliFrontend
> ---
>
> Key: FLINK-4583
> URL: https://issues.apache.org/jira/browse/FLINK-4583
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> If no Flink program is executed, the following exception message is printed. 
> This can happen when a driver prints usage due to insufficient or improper 
> configuration.
> {noformat}
>  The program finished with the following exception:
> java.lang.NullPointerException
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:781)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1002)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1045)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4700) Harden the TimeProvider test

2016-09-27 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-4700:
-

 Summary: Harden the TimeProvider test
 Key: FLINK-4700
 URL: https://issues.apache.org/jira/browse/FLINK-4700
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


Currently the TimeProvider test fails due to a race condition. This task aims 
at fixing it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4677) Jars with no job executions produces NullPointerException in ClusterClient

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526575#comment-15526575
 ] 

ASF GitHub Bot commented on FLINK-4677:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2548
  
Do we have a standard way for user programs to print usage and exit without 
triggering this message? This is certainly preferable to the 
`NullPointerException`.


> Jars with no job executions produces NullPointerException in ClusterClient
> --
>
> Key: FLINK-4677
> URL: https://issues.apache.org/jira/browse/FLINK-4677
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0, 1.1.2
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0, 1.1.3
>
>
> When the user jar contains no job executions, the command-line client 
> displays a NullPointerException. This is not a big issue but should be 
> changed to something more descriptive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2548: [FLINK-4677] fail if user jar contains no executions

2016-09-27 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2548
  
Do we have a standard way for user programs to print usage and exit without 
triggering this message? This is certainly preferable to the 
`NullPointerException`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
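
(One shape such a user program takes today, as a sketch; whether the client should
then treat "no job executed" as an error is exactly the open question above.)

    public class WordCountDriver {
        public static void main(String[] args) {
            if (args.length < 2) {
                System.err.println("Usage: WordCountDriver <input-path> <output-path>");
                return; // no environment is created, so no job is ever submitted
            }
            // ... build the execution environment and run the job here ...
        }
    }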


[jira] [Created] (FLINK-4699) Convert Kafka TableSource/TableSink tests to unit tests

2016-09-27 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4699:
---

 Summary: Convert Kafka TableSource/TableSink tests to unit tests
 Key: FLINK-4699
 URL: https://issues.apache.org/jira/browse/FLINK-4699
 Project: Flink
  Issue Type: Test
  Components: Table API & SQL
Reporter: Timo Walther


The Kafka tests are extremely heavy, and the Table Sources and Sinks are 
only thin wrappers on top of the Kafka Sources / Sinks. Testing them should not 
need to bring up Kafka clusters.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2548: [FLINK-4677] fail if user jar contains no executions

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2548
  
Looks good to me, +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4677) Jars with no job executions produces NullPointerException in ClusterClient

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526523#comment-15526523
 ] 

ASF GitHub Bot commented on FLINK-4677:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2548
  
Looks good to me, +1 to merge


> Jars with no job executions produces NullPointerException in ClusterClient
> --
>
> Key: FLINK-4677
> URL: https://issues.apache.org/jira/browse/FLINK-4677
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0, 1.1.2
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0, 1.1.3
>
>
> When the user jar contains no job executions, the command-line client 
> displays a NullPointerException. This is not a big issue but should be 
> changed to something more descriptive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4698) Visualize additional checkpoint information

2016-09-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4698:
---

 Summary: Visualize additional checkpoint information
 Key: FLINK-4698
 URL: https://issues.apache.org/jira/browse/FLINK-4698
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Reporter: Stephan Ewen
 Fix For: 1.2.0


Display the additional information gathered in the {{CheckpointStatsTracker}} 
in the "Checkpoint" tab.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4685) Gather operator checkpoint durations data sizes from the runtime

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4685.
---

> Gather operator checkpoint durations data sizes from the runtime
> 
>
> Key: FLINK-4685
> URL: https://issues.apache.org/jira/browse/FLINK-4685
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4696) Limit the number of Akka Dispatcher Threads in LocalMiniCluster

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4696.
---

> Limit the number of Akka Dispatcher Threads in LocalMiniCluster
> ---
>
> Key: FLINK-4696
> URL: https://issues.apache.org/jira/browse/FLINK-4696
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> By default, akka spawns 2x or 3x the number of cores in threads.
> For the LocalFlinkMiniCluster, running on Travis (often 64 cores), with 
> separate actor systems for jobmanager and multiple taskmanagers, this 
> frequently means >600 akka threads. Flink uses about 4 actors.
> This simply eats unnecessary resources. I suggest to have at most 4 threads 
> per actor system in test setups.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4685) Gather operator checkpoint durations data sizes from the runtime

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4685.
-
Resolution: Implemented

Implemented in b1642e32c2f69c60c2b212260c3479feb66a9165

> Gather operator checkpoint durations data sizes from the runtime
> 
>
> Key: FLINK-4685
> URL: https://issues.apache.org/jira/browse/FLINK-4685
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4697) Gather more detailed checkpoint stats in CheckpointStatsTracker

2016-09-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4697:
---

 Summary: Gather more detailed checkpoint stats in 
CheckpointStatsTracker
 Key: FLINK-4697
 URL: https://issues.apache.org/jira/browse/FLINK-4697
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
 Fix For: 1.2.0


The additional information attached to the {{AcknowledgeCheckpoint}} method 
must be gathered in the {{CheckpointStatsTracker}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4696) Limit the number of Akka Dispatcher Threads in LocalMiniCluster

2016-09-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4696.
-
Resolution: Fixed

Fixed via 6ea9284d29ec79576f073441a5de681019720ab0

> Limit the number of Akka Dispatcher Threads in LocalMiniCluster
> ---
>
> Key: FLINK-4696
> URL: https://issues.apache.org/jira/browse/FLINK-4696
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> By default, akka spawns 2x or 3x the number of cores in threads.
> For the LocalFlinkMiniCluster, running on Travis (often 64 cores), with 
> separate actor systems for jobmanager and multiple taskmanagers, this 
> frequently means >600 akka threads. Flink uses about 4 actors.
> This simply eats unnecessary resources. I suggest to have at most 4 threads 
> per actor system in test setups.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
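
(A sketch of capping the dispatcher for a test actor system via the standard akka
fork-join-executor settings; the cap of 4 is just the number suggested in the issue.)

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class TestActorSystemConfig {
        public static Config capped() {
            // at most 4 dispatcher threads, regardless of the machine's core count
            return ConfigFactory.parseString(
                "akka.actor.default-dispatcher.fork-join-executor {\n" +
                "  parallelism-min = 1\n" +
                "  parallelism-factor = 1.0\n" +
                "  parallelism-max = 4\n" +
                "}");
        }
    }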


[jira] [Commented] (FLINK-4630) add netty tcp/restful pushed source support

2016-09-27 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526424#comment-15526424
 ] 

shijinkui commented on FLINK-4630:
--

Some business systems, such as online risk management, need the response 
cost to be minimal, at least below 50ms. If the business system sends messages to 
kafka or another message queue, the delay grows. Messages sent directly 
from the distributed business system to the Flink source are end-to-end.

I have only finished the netty tcp source so far; example:
https://github.com/HuaweiBigData/flink-netty-source/blob/master/src/test/scala/com/huawei/stream/NettySourceTest.scala

> add netty tcp/restful pushed source support
> ---
>
> Key: FLINK-4630
> URL: https://issues.apache.org/jira/browse/FLINK-4630
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: shijinkui
>
> When the source stream starts, it listens on a provided tcp port and receives stream data 
> from the user data source.
> This netty tcp source is kept alive and end-to-end, that is, from the business 
> system to the flink worker directly. 
> Such a source service is indeed needed in production.
> The source is described in detail below:
> 1. the source runs as a netty tcp server
> 2. the user provides a tcp port; if the port is in use, increase the port 
> number between 1024 and 65535. The source can run in parallel.
> 3. call back the provided url to report the real port being listened on
> 4. the user pushes streaming data to the netty server, which then collects the data into flink



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
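
(A much-simplified sketch of the idea, one line of text per record, using a plain
ServerSocket instead of Netty and leaving out the port-retry and URL-callback steps
from the description above.)

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class TcpPushSource implements SourceFunction<String> {

        private final int port;
        private volatile boolean running = true;

        public TcpPushSource(int port) {
            this.port = port;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            try (ServerSocket server = new ServerSocket(port)) {
                while (running) {
                    try (Socket client = server.accept();
                            BufferedReader in = new BufferedReader(
                                    new InputStreamReader(client.getInputStream()))) {
                        String line;
                        while (running && (line = in.readLine()) != null) {
                            ctx.collect(line); // hand each pushed line to Flink
                        }
                    }
                }
            }
        }

        @Override
        public void cancel() {
            running = false; // note: a blocked accept() additionally needs the socket closed
        }
    }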


[jira] [Commented] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526303#comment-15526303
 ] 

ASF GitHub Bot commented on FLINK-3734:
---

Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2557#discussion_r80704455
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -568,8 +568,8 @@ public void setInitialState(ChainedStateHandle chainedState,
    private void restoreState() throws Exception {
        final StreamOperator[] allOperators = operatorChain.getAllOperators();

-       try {
-           if (lazyRestoreChainedOperatorState != null) {
+       if (lazyRestoreChainedOperatorState != null) {
--- End diff --

this `if (lazyRestoreChainedOperatorState != null)` check was moved out of 
the `try` block because the same check is required in the `finally` block


> Unclosed DataInputView in 
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> 
>
> Key: FLINK-3734
> URL: https://issues.apache.org/jira/browse/FLINK-3734
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataInputView in = inputState.getState(getUserCodeClassloader());
> final long nextEvaluationTime = in.readLong();
> final long nextSlideTime = in.readLong();
> AbstractKeyedTimePanes panes = createPanes(keySelector, function);
> panes.readFromInput(in, keySerializer, stateTypeSerializer);
> restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
>   }
> {code}
> DataInputView in is not closed upon return.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2557: [FLINK-3734] Close stream state handles after stat...

2016-09-27 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2557#discussion_r80704455
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -568,8 +568,8 @@ public void setInitialState(ChainedStateHandle chainedState,
    private void restoreState() throws Exception {
        final StreamOperator[] allOperators = operatorChain.getAllOperators();

-       try {
-           if (lazyRestoreChainedOperatorState != null) {
+       if (lazyRestoreChainedOperatorState != null) {
--- End diff --

this `if (lazyRestoreChainedOperatorState != null)` check was moved out of 
the `try` block because the same check is required in the `finally` block


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
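
(The shape of the fix in isolation: the null check has to sit outside the try block
because closing in finally needs the same check. Names are simplified stand-ins for
the StreamTask fields.)

    class RestoreSketch {
        void restoreState(AutoCloseable lazyRestoreChainedOperatorState) throws Exception {
            if (lazyRestoreChainedOperatorState != null) {
                try {
                    // ... read and restore the chained operator state from the handle ...
                } finally {
                    // close the stream state handle even if restoration failed
                    lazyRestoreChainedOperatorState.close();
                }
            }
        }
    }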


[jira] [Commented] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526278#comment-15526278
 ] 

ASF GitHub Bot commented on FLINK-3734:
---

GitHub user lw-lin opened a pull request:

https://github.com/apache/flink/pull/2557

[FLINK-3734] Close stream state handles after state restorations

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/flink close-stream-state-handles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2557


commit eb2e2e45c7d76905b2b247954e4ef81f34ce471b
Author: Liwei Lin 
Date:   2016-09-27T14:12:23Z

[FLINK-3734] Close stream state handles after state restorations




> Unclosed DataInputView in 
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> 
>
> Key: FLINK-3734
> URL: https://issues.apache.org/jira/browse/FLINK-3734
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataInputView in = inputState.getState(getUserCodeClassloader());
> final long nextEvaluationTime = in.readLong();
> final long nextSlideTime = in.readLong();
> AbstractKeyedTimePanes panes = createPanes(keySelector, function);
> panes.readFromInput(in, keySerializer, stateTypeSerializer);
> restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
>   }
> {code}
> DataInputView in is not closed upon return.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2557: [FLINK-3734] Close stream state handles after stat...

2016-09-27 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/flink/pull/2557

[FLINK-3734] Close stream state handles after state restorations

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/flink close-stream-state-handles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2557


commit eb2e2e45c7d76905b2b247954e4ef81f34ce471b
Author: Liwei Lin 
Date:   2016-09-27T14:12:23Z

[FLINK-3734] Close stream state handles after state restorations




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2556: [FLINK-4573] Fix potential resource leak due to un...

2016-09-27 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/flink/pull/2556

[FLINK-4573] Fix potential resource leak due to unclosed RandomAccess…

…File in TaskManagerLogHandler

--

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/flink raf-close

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2556.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2556


commit 8ec0dfae40c7b06e01e57260d16d6256f885f4e6
Author: Liwei Lin 
Date:   2016-09-27T12:49:52Z

[FLINK-4573] Fix potential resource leak due to unclosed RandomAccessFile 
in TaskManagerLogHandler






[jira] [Commented] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526276#comment-15526276
 ] 

ASF GitHub Bot commented on FLINK-4573:
---

GitHub user lw-lin opened a pull request:

https://github.com/apache/flink/pull/2556

[FLINK-4573] Fix potential resource leak due to unclosed RandomAccess…

…File in TaskManagerLogHandler

--

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/flink raf-close

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2556.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2556


commit 8ec0dfae40c7b06e01e57260d16d6256f885f4e6
Author: Liwei Lin 
Date:   2016-09-27T12:49:52Z

[FLINK-4573] Fix potential resource leak due to unclosed RandomAccessFile 
in TaskManagerLogHandler




> Potential resource leak due to unclosed RandomAccessFile in 
> TaskManagerLogHandler
> -
>
> Key: FLINK-4573
> URL: https://issues.apache.org/jira/browse/FLINK-4573
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> try {
>     raf = new RandomAccessFile(file, "r");
> } catch (FileNotFoundException e) {
>     display(ctx, request, "Displaying TaskManager log failed.");
>     LOG.error("Displaying TaskManager log failed.", e);
>     return;
> }
> long fileLength = raf.length();
> final FileChannel fc = raf.getChannel();
> {code}
> If length() throws IOException, raf would be left unclosed.
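A minimal sketch of the fix pattern (a hypothetical helper, not the actual TaskManagerLogHandler code): close the file if any call after construction fails, so the descriptor is not leaked:

{code}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

final class LogFileSketch {

    /** Opens a read-only channel, closing the file on any failure after construction. */
    static FileChannel openChannel(File file) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        try {
            long fileLength = raf.length(); // may throw IOException
            System.out.println("serving " + fileLength + " bytes");
            return raf.getChannel();        // closing the channel later also closes raf
        } catch (IOException e) {
            raf.close();                    // avoid leaking the open file descriptor
            throw e;
        }
    }
}
{code}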





[GitHub] flink issue #2544: [FLINK-4218] [checkpoints] Do not rely on FileSystem to d...

2016-09-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2544
  
Manually merged in 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113




[GitHub] flink pull request #2544: [FLINK-4218] [checkpoints] Do not rely on FileSyst...

2016-09-27 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2544




[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526247#comment-15526247
 ] 

ASF GitHub Bot commented on FLINK-4218:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2544
  
Manually merged in 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113


> Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." 
> causes task restarting
> --
>
> Key: FLINK-4218
> URL: https://issues.apache.org/jira/browse/FLINK-4218
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.1.0
>Reporter: Sergii Koshel
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Sporadically we see the exception below, and the task restarts because of it.
> {code:title=Exception|borderStyle=solid}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
>   at org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
>   at org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
>   at org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482)
>   at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779)
>   ... 8 more
> {code}
> The file actually exists on S3.
> I suppose it is related to some race condition with S3, but it would be good to retry a few times before stopping task execution.
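The merged fix took a different route than the retry suggested above (per the PR title, it no longer relies on the FileSystem to determine state sizes), but a generic retry-with-backoff sketch of the reporter's suggestion, with hypothetical names, could look like this:

{code}
import java.io.FileNotFoundException;
import java.io.IOException;

final class EventualConsistencyRetry {

    /** Stand-in for the file-size lookup that may transiently fail on S3. */
    interface SizeLookup {
        long fileSize() throws IOException;
    }

    /** Retries the lookup with linear backoff; assumes maxAttempts >= 1. */
    static long getSizeWithRetries(SizeLookup lookup, int maxAttempts, long backoffMillis)
            throws IOException, InterruptedException {
        FileNotFoundException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.fileSize();
            } catch (FileNotFoundException e) {
                last = e;                              // the listing may lag behind the write
                Thread.sleep(backoffMillis * attempt); // back off before retrying
            }
        }
        throw last; // give up after maxAttempts
    }
}
{code}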





[jira] [Commented] (FLINK-4554) Add support for array types

2016-09-27 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526259#comment-15526259
 ] 

Timo Walther commented on FLINK-4554:
-

[~kirill-morozov-epam] thanks for your interest. Unfortunately, I have already 
started on this issue and will open a PR soon. Maybe you can pick something 
else; there are many other Table API/SQL-related issues open.

> Add support for array types
> ---
>
> Key: FLINK-4554
> URL: https://issues.apache.org/jira/browse/FLINK-4554
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Support creating arrays:
> {code}ARRAY[1, 2, 3]{code}
> Access array values:
> {code}myArray[3]{code}
> And operations like:
> {{UNNEST, UNNEST WITH ORDINALITY, CARDINALITY}}





[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526248#comment-15526248
 ] 

ASF GitHub Bot commented on FLINK-4218:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2544


> Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." 
> causes task restarting
> --
>
> Key: FLINK-4218
> URL: https://issues.apache.org/jira/browse/FLINK-4218
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.1.0
>Reporter: Sergii Koshel
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Sporadically we see the exception below, and the task restarts because of it.
> {code:title=Exception|borderStyle=solid}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
>   at org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
>   at org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
>   at org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482)
>   at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779)
>   ... 8 more
> {code}
> The file actually exists on S3.
> I suppose it is related to some race condition with S3, but it would be good to retry a few times before stopping task execution.





[GitHub] flink issue #2555: [FLINK-4695] Introduce MetricRegistryConfiguration to enc...

2016-09-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2555
  
True, the benefit of this is only marginal in the current master. It is 
actually part of the `TaskManager` services startup logic in Flip-6. In that 
branch, the services are created from component-specific configuration 
objects. These objects are derived from the global configuration and contain 
only the component-specific fields; that way, we decouple the components from 
the global configuration object.

In order to avoid future merge conflicts between Flip-6 and the master 
branch, I wanted to merge this specific commit into master. I should have 
stated this earlier.




[jira] [Commented] (FLINK-4695) Separate configuration parsing from MetricRegistry

2016-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526182#comment-15526182
 ] 

ASF GitHub Bot commented on FLINK-4695:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2555
  
True, the benefit of this is only marginal in the current master. It is 
actually part of the `TaskManager` services startup logic in Flip-6. In that 
branch, the services are created from component-specific configuration 
objects. These objects are derived from the global configuration and contain 
only the component-specific fields; that way, we decouple the components from 
the global configuration object.

In order to avoid future merge conflicts between Flip-6 and the master 
branch, I wanted to merge this specific commit into master. I should have 
stated this earlier.


> Separate configuration parsing from MetricRegistry
> --
>
> Key: FLINK-4695
> URL: https://issues.apache.org/jira/browse/FLINK-4695
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to decouple the instantiation of the {{MetricRegistry}} from the 
> global configuration, we could introduce a {{MetricRegistryConfiguration}} 
> object which encapsulates all information the {{MetricRegistry}} needs. The 
> {{MetricRegistryConfiguration}} could have a static factory method that 
> creates it from a {{Configuration}}. 
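A hedged sketch of that pattern (hypothetical fields and configuration keys, with a plain map standing in for Flink's {{Configuration}} class):

{code}
import java.util.Map;

final class MetricRegistryConfigurationSketch {

    private final String scopeDelimiter;
    private final long reporterIntervalMillis;

    private MetricRegistryConfigurationSketch(String scopeDelimiter, long reporterIntervalMillis) {
        this.scopeDelimiter = scopeDelimiter;
        this.reporterIntervalMillis = reporterIntervalMillis;
    }

    /** Parses only the metric-related fields out of the global configuration. */
    static MetricRegistryConfigurationSketch fromConfiguration(Map<String, String> globalConfig) {
        String delimiter = globalConfig.getOrDefault("metrics.scope.delimiter", ".");
        long interval = Long.parseLong(globalConfig.getOrDefault("metrics.reporter.interval", "10000"));
        return new MetricRegistryConfigurationSketch(delimiter, interval);
    }

    String scopeDelimiter() { return scopeDelimiter; }
    long reporterIntervalMillis() { return reporterIntervalMillis; }
}
{code}

The registry constructor then takes only this object, so it never has to touch the global configuration itself.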




