[GitHub] flink issue #2550: [FLINK-4657] Implement HighAvailabilityServices based on ...
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2550

For the "cluster-id" thing, there is actually similar functionality in current Flink, configured by "high-availability.zookeeper.path.namespace". It works together with another option, "high-availability.zookeeper.path.root", which is "/flink" by default. The actual root in ZooKeeper is then "/flink/some_namespace", so we can change the namespace whenever we start a new cluster. Maybe we should set it explicitly in the CLI or in some shell script. Do you think that will work?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
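A minimal sketch of how the two options in the comment above combine into the effective ZooKeeper root. `ZkPaths`, `join`, and `effectiveRoot` are hypothetical helpers written for illustration, not Flink's implementation; only the two option names, the "/flink" default, and the "/flink/some_namespace" result come from the comment.

```java
import java.util.Map;

// Hypothetical helper (not part of Flink): joins the configured root and a
// per-cluster namespace into a single ZooKeeper root path.
final class ZkPaths {
    static final String ROOT_KEY = "high-availability.zookeeper.path.root";
    static final String NAMESPACE_KEY = "high-availability.zookeeper.path.namespace";
    static final String DEFAULT_ROOT = "/flink";

    private ZkPaths() {}

    /** Joins segments with single slashes, dropping stray leading/trailing slashes and empty segments. */
    static String join(String... segments) {
        StringBuilder sb = new StringBuilder();
        for (String segment : segments) {
            String trimmed = segment.replaceAll("^/+|/+$", "");
            if (!trimmed.isEmpty()) {
                sb.append('/').append(trimmed);
            }
        }
        return sb.length() == 0 ? "/" : sb.toString();
    }

    /** Effective root = configured root (default "/flink") plus the optional namespace. */
    static String effectiveRoot(Map<String, String> config) {
        String root = config.getOrDefault(ROOT_KEY, DEFAULT_ROOT);
        String namespace = config.getOrDefault(NAMESPACE_KEY, "");
        return join(root, namespace);
    }
}
```

With only the namespace set to "some_namespace", `effectiveRoot` yields "/flink/some_namespace"; a CLI or start-up script that picks a fresh namespace per cluster would then keep the clusters' ZooKeeper data apart.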
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528274#comment-15528274 ]

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2550

For the "cluster-id" thing, there is actually similar functionality in current Flink, configured by "high-availability.zookeeper.path.namespace". It works together with another option, "high-availability.zookeeper.path.root", which is "/flink" by default. The actual root in ZooKeeper is then "/flink/some_namespace", so we can change the namespace whenever we start a new cluster. Maybe we should set it explicitly in the CLI or in some shell script. Do you think that will work?

> Implement HighAvailabilityServices based on zookeeper
> ---
>
>                 Key: FLINK-4657
>                 URL: https://issues.apache.org/jira/browse/FLINK-4657
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: Kurt Young
>
> For FLIP-6, we will have the ResourceManager and every JobManager as potential
> leader contenders and retrievers. We should separate them by using different
> ZooKeeper paths.
> For example, the path could be /leader/resource-manager for the RM, and for each
> JM, the path could be /leader/job-managers/JobID

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
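The path layout in the issue description can be sketched as a small helper. `LeaderPaths` is hypothetical, and placing these paths under a per-cluster root (such as "/flink/some_namespace") is an assumption; the issue itself only gives "/leader/resource-manager" and "/leader/job-managers/JobID" as examples.

```java
// Hypothetical helper: leader-election paths following the examples in the
// issue description, assumed to live under a per-cluster root path.
final class LeaderPaths {
    static final String RESOURCE_MANAGER = "/leader/resource-manager";
    static final String JOB_MANAGERS = "/leader/job-managers";

    private LeaderPaths() {}

    /** Single path shared by all ResourceManager leader contenders. */
    static String resourceManager(String clusterRoot) {
        return clusterRoot + RESOURCE_MANAGER;
    }

    /** One path per job, so JobManagers of different jobs never contend for the same leadership. */
    static String jobManager(String clusterRoot, String jobId) {
        if (jobId == null || jobId.isEmpty() || jobId.contains("/")) {
            throw new IllegalArgumentException("Invalid job ID: " + jobId);
        }
        return clusterRoot + JOB_MANAGERS + "/" + jobId;
    }
}
```

Because every JobManager gets its own node under "job-managers", contenders and retrievers for one job are isolated from other jobs and from the ResourceManager, which is the separation the issue asks for.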
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528245#comment-15528245 ]

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80837572

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
 		//TODO:: register at the RM
 	}
 
+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
--- End diff --

I'm not sure that throwing an exception from an RpcMethod is a good way to do error handling. On the caller's side, when the RPC method returns a Future from the gateway, the caller can currently handle errors with handleAsync or exceptionallyAsync. But exceptions from user logic would be mixed up with all the exceptions from the RPC framework, like RpcTimeout or other exceptions telling you that the RPC system may not be working well. So you typically have to figure out what went wrong by distinguishing the exception types, which I think is not very elegant.

One thing we can do is never throw exceptions in an RpcMethod, and instead handle errors the "ErrorCode" way, returning the error explicitly as the return value. Then every exception raised during an RPC call is due to the RPC framework. In this situation, returning null indicates that something went wrong with the request. (If we need more detail about the error, we can enrich it by returning a message; for now null does the job.) And in the normal case, such as when there is no further split, we still return a NextInputSplit with empty content.

What do you think about all this? Let me know.
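A minimal sketch of the "ErrorCode" style proposed in the comment above, using `java.util.concurrent.CompletableFuture` rather than the Future type Flink used at the time. `SplitResponse`, its `Status` values, and `SplitClient.describe` are hypothetical. As a variant of the proposal, it uses an explicit status with an optional message instead of returning null, so that any exception reaching the caller's handler can be attributed to the RPC layer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical response type: application-level outcomes travel in the value.
final class SplitResponse {
    enum Status { OK, NO_MORE_SPLITS, ERROR }

    final Status status;
    final byte[] payload;  // serialized split; empty unless status == OK
    final String message;  // error detail; null unless status == ERROR

    private SplitResponse(Status status, byte[] payload, String message) {
        this.status = status;
        this.payload = payload;
        this.message = message;
    }

    static SplitResponse ok(byte[] payload) { return new SplitResponse(Status.OK, payload, null); }
    static SplitResponse noMoreSplits() { return new SplitResponse(Status.NO_MORE_SPLITS, new byte[0], null); }
    static SplitResponse error(String message) { return new SplitResponse(Status.ERROR, new byte[0], message); }
}

final class SplitClient {
    private SplitClient() {}

    /**
     * Classifies a reply: an exceptionally completed future is treated as an
     * RPC-framework failure (timeout, unreachable endpoint, ...); anything else
     * is an application answer carried in the returned value.
     */
    static String describe(CompletableFuture<SplitResponse> reply) {
        return reply.handle((response, failure) -> {
            if (failure != null) {
                // dependent stages wrap failures in CompletionException
                Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                        ? failure.getCause() : failure;
                return "rpc-failure: " + cause.getClass().getSimpleName();
            }
            switch (response.status) {
                case OK: return "split: " + response.payload.length + " bytes";
                case NO_MORE_SPLITS: return "no more splits";
                default: return "application-error: " + response.message;
            }
        }).join();
    }
}
```

The caller no longer inspects exception types to tell a missing InputSplitAssigner from a timeout: the former arrives as `Status.ERROR`, the latter as an exceptional completion.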
[jira] [Commented] (FLINK-4670) Add watch mechanism on current RPC framework
[ https://issues.apache.org/jira/browse/FLINK-4670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528226#comment-15528226 ]

ASF GitHub Bot commented on FLINK-4670:
---

Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2543

> Add watch mechanism on current RPC framework
> ---
>
>                 Key: FLINK-4670
>                 URL: https://issues.apache.org/jira/browse/FLINK-4670
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>             Fix For: 1.2.0
>
> Add a watch mechanism to the current RPC framework so that an RPC gateway can
> be watched to make sure the RPC server is running, just like the previous
> DeathWatch in Akka.
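One way the requested watch mechanism could look, as a minimal sketch: the watcher periodically probes the watched endpoint and fires a one-shot callback once the probe fails, similar in spirit to the Terminated notification of Akka's DeathWatch. `GatewayWatcher`, the `isAlive` probe, and polling as the detection strategy are all assumptions for illustration; the issue does not say how liveness should be detected.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch, not Flink's API: notifies exactly once when the
// watched endpoint is considered dead.
final class GatewayWatcher implements AutoCloseable {
    private final BooleanSupplier isAlive;   // assumed liveness probe for the gateway
    private final Runnable onTerminated;     // DeathWatch-style "Terminated" callback
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "gateway-watcher");
                thread.setDaemon(true);
                return thread;
            });

    GatewayWatcher(BooleanSupplier isAlive, Runnable onTerminated) {
        this.isAlive = isAlive;
        this.onTerminated = onTerminated;
    }

    /** Starts probing every periodMillis milliseconds. */
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::checkOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Runs one probe; returns false once the endpoint is considered dead. */
    boolean checkOnce() {
        if (terminated.get()) {
            return false;
        }
        boolean alive;
        try {
            alive = isAlive.getAsBoolean();
        } catch (RuntimeException e) {
            alive = false; // a failing probe is treated like a dead endpoint
        }
        if (!alive && terminated.compareAndSet(false, true)) {
            onTerminated.run();
            scheduler.shutdown(); // no further periodic probes after termination
        }
        return alive;
    }

    boolean isTerminated() {
        return terminated.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Exposing `checkOnce` separately from `start` keeps the detection logic testable without timing; a real implementation would more likely piggyback on the RPC layer's own heartbeats.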
[jira] [Commented] (FLINK-4606) Integrate the new ResourceManager with the existing FlinkResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528211#comment-15528211 ]

ASF GitHub Bot commented on FLINK-4606:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80836325

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
 		shutDown();
 	}
 
+	/**
+	 * Registers an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+	 */
+	@RpcMethod
+	public void registerInfoMessageListener(final String infoMessageListenerAddress) {
+		if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+			log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
+		} else {
+			Future infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction() {
+				@Override
+				public void accept(InfoMessageListenerRpcGateway gateway) {
+					log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
+					infoMessageListeners.put(infoMessageListenerAddress, gateway);
+				}
+			}, getMainThreadExecutor());
+
+			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction() {
+				@Override
+				public Void apply(Throwable failure) {
+					log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
+					return null;
+				}
+			}, getMainThreadExecutor());
+		}
+	}
+
+	/**
+	 * Unregisters an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+	 *
+	 */
+	@RpcMethod
+	public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
+		infoMessageListeners.remove(infoMessageListenerAddress);
+	}
+
+	/**
+	 * Shutdowns cluster
+	 *
+	 * @param finalStatus
+	 * @param optionalDiagnostics
+	 */
+	@RpcMethod
+	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+		shutDownApplication(finalStatus, optionalDiagnostics);
+	}
+
+	/**
+	 * This method should be called by the framework once it detects that a currently registered task executor has failed.
+	 *
+	 * @param resourceID Id of the worker that has failed.
+	 * @param message An informational message that explains why the worker failed.
+	 */
+	public void notifyWorkerFailed(final ResourceID resourceID, String message) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				WorkerType worker = taskExecutorGateways.remove(resourceID);
+				if (worker != null) {
+					// TODO :: suggest failed task executor to stop itself
+					slotManager.notifyTaskManagerFailure(resourceID);
+				}
+			}
+		});
+	}
+
+	/**
+	 * Gets the number of currently started TaskManagers.
+	 *
+	 * @return The number of currently started TaskManagers.
+	 */
+	public int getNumberOfStartedTaskManagers() {
+		return taskExecutorGateways.size();
+	}
+
+	/**
+	 * Notifies the resource manager of a fatal error.
+	 *
+	 * IMPORTANT: This should not cleanly shut down this master, but exit it in
+	 * such a way that a high-availability setting would restart this or fail over
+	 * to another master.
+	 */
+	public void onFatalError(final String message, final Throwable error) {
+		runAsync(new Runnable()
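The registration pattern in registerInfoMessageListener (check for a duplicate, connect asynchronously, then update the listener map only on the main-thread executor) can be sketched with standard `CompletableFuture`. `ListenerRegistry` is hypothetical; the type parameter `G` stands in for a gateway such as InfoMessageListenerRpcGateway, and the `connect` function stands in for the RPC service's connect call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical sketch (not Flink code) of async registration with
// main-thread confinement of the listener map.
final class ListenerRegistry<G> {
    private static final Logger LOG = Logger.getLogger(ListenerRegistry.class.getName());

    // Only read or written on the main-thread executor, so no locking is needed.
    private final Map<String, G> listeners = new HashMap<>();
    private final Function<String, CompletableFuture<G>> connect;
    private final Executor mainThread;

    ListenerRegistry(Function<String, CompletableFuture<G>> connect, Executor mainThread) {
        this.connect = connect;
        this.mainThread = mainThread;
    }

    /** Expected to be called on the main thread, like an @RpcMethod. */
    void register(String address) {
        if (listeners.containsKey(address)) {
            LOG.warning("Duplicate registration from info message listener at " + address);
            return;
        }
        // One callback handles both outcomes and runs back on the main thread.
        connect.apply(address).whenCompleteAsync((gateway, failure) -> {
            if (failure != null) {
                LOG.warning("Registration from unreachable info message listener at " + address);
            } else {
                LOG.info("Registered info message listener at " + address);
                listeners.put(address, gateway);
            }
        }, mainThread);
    }

    void unregister(String address) {
        listeners.remove(address);
    }

    int size() {
        return listeners.size();
    }
}
```

The quoted diff attaches two callbacks (`thenAcceptAsync` and `exceptionallyAsync`); the sketch folds them into one `whenCompleteAsync`. Like the quoted code, it only checks already-connected listeners, so two registrations for the same address that arrive before the first connect completes would both connect.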
[jira] [Assigned] (FLINK-4679) Add TumbleRow row-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-4679:
---

    Assignee: Jark Wu

> Add TumbleRow row-windows for streaming tables
> ---
>
>                 Key: FLINK-4679
>                 URL: https://issues.apache.org/jira/browse/FLINK-4679
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> This task requires implementing a custom stream operator and integrating it
> with the checkpointing and timestamp/watermark logic.
[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80836325

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
 		shutDown();
 	}
 
+	/**
+	 * Registers an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+	 */
+	@RpcMethod
+	public void registerInfoMessageListener(final String infoMessageListenerAddress) {
+		if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+			log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
+		} else {
+			Future infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction() {
+				@Override
+				public void accept(InfoMessageListenerRpcGateway gateway) {
+					log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
+					infoMessageListeners.put(infoMessageListenerAddress, gateway);
+				}
+			}, getMainThreadExecutor());
+
+			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction() {
+				@Override
+				public Void apply(Throwable failure) {
+					log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
+					return null;
+				}
+			}, getMainThreadExecutor());
+		}
+	}
+
+	/**
+	 * Unregisters an infoMessage listener
+	 *
+	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+	 *
+	 */
+	@RpcMethod
+	public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
+		infoMessageListeners.remove(infoMessageListenerAddress);
+	}
+
+	/**
+	 * Shutdowns cluster
+	 *
+	 * @param finalStatus
+	 * @param optionalDiagnostics
+	 */
+	@RpcMethod
+	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+		shutDownApplication(finalStatus, optionalDiagnostics);
+	}
+
+	/**
+	 * This method should be called by the framework once it detects that a currently registered task executor has failed.
+	 *
+	 * @param resourceID Id of the worker that has failed.
+	 * @param message An informational message that explains why the worker failed.
+	 */
+	public void notifyWorkerFailed(final ResourceID resourceID, String message) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				WorkerType worker = taskExecutorGateways.remove(resourceID);
+				if (worker != null) {
+					// TODO :: suggest failed task executor to stop itself
+					slotManager.notifyTaskManagerFailure(resourceID);
+				}
+			}
+		});
+	}
+
+	/**
+	 * Gets the number of currently started TaskManagers.
+	 *
+	 * @return The number of currently started TaskManagers.
+	 */
+	public int getNumberOfStartedTaskManagers() {
+		return taskExecutorGateways.size();
+	}
+
+	/**
+	 * Notifies the resource manager of a fatal error.
+	 *
+	 * IMPORTANT: This should not cleanly shut down this master, but exit it in
+	 * such a way that a high-availability setting would restart this or fail over
+	 * to another master.
+	 */
+	public void onFatalError(final String message, final Throwable error) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				fatalError(message, error);
+			}
+		});
+	}
+
+	//
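notifyWorkerFailed and onFatalError both hand their work to the endpoint's main thread via runAsync, so the gateway map is only mutated by one thread. A minimal sketch of that thread-confinement pattern with a single-threaded executor; `WorkerTracker`, its string IDs, and the `onTaskManagerFailure` callback (standing in for `slotManager.notifyTaskManagerFailure`) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch, not Flink code: state is confined to one "main thread";
// callers on any thread submit work to it through runAsync.
final class WorkerTracker implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "main-thread");
        thread.setDaemon(true);
        return thread;
    });
    // resourceId -> worker; only touched on mainThread, so a plain HashMap suffices
    private final Map<String, String> workers = new HashMap<>();
    private final Consumer<String> onTaskManagerFailure;

    WorkerTracker(Consumer<String> onTaskManagerFailure) {
        this.onTaskManagerFailure = onTaskManagerFailure;
    }

    CompletableFuture<Void> runAsync(Runnable action) {
        return CompletableFuture.runAsync(action, mainThread);
    }

    CompletableFuture<Void> registerWorker(String resourceId, String worker) {
        return runAsync(() -> workers.put(resourceId, worker));
    }

    /** Safe to call from any thread; `message` is unused, as in the quoted code. */
    CompletableFuture<Void> notifyWorkerFailed(String resourceId, String message) {
        return runAsync(() -> {
            String worker = workers.remove(resourceId);
            if (worker != null) { // repeated or unknown failure reports are ignored
                onTaskManagerFailure.accept(resourceId);
            }
        });
    }

    /** Reads the size on the main thread as well, to see a consistent map. */
    CompletableFuture<Integer> numberOfWorkers() {
        return CompletableFuture.supplyAsync(workers::size, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdownNow();
    }
}
```

Because the removal and the null check happen together on one thread, a worker reported as failed twice triggers only one failure notification.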
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528198#comment-15528198 ]

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835773

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
 		//TODO:: register at the RM
 	}
 
+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+		final byte[] serializedInputSplit;
+
+		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+		if (execution == null) {
+			log.error("Can not find Execution for attempt {}.", executionAttempt);
--- End diff --

OK, will change it to "debug" and leave some comments about this.
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528193#comment-15528193 ]

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835559

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
 		//TODO:: register at the RM
 	}
 
+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+		final byte[] serializedInputSplit;
+
+		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+		if (execution == null) {
+			log.error("Can not find Execution for attempt {}.", executionAttempt);
+			return null;
+		} else {
+			final Slot slot = execution.getAssignedResource();
+			final int taskId = execution.getVertex().getParallelSubtaskIndex();
+			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+			if (vertex != null) {
+				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+				if (splitAssigner != null) {
+					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+					log.debug("Send next input split {}.", nextInputSplit);
+					try {
+						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+					} catch (Exception ex) {
+						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+								nextInputSplit.getClass() + ".", ex));
+						return null;
+					}
+				} else {
+					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+					return null;
+				}
+			} else {
+				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+				return null;
+			}
+		}
+		return new NextInputSplit(serializedInputSplit);
+	}
+
+	@RpcMethod
+	public PartitionState requestPartitionState(
+			final ResultPartitionID partitionId,
+			final ExecutionAttemptID taskExecutionId,
+			final IntermediateDataSetID taskResultId)
+	{
+		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+		final ExecutionState state = execution != null ? execution.getState() : null;
+		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+	}
+
+	@RpcMethod
+	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
--- End diff --

Yes, you're right, will change this.
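The quoted requestNextInputSplit serializes the next split with InstantiationUtil.serializeObject before returning it. A minimal sketch of an equivalent round trip with plain `java.io` object streams, assuming standard Java serialization; `DemoSplit` and `SplitCodec` are hypothetical. Encoding "no further split" as an empty array follows the "empty content" idea raised in the review discussion on this pull request and is an assumption, not what the quoted code does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical split type used only for this sketch.
final class DemoSplit implements Serializable {
    private static final long serialVersionUID = 1L;
    final int splitNumber;
    final String preferredHost;

    DemoSplit(int splitNumber, String preferredHost) {
        this.splitNumber = splitNumber;
        this.preferredHost = preferredHost;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof DemoSplit)) return false;
        DemoSplit other = (DemoSplit) o;
        return splitNumber == other.splitNumber && Objects.equals(preferredHost, other.preferredHost);
    }

    @Override
    public int hashCode() {
        return Objects.hash(splitNumber, preferredHost);
    }
}

// Java-serialization round trip; an empty array encodes "no further split".
final class SplitCodec {
    private SplitCodec() {}

    static byte[] serialize(Serializable split) {
        if (split == null) {
            return new byte[0]; // no further split: empty content, not an error
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(split);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not serialize split of " + split.getClass(), e);
        }
        return bytes.toByteArray(); // stream is closed (and flushed) at this point
    }

    static Object deserialize(byte[] data) {
        if (data.length == 0) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Split class not found on the receiver's classpath", e);
        }
    }
}
```

The receiving side needs the split class on its classpath to deserialize, which is one reason serialization failures are reported separately from "no more splits" in the quoted code.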
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528189#comment-15528189 ]

ASF GitHub Bot commented on FLINK-4657:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835462

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
 		//TODO:: register at the RM
 	}
 
+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+		final byte[] serializedInputSplit;
+
+		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+		if (execution == null) {
+			log.error("Can not find Execution for attempt {}.", executionAttempt);
+			return null;
+		} else {
+			final Slot slot = execution.getAssignedResource();
+			final int taskId = execution.getVertex().getParallelSubtaskIndex();
+			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+			if (vertex != null) {
+				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+				if (splitAssigner != null) {
+					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+					log.debug("Send next input split {}.", nextInputSplit);
+					try {
+						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+					} catch (Exception ex) {
+						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+								nextInputSplit.getClass() + ".", ex));
+						return null;
+					}
+				} else {
+					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+					return null;
+				}
+			} else {
+				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+				return null;
+			}
+		}
+		return new NextInputSplit(serializedInputSplit);
+	}
+
+	@RpcMethod
+	public PartitionState requestPartitionState(
+			final ResultPartitionID partitionId,
+			final ExecutionAttemptID taskExecutionId,
+			final IntermediateDataSetID taskResultId)
+	{
+		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+		final ExecutionState state = execution != null ? execution.getState() : null;
+		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+	}
+
+	@RpcMethod
+	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+		final JobID jobID = executionGraph.getJobID();
+		final String jobName = executionGraph.getJobName();
+		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
+
+		if (newJobStatus.isGloballyTerminalState()) {
+			// TODO set job end time in JobInfo
+
+			/*
+			  TODO
+			  if (jobInfo.sessionAlive) {
+				jobInfo.setLastActive()
+				val lastActivity = jobInfo.lastActive
+				context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+				  // remove only if no activity occurred in the meantime
+				  if (lastActivity == jobInfo.lastActive) {
+					self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+				  }
+				}(context.dispatcher)
+			  } else {
+				self !
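The commented-out Scala TODO in jobStatusChanged describes a session-timeout rule: when a job with a live session reaches a terminal state, mark it active and schedule its removal after the session timeout, but only remove it if no activity happened in the meantime; without a session, remove it immediately. A minimal Java sketch of that rule; `JobSessionReaper`, its `JobInfo` stand-in, and the injected `scheduleOnce`/`removeJob` callbacks are hypothetical, and a logical activity counter replaces the wall-clock timestamp so the logic can be checked deterministically.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical Java rendering of the commented-out Scala TODO; not Flink code.
final class JobSessionReaper {

    /** Minimal stand-in for the JobInfo fields the TODO uses. */
    static final class JobInfo {
        final boolean sessionAlive;
        final long sessionTimeoutSeconds;
        long lastActive; // logical activity stamp instead of a wall-clock time

        JobInfo(boolean sessionAlive, long sessionTimeoutSeconds) {
            this.sessionAlive = sessionAlive;
            this.sessionTimeoutSeconds = sessionTimeoutSeconds;
        }
    }

    private final BiConsumer<Runnable, Long> scheduleOnce; // (task, delay in seconds), like scheduler.scheduleOnce
    private final Consumer<String> removeJob;               // stands in for RemoveJob(jobID, removeJobFromStateBackend = true)
    private long activityCounter;

    JobSessionReaper(BiConsumer<Runnable, Long> scheduleOnce, Consumer<String> removeJob) {
        this.scheduleOnce = scheduleOnce;
        this.removeJob = removeJob;
    }

    void markActive(JobInfo info) {
        info.lastActive = ++activityCounter;
    }

    /** Called once the job reaches a globally terminal state. */
    void onGloballyTerminal(String jobId, JobInfo info) {
        if (info.sessionAlive) {
            markActive(info);
            final long lastActivity = info.lastActive;
            scheduleOnce.accept(() -> {
                // remove only if no activity occurred in the meantime
                if (lastActivity == info.lastActive) {
                    removeJob.accept(jobId);
                }
            }, info.sessionTimeoutSeconds);
        } else {
            removeJob.accept(jobId);
        }
    }
}
```

In practice `scheduleOnce` could be backed by a `ScheduledExecutorService`; in an RPC endpoint like the JobMaster, the delayed check would presumably also need to run on the main thread so it reads `lastActive` consistently.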
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2550#discussion_r80835462

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
 		//TODO:: register at the RM
 	}
 
+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
+		final byte[] serializedInputSplit;
+
+		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+		if (execution == null) {
+			log.error("Can not find Execution for attempt {}.", executionAttempt);
+			return null;
+		} else {
+			final Slot slot = execution.getAssignedResource();
+			final int taskId = execution.getVertex().getParallelSubtaskIndex();
+			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+
+			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+			if (vertex != null) {
+				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+				if (splitAssigner != null) {
+					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+					log.debug("Send next input split {}.", nextInputSplit);
+					try {
+						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+					} catch (Exception ex) {
+						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
+								nextInputSplit.getClass() + ".", ex));
+						return null;
+					}
+				} else {
+					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+					return null;
+				}
+			} else {
+				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+				return null;
+			}
+		}
+		return new NextInputSplit(serializedInputSplit);
+	}
+
+	@RpcMethod
+	public PartitionState requestPartitionState(
+			final ResultPartitionID partitionId,
+			final ExecutionAttemptID taskExecutionId,
+			final IntermediateDataSetID taskResultId)
+	{
+		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+		final ExecutionState state = execution != null ? execution.getState() : null;
+		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+	}
+
+	@RpcMethod
+	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+		final JobID jobID = executionGraph.getJobID();
+		final String jobName = executionGraph.getJobName();
+		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
+
+		if (newJobStatus.isGloballyTerminalState()) {
+			// TODO set job end time in JobInfo
+
+			/*
+			  TODO
+			  if (jobInfo.sessionAlive) {
+				jobInfo.setLastActive()
+				val lastActivity = jobInfo.lastActive
+				context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+				  // remove only if no activity occurred in the meantime
+				  if (lastActivity == jobInfo.lastActive) {
+					self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+				  }
+				}(context.dispatcher)
+			  } else {
+				self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+			  }
+			*/
+
+			if (newJobStatus == JobStatus.FINISHED) {
+				try {
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80834516 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID 
partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { +jobInfo.setLastActive() +val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +}(context.dispatcher) + } else { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +*/ + + if (newJobStatus == JobStatus.FINISHED) { + try { +
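The commented-out TODO in `jobStatusChanged` describes session handling in Scala, using Akka's `context.system.scheduler`. When a job reaches a globally terminal state and its session is still alive, removal is scheduled after the session timeout. That removal is skipped if the job saw new activity in the meantime. If the session is not alive, the job is removed right away.

The sketch below expresses that rule in Java with a `ScheduledExecutorService`. `JobSession` and `SessionCleanup` are hypothetical names, and an activity counter stands in for the `lastActive` timestamp.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder for the jobInfo fields used by the commented-out TODO.
class JobSession {
    final boolean sessionAlive;
    final long sessionTimeoutMillis;
    // Activity counter instead of a wall-clock timestamp, so two activities
    // in the same millisecond are still told apart.
    private final AtomicLong activity = new AtomicLong();

    JobSession(boolean sessionAlive, long sessionTimeoutMillis) {
        this.sessionAlive = sessionAlive;
        this.sessionTimeoutMillis = sessionTimeoutMillis;
    }

    // Records new activity and returns the resulting activity marker.
    long markActive() {
        return activity.incrementAndGet();
    }

    long lastActive() {
        return activity.get();
    }
}

class SessionCleanup {

    // Called once the job reaches a globally terminal state. Returns the
    // scheduled removal, or null if the job was removed immediately.
    static ScheduledFuture<?> onGloballyTerminal(
            JobSession session, ScheduledExecutorService scheduler, Runnable removeJob) {
        if (!session.sessionAlive) {
            removeJob.run();
            return null;
        }
        final long lastActivity = session.markActive();
        return scheduler.schedule(() -> {
            // Remove only if no activity occurred in the meantime.
            if (lastActivity == session.lastActive()) {
                removeJob.run();
            }
        }, session.sessionTimeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```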
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528167#comment-15528167 ] ASF GitHub Bot commented on FLINK-4657: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80834516 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? 
execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { +jobInfo.setLastActive() +val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +}(context.dispatcher) + } else { +self !
[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
[ https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528157#comment-15528157 ] ASF GitHub Bot commented on FLINK-4068: --- GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/2560 [FLINK-4068] [table] Move constant computations out of code-generated Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed The original PR is #2102. This PR makes `ReduceExpressionsRules` take effect. These rules reduce constant expressions by replacing them with the corresponding constant. The PR also adds some tests.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink reduce-expression-FLINK-4068 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2560.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2560 commit 22a0f2e949e524e20151c11cc83bd6e1e4f2a1b2 Author: Jark Wu Date: 2016-09-28T02:31:59Z [FLINK-4068] [table] Move constant computations out of code-generated > Move constant computations out of code-generated `flatMap` functions. > - > > Key: FLINK-4068 > URL: https://issues.apache.org/jira/browse/FLINK-4068 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Jark Wu > > The generated functions for expressions of the Table API or SQL include > constant computations. > For instance, the code generated for a predicate like: > {code} > myInt < (10 + 20) > {code} > looks roughly like: > {code} > public void flatMap(Row in, Collector out) { > Integer in1 = in.productElement(1); > int temp = 10 + 20; > if (in1 < temp) { > out.collect(in); > } > } > {code} > In this example the computation of {{temp}} is constant and could be moved > out of the {{flatMap()}} method. > The same might apply to generated functions other than {{FlatMap}} as well.
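The issue describes code generated for `myInt < (10 + 20)` that recomputes `10 + 20` for every record. The PR instead lets `ReduceExpressionsRules` replace the constant expression with its value during planning.

The sketch below shows both shapes in Java. It uses a hypothetical `GeneratedFilter` interface and a plain `Consumer<Integer>` in place of Flink's `Row` and `Collector`. A Java compiler may already fold a literal sum like `10 + 20` on its own, so the example only shows the shape of the transformation. The saving matters for constant expressions the compiler does not fold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface standing in for a generated flatMap function.
interface GeneratedFilter {
    void flatMap(int myInt, Consumer<Integer> out);
}

// Shape described in the issue: (10 + 20) is evaluated inside flatMap,
// i.e. once per record.
class FilterWithConstantComputation implements GeneratedFilter {
    @Override
    public void flatMap(int myInt, Consumer<Integer> out) {
        int temp = 10 + 20;
        if (myInt < temp) {
            out.accept(myInt);
        }
    }
}

// Shape after constant reduction: the planner has already replaced
// (10 + 20) with the literal 30 before code generation.
class FilterWithReducedConstant implements GeneratedFilter {
    @Override
    public void flatMap(int myInt, Consumer<Integer> out) {
        if (myInt < 30) {
            out.accept(myInt);
        }
    }
}

class FilterRunner {
    // Feeds each input value through the filter and collects what it emits.
    static List<Integer> run(GeneratedFilter filter, List<Integer> input) {
        List<Integer> output = new ArrayList<>();
        for (int value : input) {
            filter.flatMap(value, output::add);
        }
        return output;
    }
}
```

Both variants must emit exactly the same records. Constant reduction only changes when the constant is computed, not the filter's result.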
[GitHub] flink pull request #2560: [FLINK-4068] [table] Move constant computations ou...
GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/2560 [FLINK-4068] [table] Move constant computations out of code-generated Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed The original PR is #2102. This PR makes `ReduceExpressionsRules` take effect. These rules reduce constant expressions by replacing them with the corresponding constant. The PR also adds some tests.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink reduce-expression-FLINK-4068 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2560.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2560 commit 22a0f2e949e524e20151c11cc83bd6e1e4f2a1b2 Author: Jark Wu Date: 2016-09-28T02:31:59Z [FLINK-4068] [table] Move constant computations out of code-generated
[jira] [Commented] (FLINK-4450) update storm version to 1.0.0
[ https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528031#comment-15528031 ] ASF GitHub Bot commented on FLINK-4450: --- Github user liuyuzhong commented on the issue: https://github.com/apache/flink/pull/2439 The Jenkins run succeeded, but CI failed. ![image](https://cloud.githubusercontent.com/assets/12843176/18897594/2c9e40e2-855e-11e6-8d3c-0397b33d5d8f.png) > update storm version to 1.0.0 > - > > Key: FLINK-4450 > URL: https://issues.apache.org/jira/browse/FLINK-4450 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: yuzhongliu > Fix For: 2.0.0 > > > The Storm package path changed in the new version. > Old Storm package: > backtype.storm.* > New Storm package: > org.apache.storm.* > Shall we update the flink/flink-storm code to the new Storm version?
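The issue notes that Storm 1.0 moved its classes from `backtype.storm.*` to `org.apache.storm.*`. The sketch below covers only the mechanical part of such a migration. It assumes a hypothetical `StormPackageMigration` helper that rewrites the package prefix in Java source text. Any other API differences between the Storm versions are beyond what this textual rewrite can handle.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: rewrites the pre-1.0 Storm package prefix in source text.
class StormPackageMigration {
    // The leading \b keeps identifiers such as "mybacktype.storm" from matching.
    private static final Pattern OLD_PREFIX = Pattern.compile("\\bbacktype\\.storm\\.");
    private static final String NEW_PREFIX = "org.apache.storm.";

    static String migrate(String source) {
        return OLD_PREFIX.matcher(source).replaceAll(Matcher.quoteReplacement(NEW_PREFIX));
    }
}
```

For example, `import backtype.storm.topology.TopologyBuilder;` becomes `import org.apache.storm.topology.TopologyBuilder;`. Lines that do not reference the old package are left unchanged.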
[GitHub] flink issue #2439: [FLINK-4450]update storm verion to 1.0.0 in flink-storm a...
Github user liuyuzhong commented on the issue: https://github.com/apache/flink/pull/2439 The Jenkins run succeeded, but CI failed. ![image](https://cloud.githubusercontent.com/assets/12843176/18897594/2c9e40e2-855e-11e6-8d3c-0397b33d5d8f.png)
[jira] [Assigned] (FLINK-4646) Add BipartiteGraph class
[ https://issues.apache.org/jira/browse/FLINK-4646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-4646: - Assignee: Ivan Mushketyk > Add BipartiteGraph class > > > Key: FLINK-4646 > URL: https://issues.apache.org/jira/browse/FLINK-4646 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement a class to represent a bipartite graph in Flink Gelly. Design > discussions can be found in the parent task.
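FLINK-4646 asks for a class that represents a bipartite graph in Gelly and leaves the design to the parent task. As an illustration only, here is a small in-memory Java sketch of the data structure. `BipartiteGraphSketch` is a hypothetical class, not the Gelly API. It keeps separate top and bottom vertex sets, and every edge runs from a top vertex to a bottom vertex.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory sketch; not the Gelly API.
// KT: id type of top vertices, KB: id type of bottom vertices.
class BipartiteGraphSketch<KT, KB> {

    // An edge always joins one top vertex to one bottom vertex.
    static final class Edge<T, B> {
        final T top;
        final B bottom;

        Edge(T top, B bottom) {
            this.top = top;
            this.bottom = bottom;
        }
    }

    private final Set<KT> topVertices = new LinkedHashSet<>();
    private final Set<KB> bottomVertices = new LinkedHashSet<>();
    private final List<Edge<KT, KB>> edges = new ArrayList<>();

    void addTopVertex(KT id) {
        topVertices.add(id);
    }

    void addBottomVertex(KB id) {
        bottomVertices.add(id);
    }

    // Rejects edges whose endpoints are not in the correct vertex sets,
    // which is what keeps the graph bipartite.
    void addEdge(KT top, KB bottom) {
        if (!topVertices.contains(top) || !bottomVertices.contains(bottom)) {
            throw new IllegalArgumentException(
                    "edge must connect an existing top vertex to an existing bottom vertex");
        }
        edges.add(new Edge<>(top, bottom));
    }

    // Bottom vertices adjacent to the given top vertex, in edge insertion order.
    List<KB> bottomNeighbours(KT top) {
        List<KB> result = new ArrayList<>();
        for (Edge<KT, KB> edge : edges) {
            if (edge.top.equals(top)) {
                result.add(edge.bottom);
            }
        }
        return result;
    }

    int edgeCount() {
        return edges.size();
    }
}
```

The two sides are stored in separate sets so that `addEdge` can check each endpoint against its own side. As a result, no edge can join two vertices of the same side.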
[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2430#discussion_r80784331 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java --- @@ -17,123 +17,79 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.test.util.SuccessException; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.junit.Test; import java.io.Serializable; -import java.util.HashSet; import java.util.Properties; -import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { +public abstract class KafkaTableSinkTestBase implements Serializable { - protected final 
static String TOPIC = "customPartitioningTestTopic"; - protected final static int PARALLELISM = 1; + protected final static String TOPIC = "testTopic"; protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + protected FlinkKafkaProducerBase kafkaProducer = mock(FlinkKafkaProducerBase.class); + @Test public void testKafkaTableSink() throws Exception { - LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); - - createTestTopic(TOPIC, PARALLELISM, 1); - StreamExecutionEnvironment env = createEnvironment(); - - createProducingTopology(env); - createConsumingTopology(env); - - tryExecute(env, "custom partitioning test"); - deleteTestTopic(TOPIC); - LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); - } + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = createTableSink(); + kafkaTableSink.emitDataStream(dataStream); - private StreamExecutionEnvironment createEnvironment() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - return env; + verify(dataStream).addSink(kafkaProducer); } - private void createProducingTopology(StreamExecutionEnvironment env) { - DataStream stream = env.addSource(new SourceFunction() { - private boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - long cnt = 0; - while (running) { - Row row = new Row(2); - row.setField(0, cnt); - row.setField(1, "kafka-" + cnt); - ctx.collect(row); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }) - .setParallelism(1); - - KafkaTableSink kafkaTableSinkBase = createTableSink(); - -
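The rewritten `KafkaTableSinkTestBase` no longer starts a Kafka cluster. Instead it mocks `DataStream` and uses `verify(dataStream).addSink(kafkaProducer)` to check that `emitDataStream` attaches the producer. The sketch below makes the same interaction check without a mocking library. `SinkableStream`, `RecordingStream`, and `DemoTableSink` are hypothetical types that stand in for Mockito and the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal stand-in for DataStream; only the method the test checks.
interface SinkableStream {
    void addSink(Object sink);
}

// Hand-written test double: records every sink passed to addSink.
class RecordingStream implements SinkableStream {
    final List<Object> addedSinks = new ArrayList<>();

    @Override
    public void addSink(Object sink) {
        addedSinks.add(sink);
    }
}

// Sink under test: emitting a stream should attach exactly the configured producer.
class DemoTableSink {
    private final Object producer;

    DemoTableSink(Object producer) {
        this.producer = producer;
    }

    void emitDataStream(SinkableStream stream) {
        stream.addSink(producer);
    }
}
```

This style of test checks only the wiring between the table sink and the stream, not that records actually reach Kafka. That end-to-end path was what the removed integration test exercised.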
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527303#comment-15527303 ] ASF GitHub Bot commented on FLINK-3874: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2430#discussion_r80783113 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java --- @@ -17,123 +17,79 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.test.util.SuccessException; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.junit.Test; import java.io.Serializable; -import java.util.HashSet; import java.util.Properties; -import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.spy; +import static 
org.mockito.Mockito.verify; -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { +public abstract class KafkaTableSinkTestBase implements Serializable { - protected final static String TOPIC = "customPartitioningTestTopic"; - protected final static int PARALLELISM = 1; + protected final static String TOPIC = "testTopic"; protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + protected FlinkKafkaProducerBase kafkaProducer = mock(FlinkKafkaProducerBase.class); + @Test public void testKafkaTableSink() throws Exception { - LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); - - createTestTopic(TOPIC, PARALLELISM, 1); - StreamExecutionEnvironment env = createEnvironment(); - - createProducingTopology(env); - createConsumingTopology(env); - - tryExecute(env, "custom partitioning test"); - deleteTestTopic(TOPIC); - LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); - } + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = createTableSink(); + kafkaTableSink.emitDataStream(dataStream); - private StreamExecutionEnvironment createEnvironment() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - return env; + verify(dataStream).addSink(kafkaProducer); } - private void createProducingTopology(StreamExecutionEnvironment env) { - DataStream stream = env.addSource(new SourceFunction() { - private boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - long cnt = 0; - while (running) { - Row row = new Row(2); - row.setField(0, cnt); - row.setField(1, "kafka-" + cnt); - ctx.collect(row); - cnt++; - } - } - - @Override - public void cancel() 
{ -
[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2430#discussion_r80783113 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java --- @@ -17,123 +17,79 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.test.util.SuccessException; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.junit.Test; import java.io.Serializable; -import java.util.HashSet; import java.util.Properties; -import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { +public abstract class KafkaTableSinkTestBase implements Serializable { - protected final 
static String TOPIC = "customPartitioningTestTopic"; - protected final static int PARALLELISM = 1; + protected final static String TOPIC = "testTopic"; protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + protected FlinkKafkaProducerBase kafkaProducer = mock(FlinkKafkaProducerBase.class); + @Test public void testKafkaTableSink() throws Exception { - LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); - - createTestTopic(TOPIC, PARALLELISM, 1); - StreamExecutionEnvironment env = createEnvironment(); - - createProducingTopology(env); - createConsumingTopology(env); - - tryExecute(env, "custom partitioning test"); - deleteTestTopic(TOPIC); - LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); - } + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = createTableSink(); + kafkaTableSink.emitDataStream(dataStream); - private StreamExecutionEnvironment createEnvironment() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - return env; + verify(dataStream).addSink(kafkaProducer); } - private void createProducingTopology(StreamExecutionEnvironment env) { - DataStream stream = env.addSource(new SourceFunction() { - private boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - long cnt = 0; - while (running) { - Row row = new Row(2); - row.setField(0, cnt); - row.setField(1, "kafka-" + cnt); - ctx.collect(row); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }) - .setParallelism(1); - - KafkaTableSink kafkaTableSinkBase = createTableSink(); - -
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527304#comment-15527304 ] ASF GitHub Bot commented on FLINK-3874: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2430#discussion_r80783750 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java --- @@ -17,123 +17,79 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.test.util.SuccessException; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.junit.Test; import java.io.Serializable; -import java.util.HashSet; import java.util.Properties; -import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.spy; +import static 
org.mockito.Mockito.verify; -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { +public abstract class KafkaTableSinkTestBase implements Serializable { - protected final static String TOPIC = "customPartitioningTestTopic"; - protected final static int PARALLELISM = 1; + protected final static String TOPIC = "testTopic"; protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + protected FlinkKafkaProducerBase kafkaProducer = mock(FlinkKafkaProducerBase.class); + @Test public void testKafkaTableSink() throws Exception { - LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); - - createTestTopic(TOPIC, PARALLELISM, 1); - StreamExecutionEnvironment env = createEnvironment(); - - createProducingTopology(env); - createConsumingTopology(env); - - tryExecute(env, "custom partitioning test"); - deleteTestTopic(TOPIC); - LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); - } + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = createTableSink(); + kafkaTableSink.emitDataStream(dataStream); - private StreamExecutionEnvironment createEnvironment() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - return env; + verify(dataStream).addSink(kafkaProducer); } - private void createProducingTopology(StreamExecutionEnvironment env) { - DataStream stream = env.addSource(new SourceFunction() { - private boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - long cnt = 0; - while (running) { - Row row = new Row(2); - row.setField(0, cnt); - row.setField(1, "kafka-" + cnt); - ctx.collect(row); - cnt++; - } - } - - @Override - public void cancel() 
{ -
[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2430#discussion_r80783750

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
@@ -17,123 +17,79 @@
 */
 package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.junit.Test;
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
-	protected final static String TOPIC = "customPartitioningTestTopic";
-	protected final static int PARALLELISM = 1;
+	protected final static String TOPIC = "testTopic";
 	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
 	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+	protected FlinkKafkaProducerBase kafkaProducer = mock(FlinkKafkaProducerBase.class);
+
 	@Test
 	public void testKafkaTableSink() throws Exception {
-		LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
-
-		createTestTopic(TOPIC, PARALLELISM, 1);
-		StreamExecutionEnvironment env = createEnvironment();
-
-		createProducingTopology(env);
-		createConsumingTopology(env);
-
-		tryExecute(env, "custom partitioning test");
-		deleteTestTopic(TOPIC);
-		LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
-	}
+		DataStream dataStream = mock(DataStream.class);
+		KafkaTableSink kafkaTableSink = createTableSink();
+		kafkaTableSink.emitDataStream(dataStream);
-	private StreamExecutionEnvironment createEnvironment() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-		return env;
+		verify(dataStream).addSink(kafkaProducer);
 	}
-	private void createProducingTopology(StreamExecutionEnvironment env) {
-		DataStream stream = env.addSource(new SourceFunction() {
-			private boolean running = true;
-
-			@Override
-			public void run(SourceContext ctx) throws Exception {
-				long cnt = 0;
-				while (running) {
-					Row row = new Row(2);
-					row.setField(0, cnt);
-					row.setField(1, "kafka-" + cnt);
-					ctx.collect(row);
-					cnt++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		})
-		.setParallelism(1);
-
-		KafkaTableSink kafkaTableSinkBase = createTableSink();
-
-
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527305#comment-15527305 ] ASF GitHub Bot commented on FLINK-3874:

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2430#discussion_r80784331

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2559

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously

The offset commit calls to Kafka may occasionally take very long. In that case, the notifyCheckpointComplete() method blocks for a long time, and the KafkaConsumer can neither make progress nor perform checkpoints.

This pull request changes the offset committing to use Kafka's `commitAsync()` method. It also makes sure that no more than one commit is in progress at a time, so that commit requests do not pile up.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_commit_async

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2559.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2559

commit eafba8600863c18e09397366485bcfc6ff44960f
Author: Stephan Ewen
Date: 2016-09-27T18:59:35Z

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously
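The "at most one commit in flight" idea can be sketched as follows. The sketch assumes a hypothetical `AsyncCommitter` interface in place of Kafka's `KafkaConsumer.commitAsync(Map<TopicPartition, OffsetAndMetadata>, OffsetCommitCallback)`, so it runs without a Kafka dependency, and it keys offsets by a plain partition number. It is not the code from pull request #2559. In particular, dropping a request while another commit is still running (instead of remembering the latest offsets) is a design choice of this sketch only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

public class AsyncOffsetCommitSketch {

    /** Hypothetical stand-in for an asynchronous commit API; the callback gets (offsets, error-or-null). */
    interface AsyncCommitter {
        void commitAsync(Map<Integer, Long> offsets, BiConsumer<Map<Integer, Long>, Exception> callback);
    }

    private final AsyncCommitter committer;

    // True while a commit request is in flight; guarantees at most one at a time.
    private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

    AsyncOffsetCommitSketch(AsyncCommitter committer) {
        this.committer = committer;
    }

    /**
     * Meant to be called from notifyCheckpointComplete(); never blocks.
     * Returns true if a commit was started, false if one was still in flight.
     */
    boolean commitOffsets(Map<Integer, Long> offsets) {
        if (!commitInProgress.compareAndSet(false, true)) {
            // Skip instead of queueing, so requests cannot pile up.
            return false;
        }
        try {
            committer.commitAsync(new HashMap<>(offsets), (committed, error) -> {
                // Error handling (e.g. logging) is omitted in this sketch.
                commitInProgress.set(false);
            });
        } catch (RuntimeException e) {
            // The request never started, so release the guard.
            commitInProgress.set(false);
            throw e;
        }
        return true;
    }

    public static void main(String[] args) {
        // Fake committer: the commit "completes" only when we run the stored callback.
        List<Runnable> pendingCallbacks = new ArrayList<>();
        AsyncOffsetCommitSketch sketch = new AsyncOffsetCommitSketch(
                (toCommit, callback) -> pendingCallbacks.add(() -> callback.accept(toCommit, null)));

        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 42L);

        System.out.println(sketch.commitOffsets(offsets)); // true: commit started
        System.out.println(sketch.commitOffsets(offsets)); // false: previous one still in flight
        pendingCallbacks.get(0).run();                      // first commit completes
        System.out.println(sketch.commitOffsets(offsets)); // true: next commit allowed
    }
}
```

The compare-and-set keeps the checkpoint-notification thread from ever waiting on Kafka. A skipped commit is tolerable here because a later checkpoint commits newer offsets anyway. The pull request description does not say whether it behaves the same way.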
[jira] [Commented] (FLINK-4702) Kafka consumer must commit offsets asynchronously
[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527146#comment-15527146 ] ASF GitHub Bot commented on FLINK-4702:

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2559

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously

The offset commit calls to Kafka may occasionally take very long. In that case, the notifyCheckpointComplete() method blocks for a long time, and the KafkaConsumer can neither make progress nor perform checkpoints.

This pull request changes the offset committing to use Kafka's `commitAsync()` method. It also makes sure that no more than one commit is in progress at a time, so that commit requests do not pile up.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_commit_async

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2559.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2559

commit eafba8600863c18e09397366485bcfc6ff44960f
Author: Stephan Ewen
Date: 2016-09-27T18:59:35Z

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously

> Kafka consumer must commit offsets asynchronously
>
> Key: FLINK-4702
> URL: https://issues.apache.org/jira/browse/FLINK-4702
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.2
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take very long.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time, and
> the KafkaConsumer can neither make progress nor perform checkpoints.
> Kafka 0.9+ provides methods to commit asynchronously.
> We should use those and make sure that no more than one commit is in
> progress at a time, so that commit requests do not pile up.
[jira] [Created] (FLINK-4702) Kafka consumer must commit offsets asynchronously
Stephan Ewen created FLINK-4702:

Summary: Kafka consumer must commit offsets asynchronously
Key: FLINK-4702
URL: https://issues.apache.org/jira/browse/FLINK-4702
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.1.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
Fix For: 1.2.0, 1.1.3

The offset commit calls to Kafka may occasionally take very long.
In that case, the {{notifyCheckpointComplete()}} method blocks for a long time, and the KafkaConsumer can neither make progress nor perform checkpoints.
Kafka 0.9+ provides methods to commit asynchronously.
We should use those and make sure that no more than one commit is in progress at a time, so that commit requests do not pile up.
[jira] [Commented] (FLINK-4675) Remove Parameter from WindowAssigner.getDefaultTrigger()
[ https://issues.apache.org/jira/browse/FLINK-4675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527122#comment-15527122 ] Stephan Ewen commented on FLINK-4675:

It would be okay to break it now (according to what we defined as stability guarantees), but it is still a tough call. Do we have a hunch how many people write their own window assigners? If only a few people write custom window assigners, it is probably fine.

> Remove Parameter from WindowAssigner.getDefaultTrigger()
>
> Key: FLINK-4675
> URL: https://issues.apache.org/jira/browse/FLINK-4675
> Project: Flink
> Issue Type: Sub-task
> Components: Windowing Operators
> Reporter: Aljoscha Krettek
> Fix For: 2.0.0
>
>
> For legacy reasons the method has {{StreamExecutionEnvironment}} as a
> parameter. This is not needed anymore.
> [~StephanEwen] do you think we should break this now? {{WindowAssigner}} is
> {{PublicEvolving}} but I wanted to play it conservative for now.
[jira] [Commented] (FLINK-4699) Convert Kafka TableSource/TableSink tests to unit tests
[ https://issues.apache.org/jira/browse/FLINK-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527013#comment-15527013 ] Fabian Hueske commented on FLINK-4699:

There is a pull request to convert the KafkaJSONSinkITCases into unit tests: https://github.com/apache/flink/pull/2430

> Convert Kafka TableSource/TableSink tests to unit tests
>
> Key: FLINK-4699
> URL: https://issues.apache.org/jira/browse/FLINK-4699
> Project: Flink
> Issue Type: Test
> Components: Table API & SQL
> Reporter: Timo Walther
>
>
> The Kafka tests are extremely heavy, although the Table Sources and Sinks are
> only thin wrappers on top of the Kafka Sources / Sinks. Testing these wrappers
> should not require bringing up Kafka clusters.
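Pull request #2430 replaces the Kafka-cluster-based test with a Mockito check, `verify(dataStream).addSink(kafkaProducer)`. The same idea can be sketched without a mocking library: because the table sink is only a thin wrapper, it is enough to check that it hands its pre-configured producer to the stream. All types below (`Sink`, `Stream`, `WrapperTableSink`, `RecordingStream`) are hypothetical simplifications, not Flink's `DataStream` or `KafkaTableSink` API.

```java
import java.util.ArrayList;
import java.util.List;

public class TableSinkUnitTestSketch {

    // Hypothetical minimal stand-ins for a sink function and a data stream.
    interface Sink<T> {
        void invoke(T value);
    }

    interface Stream<T> {
        void addSink(Sink<T> sink);
    }

    // Thin wrapper under test: it only wires a pre-configured producer into the stream.
    static final class WrapperTableSink<T> {
        private final Sink<T> producer;

        WrapperTableSink(Sink<T> producer) {
            this.producer = producer;
        }

        void emitDataStream(Stream<T> stream) {
            stream.addSink(producer);
        }
    }

    // Recording fake that replaces a running Kafka cluster in the test.
    static final class RecordingStream<T> implements Stream<T> {
        final List<Sink<T>> sinks = new ArrayList<>();

        @Override
        public void addSink(Sink<T> sink) {
            sinks.add(sink);
        }
    }

    public static void main(String[] args) {
        Sink<String> producer = value -> { };
        RecordingStream<String> stream = new RecordingStream<>();
        new WrapperTableSink<>(producer).emitDataStream(stream);
        // Same check as verify(dataStream).addSink(kafkaProducer): the exact producer was added once.
        System.out.println(stream.sinks.size() == 1 && stream.sinks.get(0) == producer); // true
    }
}
```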
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526962#comment-15526962 ] ASF GitHub Bot commented on FLINK-4657:

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2550

Can we try and unify the structure / paths under which all this information is stored in ZooKeeper?

```
/flink
  +/cluster_id_1/resource_manager_lock
  |            |
  |            +/job-id-1/job_manager_lock
  |            |         /checkpoints/latest
  |            |                     /latest-1
  |            |                     /latest-2
  |            |
  |            +/job-id-2/job_manager_lock
  |
  +/cluster_id_2/resource_manager_lock
               |
               +/job-id-1/job_manager_lock
               |         /checkpoints/latest
               |                     /latest-1
               |         /persisted_job_graph
```

The "cluster-id" should be a generated UUID in the case of YARN/Mesos, and a config value in the standalone case. In YARN / Mesos, the UUID should be passed via an environment variable to the Java processes with the entry points for TaskManager / JobManager / ResourceManager. The ZooKeeper HA Services should receive the "cluster-id" in their constructor.

> Implement HighAvailabilityServices based on zookeeper
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Kurt Young
> Assignee: Kurt Young
>
>
> For flip-6, we will have ResourceManager and every JobManager as potential
> leader contender and retriever. We should separate them by using different
> zookeeper paths.
> For example, the path could be /leader/resource-manager for RM. And for each
> JM, the path could be /leader/job-managers/JobID
[GitHub] flink issue #2550: [FLINK-4657] Implement HighAvailabilityServices based on ...
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2550

Can we try and unify the structure / paths under which all this information is stored in ZooKeeper?

```
/flink
  +/cluster_id_1/resource_manager_lock
  |            |
  |            +/job-id-1/job_manager_lock
  |            |         /checkpoints/latest
  |            |                     /latest-1
  |            |                     /latest-2
  |            |
  |            +/job-id-2/job_manager_lock
  |
  +/cluster_id_2/resource_manager_lock
               |
               +/job-id-1/job_manager_lock
               |         /checkpoints/latest
               |                     /latest-1
               |         /persisted_job_graph
```

The "cluster-id" should be a generated UUID in the case of YARN/Mesos, and a config value in the standalone case. In YARN / Mesos, the UUID should be passed via an environment variable to the Java processes with the entry points for TaskManager / JobManager / ResourceManager. The ZooKeeper HA Services should receive the "cluster-id" in their constructor.
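A path helper for the proposed layout might look like the sketch below. It assumes the HA services receive a root (`/flink`, the top node of the proposed tree) and the cluster-id in their constructor. The class and method names are hypothetical. Passing the UUID through an environment variable is not modeled; `newClusterId()` only marks where a YARN/Mesos deployment would generate it.

```java
import java.util.UUID;

public final class ZkPathLayoutSketch {

    private final String root;      // top node, e.g. "/flink"
    private final String clusterId; // one sub-tree per cluster

    ZkPathLayoutSketch(String root, String clusterId) {
        if (clusterId == null || clusterId.isEmpty()) {
            throw new IllegalArgumentException("cluster-id must be set");
        }
        this.root = root;
        this.clusterId = clusterId;
    }

    /** YARN/Mesos case: a fresh UUID, generated once per deployed cluster. */
    static String newClusterId() {
        return UUID.randomUUID().toString();
    }

    String clusterPath() { return root + "/" + clusterId; }

    String resourceManagerLockPath() { return clusterPath() + "/resource_manager_lock"; }

    String jobPath(String jobId) { return clusterPath() + "/" + jobId; }

    String jobManagerLockPath(String jobId) { return jobPath(jobId) + "/job_manager_lock"; }

    String checkpointsPath(String jobId) { return jobPath(jobId) + "/checkpoints"; }

    String persistedJobGraphPath(String jobId) { return jobPath(jobId) + "/persisted_job_graph"; }

    public static void main(String[] args) {
        // Standalone case: the cluster-id would come from the configuration (hard-coded here).
        ZkPathLayoutSketch standalone = new ZkPathLayoutSketch("/flink", "cluster_id_1");
        System.out.println(standalone.resourceManagerLockPath());      // /flink/cluster_id_1/resource_manager_lock
        System.out.println(standalone.jobManagerLockPath("job-id-1")); // /flink/cluster_id_1/job-id-1/job_manager_lock

        // YARN/Mesos case: every new cluster gets its own, non-colliding sub-tree.
        ZkPathLayoutSketch yarn = new ZkPathLayoutSketch("/flink", newClusterId());
        System.out.println(yarn.clusterPath());
    }
}
```

Putting the cluster-id directly under the root means two clusters sharing one ZooKeeper ensemble never touch each other's leader locks or checkpoint metadata.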
[jira] [Created] (FLINK-4701) Unprotected access to cancelables in StreamTask
Ted Yu created FLINK-4701:

Summary: Unprotected access to cancelables in StreamTask
Key: FLINK-4701
URL: https://issues.apache.org/jira/browse/FLINK-4701
Project: Flink
Issue Type: Bug
Reporter: Ted Yu
Priority: Minor

In performCheckpoint():
{code}
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
		"checkpoint-" + checkpointId + "-" + timestamp,
		this,
		cancelables,
		chainedStateHandles,
		keyGroupsStateHandleFuture,
		checkpointId,
		bytesBufferedAlignment,
		alignmentDurationNanos,
		syncDurationMillis,
		endOfSyncPart);

synchronized (cancelables) {
	cancelables.add(asyncCheckpointRunnable);
}
{code}
The construction of AsyncCheckpointRunnable, which receives {{cancelables}} as an argument, should be moved inside the synchronized block on cancelables.
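Here is a minimal sketch of the proposed fix, with a hypothetical `AsyncRunnable` standing in for `AsyncCheckpointRunnable`. The issue does not say what the real constructor does with `cancelables`. For illustration, the sketch assumes the constructor reads the shared list (here, its size). That is the kind of access that becomes unprotected when construction happens outside `synchronized (cancelables)`.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CancelablesSketch {

    // Shared registry of cancelable resources; guarded by its own monitor.
    private final List<Closeable> cancelables = new ArrayList<>();

    // Hypothetical stand-in for AsyncCheckpointRunnable. ASSUMPTION: its constructor
    // reads the shared list, so it must only be called while holding the list's lock.
    static final class AsyncRunnable implements Closeable {
        final int positionAtCreation;

        AsyncRunnable(List<Closeable> registry) {
            this.positionAtCreation = registry.size();
        }

        @Override
        public void close() {
            // Would cancel the asynchronous checkpoint work.
        }
    }

    // Construction and registration both happen under the same lock.
    AsyncRunnable register() {
        synchronized (cancelables) {
            AsyncRunnable runnable = new AsyncRunnable(cancelables);
            cancelables.add(runnable);
            return runnable;
        }
    }

    int size() {
        synchronized (cancelables) {
            return cancelables.size();
        }
    }

    // Registers from several threads and returns the distinct positions the runnables observed.
    static Set<Integer> registerConcurrently(CancelablesSketch task, int threads, int perThread)
            throws InterruptedException {
        Set<Integer> positions = new HashSet<>();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    int position = task.register().positionAtCreation;
                    synchronized (positions) {
                        positions.add(position);
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return positions;
    }

    public static void main(String[] args) throws InterruptedException {
        CancelablesSketch task = new CancelablesSketch();
        Set<Integer> positions = registerConcurrently(task, 4, 1000);
        // Every runnable observed a distinct, consistent registry size.
        System.out.println(task.size() + " " + positions.size()); // 4000 4000
    }
}
```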
[jira] [Closed] (FLINK-4543) Race Deadlock in SpilledSubpartitionViewTest
[ https://issues.apache.org/jira/browse/FLINK-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-4543.

> Race Deadlock in SpilledSubpartitionViewTest
>
> Key: FLINK-4543
> URL: https://issues.apache.org/jira/browse/FLINK-4543
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.1.2
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The test deadlocked (Java level deadlock) with the following stack traces:
> {code}
> Found one Java-level deadlock:
> =============================
> "pool-1-thread-2":
>   waiting to lock monitor 0x7fec2c006168 (object 0xef661c20, a java.lang.Object),
>   which is held by "IOManager reader thread #1"
> "IOManager reader thread #1":
>   waiting to lock monitor 0x7fec2c005ea8 (object 0xef62c8a8, a java.lang.Object),
>   which is held by "pool-1-thread-2"
>
> Java stack information for the threads listed above:
> ===================================================
> "pool-1-thread-2":
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:309)
>   - waiting to lock <0xef661c20> (a java.lang.Object)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:261)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:380)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:366)
>   at org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135)
>   - locked <0xef62c8a8> (a java.lang.Object)
>   at org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:118)
>   - locked <0xef9597c0> (a java.lang.Object)
>   at org.apache.flink.runtime.io.network.util.TestConsumerCallback$RecyclingCallback.onBuffer(TestConsumerCallback.java:72)
>   at org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:87)
>   at org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> "IOManager reader thread #1":
>   at org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:126)
>   - waiting to lock <0xef62c8a8> (a java.lang.Object)
>   at org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:118)
>   - locked <0xefa016f0> (a java.lang.Object)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:275)
>   - locked <0xef661c20> (a java.lang.Object)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:343)
>   at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:333)
>   at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199)
>   at org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:435)
>   at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:408)
>
> Found 1 deadlock.
> {code}
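The trace shows a classic lock-order inversion:

- "pool-1-thread-2" holds <0xef62c8a8>, taken in `TestPooledBufferProvider$PooledBufferProviderRecycler.recycle`. It then waits for <0xef661c20> in `SpilledSubpartitionViewAsyncIO.notifyError`.
- "IOManager reader thread #1" holds <0xef661c20>, taken in `returnBufferFromIOThread`. It then waits for <0xef62c8a8>.

One common remedy is to never call into another component while holding your own lock. The sketch below illustrates that general pattern with hypothetical `Component` objects. It does not reproduce Flink's classes, and the trace alone does not show which remedy commit 90902914ac4b11f9554b67ad49e0d697a0d02f93 applied.

```java
public class CallOutsideLockSketch {

    // Hypothetical component guarding its own state with a private monitor,
    // standing in for either side of the deadlock (view or buffer provider).
    static final class Component {
        private final Object lock = new Object();
        private int events;     // guarded by lock
        private Component peer; // set once before any thread starts

        void setPeer(Component peer) {
            this.peer = peer;
        }

        // Update own state under the lock, then notify the peer only AFTER
        // releasing it. Holding 'lock' while calling the peer (with the peer
        // doing the same in reverse) would allow an ABBA deadlock.
        void process() {
            synchronized (lock) {
                events++;
            }
            peer.onPeerEvent();
        }

        // Callback from the peer; takes only this component's own lock.
        void onPeerEvent() {
            synchronized (lock) {
                events++;
            }
        }

        int events() {
            synchronized (lock) {
                return events;
            }
        }
    }

    // Drives the two components from opposite sides on two threads.
    // Returns true if both threads finished within the timeout.
    static boolean runConcurrently(Component a, Component b, int iterations)
            throws InterruptedException {
        Thread first = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                a.process();
            }
        });
        Thread second = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                b.process();
            }
        });
        first.setDaemon(true); // do not keep the JVM alive if something hangs
        second.setDaemon(true);
        first.start();
        second.start();
        first.join(10_000);
        second.join(10_000);
        return !first.isAlive() && !second.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        Component view = new Component();
        Component provider = new Component();
        view.setPeer(provider);
        provider.setPeer(view);
        boolean finished = runConcurrently(view, provider, 100_000);
        // Each process() call adds one event on each side: 2 * 100_000 per component.
        System.out.println(finished + " " + view.events() + " " + provider.events()); // true 200000 200000
    }
}
```

Because no thread ever holds two monitors at once, no cycle of waiting threads can form, whatever order the calls arrive in.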
[jira] [Resolved] (FLINK-4543) Race Deadlock in SpilledSubpartitionViewTest
[ https://issues.apache.org/jira/browse/FLINK-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-4543.
Resolution: Fixed

Fixed via 90902914ac4b11f9554b67ad49e0d697a0d02f93
[jira] [Commented] (FLINK-4543) Race Deadlock in SpilledSubpartitionViewTest
[ https://issues.apache.org/jira/browse/FLINK-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526905#comment-15526905 ] ASF GitHub Bot commented on FLINK-4543:

Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2444
[GitHub] flink pull request #2444: [FLINK-4543] [network] Fix potential deadlock in S...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2444
[jira] [Closed] (FLINK-4560) enforcer java version as 1.7
[ https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-4560.

> enforcer java version as 1.7
>
> Key: FLINK-4560
> URL: https://issues.apache.org/jira/browse/FLINK-4560
> Project: Flink
> Issue Type: Improvement
> Reporter: shijinkui
> Fix For: 1.2.0
>
>
> 1. maven-enforcer-plugin: add a Java version rule
> 2. maven-enforcer-plugin: upgrade the plugin version to 1.4.1
> Explicitly require the Java version.
[jira] [Resolved] (FLINK-4560) enforcer java version as 1.7
[ https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-4560.
Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via b928935b8c5be02b23dd2cb87144ae1ea001278c

Thank you for the contribution!
[jira] [Commented] (FLINK-4560) enforcer java version as 1.7
[ https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526904#comment-15526904 ] ASF GitHub Bot commented on FLINK-4560:

Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2458
[GitHub] flink pull request #2458: [FLINK-4560] enforcer java version as 1.7
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2458
[jira] [Commented] (FLINK-4695) Separate configuration parsing from MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526820#comment-15526820 ] ASF GitHub Bot commented on FLINK-4695: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2555 > Separate configuration parsing from MetricRegistry > -- > > Key: FLINK-4695 > URL: https://issues.apache.org/jira/browse/FLINK-4695 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > In order to decouple the {{MetricRegistry}} object instantiation from the > global configuration, we could introduce a {{MetricRegistryConfiguration}} > object which encapsulates all necessary information for the > {{MetricRegistry}}. The {{MetricRegistryConfiguration}} could have a static > method to be generated from a {{Configuration}}.
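The decoupling proposed in FLINK-4695 can be sketched as follows. This assumes a plain `Map<String, String>` stands in for Flink's `Configuration`, and the keys, default, and class shapes are illustrative only (they are not taken from the merged commit): all parsing happens once in a static factory, and the registry receives only the resulting object.

```java
import java.util.Map;

/**
 * Hypothetical sketch: an immutable holder for everything the registry needs,
 * produced once from the global configuration by a static factory method.
 * A Map<String, String> stands in for Flink's Configuration class.
 */
final class MetricRegistryConfiguration {

    // Illustrative keys and default; not necessarily the ones Flink uses.
    static final String REPORTERS_KEY = "metrics.reporters";
    static final String DELIMITER_KEY = "metrics.scope.delimiter";
    static final char DEFAULT_DELIMITER = '.';

    private final String[] reporterNames;
    private final char delimiter;

    MetricRegistryConfiguration(String[] reporterNames, char delimiter) {
        this.reporterNames = reporterNames.clone();
        this.delimiter = delimiter;
    }

    /** All parsing of the global configuration happens here, and only here. */
    static MetricRegistryConfiguration fromConfiguration(Map<String, String> config) {
        String reporters = config.getOrDefault(REPORTERS_KEY, "").trim();
        String[] names = reporters.isEmpty() ? new String[0] : reporters.split("\\s*,\\s*");

        String delimiterValue = config.get(DELIMITER_KEY);
        char delimiter = (delimiterValue == null || delimiterValue.isEmpty())
                ? DEFAULT_DELIMITER
                : delimiterValue.charAt(0);

        return new MetricRegistryConfiguration(names, delimiter);
    }

    String[] getReporterNames() { return reporterNames.clone(); }

    char getDelimiter() { return delimiter; }
}

/** The registry no longer reads the global configuration itself. */
final class MetricRegistry {
    private final MetricRegistryConfiguration config;

    MetricRegistry(MetricRegistryConfiguration config) {
        this.config = config;
    }

    int numberOfReporters() { return config.getReporterNames().length; }
}
```

Because the registry depends only on the parsed object, a test can construct a `MetricRegistryConfiguration` directly without touching any global configuration.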
[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2510 Sounds good @StephanEwen . Thank you.
[GitHub] flink pull request #2555: [FLINK-4695] Introduce MetricRegistryConfiguration...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2555
[jira] [Closed] (FLINK-4695) Separate configuration parsing from MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4695. Resolution: Fixed Added via f1b5b35f595e7ae53001a4c46edbf0c9b78ee376
[jira] [Commented] (FLINK-4695) Separate configuration parsing from MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526805#comment-15526805 ] ASF GitHub Bot commented on FLINK-4695: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2555 Thanks for the review @StephanEwen. Build passed locally. Will merge the PR.
[GitHub] flink issue #2555: [FLINK-4695] Introduce MetricRegistryConfiguration to enc...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2555 Thanks for the review @StephanEwen. Build passed locally. Will merge the PR.
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526787#comment-15526787 ] ASF GitHub Bot commented on FLINK-3322: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2510 Sounds good @StephanEwen . Thank you. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.)
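The issue description contrasts two behaviors: with `taskmanager.memory.preallocate` set to false, released segments are simply dropped and left to the GC, whereas a pool keeps them for reuse. A minimal sketch of the pooling side follows, assuming `byte[]` segments and a hypothetical `SegmentPool` class; this is neither Flink's `MemoryManager` API nor the fix that was eventually merged.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of segment pooling: released segments are kept in a free list
 * and handed out again, instead of being dropped and left to the garbage collector.
 */
final class SegmentPool {
    private final int segmentSize;
    private final int totalSegments;
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private int createdSegments; // segments ever created with `new`

    SegmentPool(int segmentSize, int totalSegments) {
        this.segmentSize = segmentSize;
        this.totalSegments = totalSegments;
    }

    /** Returns n segments, reusing pooled ones before allocating new arrays. */
    synchronized List<byte[]> allocate(int n) {
        int available = free.size() + (totalSegments - createdSegments);
        if (n > available) {
            throw new IllegalStateException("Requested " + n + " segments, only " + available + " available");
        }
        List<byte[]> result = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            byte[] segment = free.poll();
            if (segment == null) {
                // Pool is empty: create a new segment (counts against the memory budget).
                segment = new byte[segmentSize];
                createdSegments++;
            }
            result.add(segment);
        }
        return result;
    }

    /** Returns segments to the pool; the caller must not use them afterwards. */
    synchronized void release(List<byte[]> segments) {
        free.addAll(segments);
    }

    synchronized int freshAllocations() { return createdSegments; }
}
```

Across repeated supersteps the pool creates each segment only once; later allocations are served from the free list, so there are no discarded segments for the GC to reclaim.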
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526751#comment-15526751 ] ASF GitHub Bot commented on FLINK-4657: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80740850 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? 
execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { +jobInfo.setLastActive() +val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +}(context.dispatcher) + } else { +self !
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526750#comment-15526750 ] ASF GitHub Bot commented on FLINK-4657: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80734121 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; --- End diff -- Let's let this throw an exception - after all, the caller receiver should react to the fact that the JobManager is not aware of its execution any more.
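The FLINK-4657 issue description proposes one ZooKeeper leader path for the ResourceManager and one per JobManager, keyed by job ID. A small sketch of that layout follows; the class name, the `/leader` constants, and the validation rule are assumptions made for illustration.

```java
/**
 * Hypothetical helper deriving one ZooKeeper leader path per contender:
 * a single path for the ResourceManager and one path per job for JobManagers.
 */
final class LeaderPaths {
    static final String LEADER_ROOT = "/leader";
    static final String RESOURCE_MANAGER = LEADER_ROOT + "/resource-manager";
    static final String JOB_MANAGERS = LEADER_ROOT + "/job-managers";

    private LeaderPaths() {}

    static String resourceManagerLeaderPath() {
        return RESOURCE_MANAGER;
    }

    /** Each job gets its own znode, so elections for different jobs never share a path. */
    static String jobManagerLeaderPath(String jobId) {
        // Reject ids that would create an empty or nested znode name.
        if (jobId == null || jobId.isEmpty() || jobId.contains("/")) {
            throw new IllegalArgumentException("Invalid job id: " + jobId);
        }
        return JOB_MANAGERS + "/" + jobId;
    }
}
```

Keeping each job under its own znode means the elections for different jobs, and for the ResourceManager, cannot interfere with one another.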
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526753#comment-15526753 ] ASF GitHub Bot commented on FLINK-4657: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80733924 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); --- End diff -- The log could be on "debug" level, as this situation can occur when the JobManager failed tasks and unregistered them, but the cancel() messages have not reached the TaskManager. Since this is a valid situation, it should not cause a "WARN" message. In general, all situations that are valid race conditions and that need not indicate a corrupted state or lead to a failure / recovery should probably not have a "WARN" level.
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526754#comment-15526754 ] ASF GitHub Bot commented on FLINK-4657: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80733402 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { --- End diff -- Let's let this method throw an exception. Then the calling TaskManager can see the difference between `null` (= no further split) and `Exception` (= something went wrong / is inconsistent).
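The review comments on `requestNextInputSplit` combine into a simple contract: `null` means no further split, an exception means the master does not know the requesting execution, and that unknown-execution case is logged at debug level because it can be a benign race. The sketch below rests on several assumptions: plain strings replace `ExecutionAttemptID` and `InputSplit`, `UnknownExecutionException` and `SplitProvider` are hypothetical names, and `java.util.logging` level `FINE` stands in for debug.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical exception: the master does not (or no longer) know the requesting execution. */
class UnknownExecutionException extends Exception {
    UnknownExecutionException(String message) {
        super(message);
    }
}

/**
 * Hypothetical sketch of the suggested contract for requesting input splits:
 * a null return means "no further split", an exception means "something is inconsistent".
 */
final class SplitProvider {
    private static final Logger LOG = Logger.getLogger(SplitProvider.class.getName());

    // Remaining splits per execution attempt (illustrative bookkeeping only).
    private final Map<String, Deque<String>> splitsByAttempt = new HashMap<>();

    void register(String attemptId, Collection<String> splits) {
        splitsByAttempt.put(attemptId, new ArrayDeque<>(splits));
    }

    void unregister(String attemptId) {
        splitsByAttempt.remove(attemptId);
    }

    /** Returns the next split, or null once the attempt has consumed all of its splits. */
    String requestNextInputSplit(String attemptId) throws UnknownExecutionException {
        Deque<String> splits = splitsByAttempt.get(attemptId);
        if (splits == null) {
            // A legitimate race: the master may have failed and unregistered the task before
            // its cancel message arrived, so log at FINE (debug) rather than a warning level.
            LOG.log(Level.FINE, "Cannot find execution for attempt {0}.", attemptId);
            throw new UnknownExecutionException("Unknown execution attempt " + attemptId);
        }
        return splits.poll(); // null when no splits are left
    }
}
```

A caller can then treat `null` as the normal end of its input and reserve its failure handling for the exception.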
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526752#comment-15526752 ] ASF GitHub Bot commented on FLINK-4657: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80742192 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { --- End diff -- Is there a way we can get this method out of the public interface? It could do state changes in `callAsync()`. That way, it would not be callable via RPC, because it seems like this is no method anyone should invoke remotely. 
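The suggestion to take `jobStatusChanged` out of the RPC interface can be sketched with a single-threaded executor standing in for the endpoint's main thread. The execution graph calls a local listener, which schedules the actual state change on that thread instead of exposing it remotely. This is an illustrative assumption, not Flink's `RpcEndpoint` or `callAsync()` API, and `JobMasterSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical sketch: job status changes are delivered through a local listener
 * and executed on a single "main thread", rather than through an RPC method.
 */
final class JobMasterSketch implements AutoCloseable {

    enum JobStatus { RUNNING, FINISHED, FAILED }

    // Stand-in for the endpoint's main thread; it owns all mutable state below.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only read or written from tasks running on mainThread.
    private final List<JobStatus> statusHistory = new ArrayList<>();

    /** Called locally (e.g. by the execution graph); deliberately not exposed via RPC. */
    Future<?> onJobStatusChanged(JobStatus newStatus) {
        return mainThread.submit(() -> handleJobStatusChanged(newStatus));
    }

    private void handleJobStatusChanged(JobStatus newStatus) {
        statusHistory.add(newStatus);
    }

    /** Takes a snapshot on the main thread, so readers never race with updates. */
    List<JobStatus> history() throws Exception {
        Callable<List<JobStatus>> snapshot = () -> new ArrayList<>(statusHistory);
        return mainThread.submit(snapshot).get();
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

Since every mutation runs on one thread, the handler needs no locking, and it is not part of any remotely callable interface.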
[jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526749#comment-15526749 ] ASF GitHub Bot commented on FLINK-4657: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80740727 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? 
execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { +jobInfo.setLastActive() +val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +}(context.dispatcher) + } else { +self !
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80734121 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; --- End diff -- Let's let this throw an exception - after all, the caller receiver should react to the fact that the JobManager is not aware of its execution any more.
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80733402 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { --- End diff -- Let's let this method throw an exception. Then the calling TaskManager can see the difference between `null` (= no further split) and `Exception` (= something went wrong / is inconsistent).
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80740850 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID 
partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { +jobInfo.setLastActive() +val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +}(context.dispatcher) + } else { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +*/ + + if (newJobStatus == JobStatus.FINISHED) { + try { +
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80740727 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID 
partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { +jobInfo.setLastActive() +val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +}(context.dispatcher) + } else { +self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } +*/ + + if (newJobStatus == JobStatus.FINISHED) { + try { +
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80733924 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); --- End diff -- The log could be on "debug" level, as this situation can occur when the JobManager failed tasks and unregistered them, but the cancel() messages have not reached the TaskManager. Since this is a valid situation, it should not cause a "WARN" message. In general, all situations that are valid race conditions and that need not indicate a corrupted state or lead to a failure / recovery should probably not have a "WARN" level.
[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2550#discussion_r80742192 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) { //TODO:: register at the RM } + @RpcMethod + public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { + final byte[] serializedInputSplit; + + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + log.error("Can not find Execution for attempt {}.", executionAttempt); + return null; + } else { + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex != null) { + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner != null) { + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + log.debug("Send next input split {}.", nextInputSplit); + try { + serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + vertex.fail(new RuntimeException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex)); + return null; + } + } else { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + return null; + } + } else { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + return null; + } + } + return new NextInputSplit(serializedInputSplit); + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID 
partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? execution.getState() : null; + return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { --- End diff -- Is there a way we can get this method out of the public interface? It could do state changes in `callAsync()`. That way, it would not be callable via RPC, because it seems like this is no method anyone should invoke remotely.
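One way to read the `callAsync()` suggestion is to keep the status callback private and have internal components schedule it on the endpoint's single main thread, so it never appears on the RPC gateway. The sketch below uses a plain single-threaded `ExecutorService` as a hypothetical stand-in for Flink's RPC main-thread executor; `JobMasterSketch`, `onJobStatusChanged`, and the string status are illustrative names, not the actual Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint: public methods form the remote interface,
// private methods are only reachable through the endpoint's own main thread.
class JobMasterSketch implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private volatile String jobStatus = "CREATED";

    /** Remote-callable query; only reads state. */
    public String getJobStatus() {
        return jobStatus;
    }

    /**
     * Called by internal components (e.g. an execution graph listener), not via RPC.
     * The state change itself runs on the main thread, similar to what callAsync() would do.
     */
    CompletableFuture<Void> onJobStatusChanged(String newStatus) {
        return CompletableFuture.runAsync(() -> jobStatusChanged(newStatus), mainThread);
    }

    // Not part of the public (RPC) interface.
    private void jobStatusChanged(String newStatus) {
        jobStatus = newStatus;
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

Because every state mutation funnels through the one main thread, the private method needs no extra locking, which is the usual motivation for this pattern in actor-style endpoints.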
[GitHub] flink pull request #2552: [FLINK-4690] Replace SlotAllocationFuture with fli...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2552
[jira] [Resolved] (FLINK-4690) Replace SlotAllocationFuture with flink's own future
[ https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4690. -- Resolution: Fixed Added via 84672c22f8088a70caf35b54d74eee458bf600dd > Replace SlotAllocationFuture with flink's own future > > > Key: FLINK-4690 > URL: https://issues.apache.org/jira/browse/FLINK-4690 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Kurt Young > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4361) Introduce Flink's own future abstraction
[ https://issues.apache.org/jira/browse/FLINK-4361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526671#comment-15526671 ] Till Rohrmann commented on FLINK-4361: -- Added to the master via f8138f4b74332ecb4ef0d28a09e8549708118ca6 > Introduce Flink's own future abstraction > > > Key: FLINK-4361 > URL: https://issues.apache.org/jira/browse/FLINK-4361 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Till Rohrmann > > In order to keep the abstraction Scala-independent, we should not rely on > Scala Futures.
[jira] [Commented] (FLINK-4361) Introduce Flink's own future abstraction
[ https://issues.apache.org/jira/browse/FLINK-4361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526669#comment-15526669 ] ASF GitHub Bot commented on FLINK-4361: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2554 > Introduce Flink's own future abstraction > > > Key: FLINK-4361 > URL: https://issues.apache.org/jira/browse/FLINK-4361 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Till Rohrmann > > In order to keep the abstraction Scala-independent, we should not rely on > Scala Futures.
[jira] [Commented] (FLINK-4690) Replace SlotAllocationFuture with flink's own future
[ https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526668#comment-15526668 ] ASF GitHub Bot commented on FLINK-4690: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2552 > Replace SlotAllocationFuture with flink's own future > > > Key: FLINK-4690 > URL: https://issues.apache.org/jira/browse/FLINK-4690 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Kurt Young >
[GitHub] flink pull request #2554: [FLINK-4361] Introduce Flink's own future abstract...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2554
[jira] [Commented] (FLINK-4690) Replace SlotAllocationFuture with flink's own future
[ https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1552#comment-1552 ] ASF GitHub Bot commented on FLINK-4690: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2552 I've merged it into the master branch. You can close the PR. > Replace SlotAllocationFuture with flink's own future > > > Key: FLINK-4690 > URL: https://issues.apache.org/jira/browse/FLINK-4690 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Kurt Young >
[GitHub] flink issue #2552: [FLINK-4690] Replace SlotAllocationFuture with flink's ow...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2552 I've merged it into the master branch. You can close the PR.
[jira] [Commented] (FLINK-4669) scala api createLocalEnvironment() function add default Configuration parameter
[ https://issues.apache.org/jira/browse/FLINK-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526621#comment-15526621 ] ASF GitHub Bot commented on FLINK-4669: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2541#discussion_r80730795 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala --- @@ -124,7 +125,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Gets the checkpoint config, which defines values like checkpoint interval, delay between * checkpoints, etc. */ - def getCheckpointConfig = javaEnv.getCheckpointConfig() --- End diff -- Whether to call Java methods with or without parentheses is actually somewhat subject to debate. For Scala methods, the writer decides whether a method is declared with or without parentheses. For Java methods, the caller can only "guess" what the method does (is returning a mutable field considered a side effect?), so in most places we call Java methods as Java methods, i.e. with parentheses. > scala api createLocalEnvironment() function add default Configuration > parameter > --- > > Key: FLINK-4669 > URL: https://issues.apache.org/jira/browse/FLINK-4669 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: shijinkui > > Scala programs cannot directly use createLocalEnvironment with a custom > Configuration object.
> For example, to start the web server in local mode, I have to do the following: > ``` > // set up execution environment > val conf = new Configuration > conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) > conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, > ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT) > val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment( > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2, > conf) > ) > ``` > so the createLocalEnvironment function needs a Configuration parameter.
[GitHub] flink pull request #2541: [FLINK-4669] scala api createLocalEnvironment() fu...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2541#discussion_r80730795 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala --- @@ -124,7 +125,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Gets the checkpoint config, which defines values like checkpoint interval, delay between * checkpoints, etc. */ - def getCheckpointConfig = javaEnv.getCheckpointConfig() --- End diff -- Whether to call Java methods with or without parentheses is actually somewhat subject to debate. For Scala methods, the writer decides whether a method is declared with or without parentheses. For Java methods, the caller can only "guess" what the method does (is returning a mutable field considered a side effect?), so in most places we call Java methods as Java methods, i.e. with parentheses.
[jira] [Closed] (FLINK-4583) NullPointerException in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-4583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-4583. - Resolution: Duplicate > NullPointerException in CliFrontend > --- > > Key: FLINK-4583 > URL: https://issues.apache.org/jira/browse/FLINK-4583 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > If no Flink program is executed, the following exception message is printed. > This can happen when a driver prints usage due to insufficient or improper > configuration. > {noformat} > The program finished with the following exception: > java.lang.NullPointerException > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:781) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1002) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1045) > {noformat}
[jira] [Created] (FLINK-4700) Harden the TimeProvider test
Kostas Kloudas created FLINK-4700: - Summary: Harden the TimeProvider test Key: FLINK-4700 URL: https://issues.apache.org/jira/browse/FLINK-4700 Project: Flink Issue Type: Bug Components: Tests Reporter: Kostas Kloudas Assignee: Kostas Kloudas Currently the TimeProvider test fails due to a race condition. This task aims at fixing it.
[jira] [Commented] (FLINK-4677) Jars with no job executions produces NullPointerException in ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-4677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526575#comment-15526575 ] ASF GitHub Bot commented on FLINK-4677: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2548 Do we have a standard way for user programs to print usage and exit without triggering this message? This is certainly preferable to the `NullPointerException`. > Jars with no job executions produces NullPointerException in ClusterClient > -- > > Key: FLINK-4677 > URL: https://issues.apache.org/jira/browse/FLINK-4677 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0, 1.1.2 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 1.2.0, 1.1.3 > > > When the user jar contains no job executions, the command-line client > displays a NullPointerException. This is not a big issue but should be > changed to something more descriptive.
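The general shape of the FLINK-4677 change is to check for the "no job was executed" case explicitly and fail with a descriptive message instead of dereferencing `null`. A minimal sketch of that pattern follows; `JobResultSketch`, `NoJobExecutedException`, `ClientSketch.describe`, and the message text are all hypothetical names, not the classes or wording used in the pull request.

```java
// Hypothetical result holder: null when the user program never called execute().
final class JobResultSketch {
    final String jobId;

    JobResultSketch(String jobId) {
        this.jobId = jobId;
    }
}

// Hypothetical exception type carrying a descriptive message instead of an NPE.
class NoJobExecutedException extends Exception {
    NoJobExecutedException(String message) {
        super(message);
    }
}

final class ClientSketch {
    static String describe(JobResultSketch result) throws NoJobExecutedException {
        if (result == null) {
            // Explicit check replaces the NullPointerException from the stack trace above.
            throw new NoJobExecutedException(
                "The program did not execute any Flink job. "
                    + "Did you forget to call execute() on the execution environment?");
        }
        return "Job " + result.jobId + " submitted.";
    }
}
```

Whether a program that only prints its usage should hit this path at all is the open question raised in the comment; the sketch only covers the error reporting.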
[GitHub] flink issue #2548: [FLINK-4677] fail if user jar contains no executions
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2548 Do we have a standard way for user programs to print usage and exit without triggering this message? This is certainly preferable to the `NullPointerException`.
[jira] [Created] (FLINK-4699) Convert Kafka TableSource/TableSink tests to unit tests
Timo Walther created FLINK-4699: --- Summary: Convert Kafka TableSource/TableSink tests to unit tests Key: FLINK-4699 URL: https://issues.apache.org/jira/browse/FLINK-4699 Project: Flink Issue Type: Test Components: Table API & SQL Reporter: Timo Walther The Kafka tests are extremely heavy, while the Table Sources and Sinks are only thin wrappers on top of the Kafka Sources / Sinks. Testing them should not require bringing up Kafka clusters.
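Because the Table sources and sinks only wrap the Kafka ones, a unit test can check the wrapper's own logic against an in-memory fake of the wrapped component. The sketch below shows that technique with entirely hypothetical types (`RecordSink`, `TableSinkWrapper`, `RecordingSink`); it does not use Flink's or Kafka's real interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for the wrapped Kafka producer/sink.
interface RecordSink {
    void send(String topic, String record);
}

// Hypothetical thin wrapper, analogous to a Table sink delegating to a Kafka sink.
class TableSinkWrapper {
    private final String topic;
    private final RecordSink delegate;

    TableSinkWrapper(String topic, RecordSink delegate) {
        this.topic = topic;
        this.delegate = delegate;
    }

    void emitRow(Object... fields) {
        // Serialize the row as comma-separated values and hand it to the delegate.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(fields[i]);
        }
        delegate.send(topic, sb.toString());
    }
}

// In-memory fake: records what the wrapper passed on, no Kafka cluster required.
class RecordingSink implements RecordSink {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String topic, String record) {
        sent.add(topic + ":" + record);
    }
}
```

The heavy end-to-end Kafka tests then only need to cover the wrapped Kafka connector itself, once.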
[GitHub] flink issue #2548: [FLINK-4677] fail if user jar contains no executions
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2548 Looks good to me, +1 to merge
[jira] [Commented] (FLINK-4677) Jars with no job executions produces NullPointerException in ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-4677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526523#comment-15526523 ] ASF GitHub Bot commented on FLINK-4677: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2548 Looks good to me, +1 to merge > Jars with no job executions produces NullPointerException in ClusterClient > -- > > Key: FLINK-4677 > URL: https://issues.apache.org/jira/browse/FLINK-4677 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0, 1.1.2 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 1.2.0, 1.1.3 > > > When the user jar contains no job executions, the command-line client > displays a NullPointerException. This is not a big issue but should be > changed to something more descriptive.
[jira] [Created] (FLINK-4698) Visualize additional checkpoint information
Stephan Ewen created FLINK-4698: --- Summary: Visualize additional checkpoint information Key: FLINK-4698 URL: https://issues.apache.org/jira/browse/FLINK-4698 Project: Flink Issue Type: Sub-task Components: Webfrontend Reporter: Stephan Ewen Fix For: 1.2.0 Display the additional information gathered in the {{CheckpointStatsTracker}} in the "Checkpoint" tab.
[jira] [Closed] (FLINK-4685) Gather operator checkpoint durations data sizes from the runtime
[ https://issues.apache.org/jira/browse/FLINK-4685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4685. --- > Gather operator checkpoint durations data sizes from the runtime > > > Key: FLINK-4685 > URL: https://issues.apache.org/jira/browse/FLINK-4685 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0 > >
[jira] [Closed] (FLINK-4696) Limit the number of Akka Dispatcher Threads in LocalMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4696. --- > Limit the number of Akka Dispatcher Threads in LocalMiniCluster > --- > > Key: FLINK-4696 > URL: https://issues.apache.org/jira/browse/FLINK-4696 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > By default, Akka spawns 2x or 3x as many threads as there are cores. > For the LocalFlinkMiniCluster, running on Travis (often 64 cores), with > separate actor systems for the jobmanager and multiple taskmanagers, this > frequently means >600 Akka threads. Flink uses about 4 actors. > This simply wastes resources. I suggest having at most 4 threads > per actor system in test setups.
[jira] [Resolved] (FLINK-4685) Gather operator checkpoint durations data sizes from the runtime
[ https://issues.apache.org/jira/browse/FLINK-4685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4685. - Resolution: Implemented Implemented in b1642e32c2f69c60c2b212260c3479feb66a9165 > Gather operator checkpoint durations data sizes from the runtime > > > Key: FLINK-4685 > URL: https://issues.apache.org/jira/browse/FLINK-4685 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0 > >
[jira] [Created] (FLINK-4697) Gather more detailed checkpoint stats in CheckpointStatsTracker
Stephan Ewen created FLINK-4697: --- Summary: Gather more detailed checkpoint stats in CheckpointStatsTracker Key: FLINK-4697 URL: https://issues.apache.org/jira/browse/FLINK-4697 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Stephan Ewen Fix For: 1.2.0 The additional information attached to the {{AcknowledgeCheckpoint}} method must be gathered in the {{CheckpointStatsTracker}}.
[jira] [Resolved] (FLINK-4696) Limit the number of Akka Dispatcher Threads in LocalMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4696. - Resolution: Fixed Fixed via 6ea9284d29ec79576f073441a5de681019720ab0 > Limit the number of Akka Dispatcher Threads in LocalMiniCluster > --- > > Key: FLINK-4696 > URL: https://issues.apache.org/jira/browse/FLINK-4696 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > By default, Akka spawns 2x or 3x as many threads as there are cores. > For the LocalFlinkMiniCluster, running on Travis (often 64 cores), with > separate actor systems for the jobmanager and multiple taskmanagers, this > frequently means >600 Akka threads. Flink uses about 4 actors. > This simply wastes resources. I suggest having at most 4 threads > per actor system in test setups.
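Akka's fork-join dispatchers size their pool from the core count, roughly `ceil(cores * parallelism-factor)` clamped to `[parallelism-min, parallelism-max]` (configured under the dispatcher's `fork-join-executor` section). Lowering `parallelism-max` is therefore what bounds the thread count. The sketch below reproduces only that arithmetic, as an assumption about the formula; the factor of 3.0 and the cap of 4 are illustrative values taken from the issue text, not Flink's actual configuration.

```java
// Sketch of a scaled pool-size computation: ceil(cores * factor), clamped to [min, max].
final class PoolSizing {
    static int scaledPoolSize(int cores, double factor, int min, int max) {
        int scaled = (int) Math.ceil(cores * factor);
        return Math.min(Math.max(scaled, min), max);
    }
}
```

Uncapped, a factor of 3.0 on 64 cores gives 192 threads per actor system (`scaledPoolSize(64, 3.0, 1, Integer.MAX_VALUE)`), while a cap of 4 gives 4 (`scaledPoolSize(64, 3.0, 1, 4)`), independent of the machine size.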
[jira] [Commented] (FLINK-4630) add netty tcp/restful pushed source support
[ https://issues.apache.org/jira/browse/FLINK-4630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526424#comment-15526424 ] shijinkui commented on FLINK-4630: -- Some business systems, such as online risk management, need the response latency to be minimal, and at least below 50 ms. If the business system sends messages to Kafka or another message queue, the delay grows. Sending messages directly from the distributed business system to the Flink source makes the path end-to-end. So far I have only finished the Netty TCP source; example: https://github.com/HuaweiBigData/flink-netty-source/blob/master/src/test/scala/com/huawei/stream/NettySourceTest.scala > add netty tcp/restful pushed source support > --- > > Key: FLINK-4630 > URL: https://issues.apache.org/jira/browse/FLINK-4630 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: shijinkui > > When the source stream starts, it listens on a provided TCP port and receives stream data > from the user's data source. > This Netty TCP source is kept alive and is end-to-end, that is, data goes from the business > system to the Flink worker directly. > Such a source service is needed in production. > The source in detail: > 1. The source runs as a Netty TCP server. > 2. The user provides a TCP port; if the port is in use, the port number is increased, staying between 1024 and 65535. The source can run in parallel. > 3. The source calls back the provided URL to report the port it actually listens on. > 4. The user pushes streaming data to the Netty server, which then collects the data into Flink.
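Step 2 of the proposal (start from the user's port and move up until a free one is found, staying within 1024 to 65535) can be sketched with plain `java.net.ServerSocket`, without Netty. `PortProbe` and `bindFirstFree` are hypothetical names, and this is not the code from the linked repository. The sketch returns the already bound socket rather than just a port number, which avoids a race where another process takes the port between probing and binding.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class PortProbe {
    static final int MIN_PORT = 1024;
    static final int MAX_PORT = 65535;

    /** Binds a server socket to the first free port in [startPort, MAX_PORT]. */
    static ServerSocket bindFirstFree(int startPort) throws IOException {
        if (startPort < MIN_PORT || startPort > MAX_PORT) {
            throw new IllegalArgumentException(
                "Port must be in [" + MIN_PORT + ", " + MAX_PORT + "]: " + startPort);
        }
        for (int port = startPort; port <= MAX_PORT; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException portInUse) {
                // Typically a BindException: the port is taken, so try the next one.
            }
        }
        throw new IOException("No free port in [" + startPort + ", " + MAX_PORT + "]");
    }
}
```

For step 3, the value to report back to the user-provided URL would be `socket.getLocalPort()` of the returned socket; with parallel source instances, each instance would probe and report its own port.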
[jira] [Commented] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526303#comment-15526303 ] ASF GitHub Bot commented on FLINK-3734: --- Github user lw-lin commented on a diff in the pull request: https://github.com/apache/flink/pull/2557#discussion_r80704455 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -568,8 +568,8 @@ public void setInitialState(ChainedStateHandle chainedState, private void restoreState() throws Exception { final StreamOperator[] allOperators = operatorChain.getAllOperators(); - try { - if (lazyRestoreChainedOperatorState != null) { + if (lazyRestoreChainedOperatorState != null) { --- End diff -- This `if (lazyRestoreChainedOperatorState != null)` check was moved out of the `try` block because the same check is required in the `finally` block. > Unclosed DataInputView in > AbstractAlignedProcessingTimeWindowOperator#restoreState() > > > Key: FLINK-3734 > URL: https://issues.apache.org/jira/browse/FLINK-3734 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > DataInputView in = inputState.getState(getUserCodeClassloader()); > final long nextEvaluationTime = in.readLong(); > final long nextSlideTime = in.readLong(); > AbstractKeyedTimePanes panes = > createPanes(keySelector, function); > panes.readFromInput(in, keySerializer, stateTypeSerializer); > restoredState = new RestoredState<>(panes, nextEvaluationTime, > nextSlideTime); > } > {code} > DataInputView in is not closed upon return.
[GitHub] flink pull request #2557: [FLINK-3734] Close stream state handles after stat...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/flink/pull/2557#discussion_r80704455 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -568,8 +568,8 @@ public void setInitialState(ChainedStateHandle chainedState, private void restoreState() throws Exception { final StreamOperator[] allOperators = operatorChain.getAllOperators(); - try { - if (lazyRestoreChainedOperatorState != null) { + if (lazyRestoreChainedOperatorState != null) { --- End diff -- This `if (lazyRestoreChainedOperatorState != null)` check was moved out of the `try` block because the same check is required in the `finally` block.
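The restructuring described in this comment follows a common pattern: test the nullable resource once, outside the `try`, so that the `finally` block can close it unconditionally. The sketch below shows that pattern with `java.io.Closeable` and `Runnable` as stand-ins for Flink's state handle and restore logic; `RestoreSketch` and its parameters are hypothetical, not the `StreamTask` code.

```java
import java.io.Closeable;
import java.io.IOException;

final class RestoreSketch {
    /**
     * Restores from a lazily provided state handle and always closes it afterwards.
     * The null check sits outside the try block, so the finally block can call
     * close() without repeating the check.
     */
    static void restoreState(Closeable lazyRestoreState, Runnable restoreAction) throws IOException {
        if (lazyRestoreState != null) {
            try {
                restoreAction.run();
            } finally {
                // Runs on both the normal and the exceptional path.
                lazyRestoreState.close();
            }
        }
    }
}
```

If the restore action throws, its exception still propagates after the handle is closed, which is the behavior the FLINK-3734 report asks for (the stream must not stay open after restoration).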
[jira] [Commented] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526278#comment-15526278 ] ASF GitHub Bot commented on FLINK-3734: --- GitHub user lw-lin opened a pull request: https://github.com/apache/flink/pull/2557 [FLINK-3734] Close stream state handles after state restorations Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/flink close-stream-state-handles Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2557.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2557 commit eb2e2e45c7d76905b2b247954e4ef81f34ce471b Author: Liwei Lin Date: 2016-09-27T14:12:23Z [FLINK-3734] Close stream state handles after state restorations > Unclosed DataInputView in >
AbstractAlignedProcessingTimeWindowOperator#restoreState() > > > Key: FLINK-3734 > URL: https://issues.apache.org/jira/browse/FLINK-3734 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > DataInputView in = inputState.getState(getUserCodeClassloader()); > final long nextEvaluationTime = in.readLong(); > final long nextSlideTime = in.readLong(); > AbstractKeyedTimePanes panes = > createPanes(keySelector, function); > panes.readFromInput(in, keySerializer, stateTypeSerializer); > restoredState = new RestoredState<>(panes, nextEvaluationTime, > nextSlideTime); > } > {code} > DataInputView in is not closed upon return.
[GitHub] flink pull request #2557: [FLINK-3734] Close stream state handles after stat...
GitHub user lw-lin opened a pull request: https://github.com/apache/flink/pull/2557 [FLINK-3734] Close stream state handles after state restorations You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/flink close-stream-state-handles Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2557.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2557 commit eb2e2e45c7d76905b2b247954e4ef81f34ce471b Author: Liwei Lin Date: 2016-09-27T14:12:23Z [FLINK-3734] Close stream state handles after state restorations
[GitHub] flink pull request #2556: [FLINK-4573] Fix potential resource leak due to un...
GitHub user lw-lin opened a pull request: https://github.com/apache/flink/pull/2556 [FLINK-4573] Fix potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler -- You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/flink raf-close Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2556.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2556 commit 8ec0dfae40c7b06e01e57260d16d6256f885f4e6 Author: Liwei Lin Date: 2016-09-27T12:49:52Z [FLINK-4573] Fix potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler
[jira] [Commented] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler
[ https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526276#comment-15526276 ] ASF GitHub Bot commented on FLINK-4573: --- GitHub user lw-lin opened a pull request: https://github.com/apache/flink/pull/2556 [FLINK-4573] Fix potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler > Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler > Key: FLINK-4573 > URL: https://issues.apache.org/jira/browse/FLINK-4573 > Project: Flink > Issue Type: Bug > Reporter: Ted Yu > Priority: Minor > > {code} > try { > raf = new RandomAccessFile(file, "r"); > } catch (FileNotFoundException e) { > display(ctx, request, "Displaying TaskManager log failed."); > LOG.error("Displaying TaskManager log failed.", e); > return; > } > long fileLength = raf.length(); > final FileChannel fc = raf.getChannel(); > {code} > If length() throws IOException, raf would be left unclosed.
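One way to close the leak described in FLINK-4573 is to close `raf` on every failure path after it has been opened, instead of using try-with-resources. This is a minimal sketch and not the change in PR #2556. It assumes that on success the open file and its `FileChannel` are handed to a later, possibly asynchronous, transfer and must stay open. The quoted snippet suggests this but does not show it. `SafeLogOpen` and `OpenedLog` are hypothetical names, and the handler's `display(...)` and `LOG.error(...)` calls are left out.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public final class SafeLogOpen {

    /** An opened log file; on success the caller owns it and must close raf. */
    public static final class OpenedLog {
        public final RandomAccessFile raf;
        public final FileChannel channel;
        public final long length;

        OpenedLog(RandomAccessFile raf, FileChannel channel, long length) {
            this.raf = raf;
            this.channel = channel;
            this.length = length;
        }
    }

    private SafeLogOpen() {}

    /**
     * Opens the file read-only and reads its length. If anything fails after the
     * open succeeded, raf is closed before the exception propagates, so it cannot leak.
     * A missing file still surfaces as FileNotFoundException from the constructor.
     */
    public static OpenedLog open(File file) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        try {
            long length = raf.length();          // the call that could previously leak raf
            FileChannel fc = raf.getChannel();
            return new OpenedLog(raf, fc, length);
        } catch (IOException | RuntimeException e) {
            try {
                raf.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);     // keep the original failure as the primary one
            }
            throw e;
        }
    }
}
```

The caller would close `log.raf` once the transfer finishes. Closing a `RandomAccessFile` also closes the channel returned by `getChannel()`.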
[GitHub] flink issue #2544: [FLINK-4218] [checkpoints] Do not rely on FileSystem to d...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2544 Manually merged in 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113
[GitHub] flink pull request #2544: [FLINK-4218] [checkpoints] Do not rely on FileSyst...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/2544
[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting
[ https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526247#comment-15526247 ] ASF GitHub Bot commented on FLINK-4218: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2544 Manually merged in 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113 > Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting > Key: FLINK-4218 > URL: https://issues.apache.org/jira/browse/FLINK-4218 > Project: Flink > Issue Type: Improvement > Affects Versions: 1.1.0 > Reporter: Sergii Koshel > Assignee: Stephan Ewen > Fix For: 1.2.0 > > The exception below appears sporadically and causes the task to restart. > {code:title=Exception|borderStyle=solid} > java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier > at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785) > at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775) > at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: No such file or directory: s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3 > at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996) > at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77) > at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351) > at org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93) > at org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58) > at org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482) > at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77) > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604) > at org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779) > ... 8 more > {code} > The file actually exists on S3. > I suppose it is related to a race condition with S3, but it would be good to retry a few times before stopping task execution.
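The reporter suggests retrying a few times before failing the task. That idea can be sketched as a small helper that retries only on `FileNotFoundException`, with exponential backoff. This illustrates the reporter's suggestion under the reporter's own assumption that a visibility race on S3 is the cause. It is not the change merged in 95e9004. `RetryOnNotFound` is a hypothetical name.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;

public final class RetryOnNotFound {

    private RetryOnNotFound() {}

    /**
     * Runs the action and retries it, up to maxAttempts attempts in total, while it
     * throws FileNotFoundException. Sleeps initialBackoffMillis before the first retry
     * and doubles the delay each time. Any other exception propagates immediately.
     */
    public static <T> T call(Callable<T> action, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (FileNotFoundException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up: the file still appears to be missing
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }
}
```

For example, the helper could wrap `RetryOnNotFound.call(() -> fs.getFileStatus(path), 3, 100L)`, given a Hadoop `FileSystem fs` and `Path path`. Retrying only hides a visibility delay and adds latency to the checkpoint path. It does not remove the need to query the file system while taking a checkpoint.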
[jira] [Commented] (FLINK-4554) Add support for array types
[ https://issues.apache.org/jira/browse/FLINK-4554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526259#comment-15526259 ] Timo Walther commented on FLINK-4554: - [~kirill-morozov-epam] thanks for your interest. Unfortunately, I have already started on this issue and will open a PR soon. Maybe you can pick something else; there are many other open Table API/SQL issues. > Add support for array types > Key: FLINK-4554 > URL: https://issues.apache.org/jira/browse/FLINK-4554 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Timo Walther > > Support creating arrays: > {code}ARRAY[1, 2, 3]{code} > Access array values: > {code}myArray[3]{code} > And operations like: > {{UNNEST, UNNEST WITH ORDINALITY, CARDINALITY}}
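The operations requested in FLINK-4554 follow standard SQL, where array subscripts start at 1 and `UNNEST ... WITH ORDINALITY` numbers its rows from 1. The sketch below models these semantics on a Java `List` for illustration only. It is not Flink's implementation, and `SqlArraySemantics` is a hypothetical name. Throwing on an out-of-range subscript is an assumption that the issue does not settle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class SqlArraySemantics {

    private SqlArraySemantics() {}

    /** ARRAY[a, b, c]: builds an array value from its elements. */
    @SafeVarargs
    public static <T> List<T> array(T... elements) {
        return new ArrayList<>(Arrays.asList(elements));
    }

    /** myArray[i]: SQL subscripts are 1-based, so index 1 is the first element. */
    public static <T> T element(List<T> array, int sqlIndex) {
        if (sqlIndex < 1 || sqlIndex > array.size()) {
            // Assumption for this sketch: reject out-of-range subscripts.
            throw new IndexOutOfBoundsException("SQL index " + sqlIndex + " out of range");
        }
        return array.get(sqlIndex - 1);
    }

    /** CARDINALITY(myArray): the number of elements. */
    public static int cardinality(List<?> array) {
        return array.size();
    }

    /** One row produced by UNNEST ... WITH ORDINALITY: the element and its 1-based position. */
    public static final class Row<T> {
        public final T value;
        public final int ordinality;

        Row(T value, int ordinality) {
            this.value = value;
            this.ordinality = ordinality;
        }
    }

    /** UNNEST(myArray) WITH ORDINALITY: one row per element, numbered from 1. */
    public static <T> List<Row<T>> unnestWithOrdinality(List<T> array) {
        List<Row<T>> rows = new ArrayList<>(array.size());
        for (int i = 0; i < array.size(); i++) {
            rows.add(new Row<>(array.get(i), i + 1));
        }
        return rows;
    }
}
```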
[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting
[ https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526248#comment-15526248 ] ASF GitHub Bot commented on FLINK-4218: --- Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/2544
[GitHub] flink issue #2555: [FLINK-4695] Introduce MetricRegistryConfiguration to enc...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2555 True, the benefit of this is only marginal in the current master. It is actually part of the `TaskManager` services startup logic in Flip-6. In the Flip-6 branch, the services will be created from component-specific configuration objects. These objects are generated from the global configuration and contain only component-specific fields. That way, we decouple components from the global configuration object. In order to avoid future merge conflicts between Flip-6 and the master branch, I wanted to merge this specific commit into master. I should have stated this earlier.
[jira] [Commented] (FLINK-4695) Separate configuration parsing from MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526182#comment-15526182 ] ASF GitHub Bot commented on FLINK-4695: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2555 True, the benefit of this is only marginal in the current master. It is actually part of the `TaskManager` services startup logic in Flip-6. In the Flip-6 branch, the services will be created from component-specific configuration objects. These objects are generated from the global configuration and contain only component-specific fields. That way, we decouple components from the global configuration object. In order to avoid future merge conflicts between Flip-6 and the master branch, I wanted to merge this specific commit into master. I should have stated this earlier. > Separate configuration parsing from MetricRegistry > Key: FLINK-4695 > URL: https://issues.apache.org/jira/browse/FLINK-4695 > Project: Flink > Issue Type: Improvement > Components: Metrics > Affects Versions: 1.2.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > > In order to decouple the {{MetricRegistry}} object instantiation from the global configuration, we could introduce a {{MetricRegistryConfiguration}} object which encapsulates all necessary information for the {{MetricRegistry}}. The {{MetricRegistryConfiguration}} could have a static factory method that creates it from a {{Configuration}}.
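The FLINK-4695 proposal can be sketched as an immutable configuration object with a static factory, so the registry never has to read the global configuration itself. To keep the example self-contained, it reads from a plain `Map<String, String>` instead of Flink's `Configuration`. The field set and the key names (`metrics.reporters`, `metrics.scope.delimiter`) are assumptions for illustration, not the class that was eventually merged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: holds only what a metric registry needs, and is built once
 * from the global key/value configuration.
 */
public final class MetricRegistryConfiguration {

    // Assumed key names, for illustration only.
    static final String REPORTERS_KEY = "metrics.reporters";
    static final String DELIMITER_KEY = "metrics.scope.delimiter";

    private final char delimiter;
    private final List<String> reporterNames;

    public MetricRegistryConfiguration(char delimiter, List<String> reporterNames) {
        this.delimiter = delimiter;
        this.reporterNames = Collections.unmodifiableList(new ArrayList<>(reporterNames));
    }

    public char getDelimiter() {
        return delimiter;
    }

    public List<String> getReporterNames() {
        return reporterNames;
    }

    /** Static factory: the only place that reads the global configuration. */
    public static MetricRegistryConfiguration fromConfiguration(Map<String, String> globalConfig) {
        String delimiterValue = globalConfig.getOrDefault(DELIMITER_KEY, ".");
        char delimiter = delimiterValue.isEmpty() ? '.' : delimiterValue.charAt(0);

        // Comma-separated reporter names; blank entries are ignored.
        List<String> reporters = new ArrayList<>();
        String reportersValue = globalConfig.get(REPORTERS_KEY);
        if (reportersValue != null) {
            for (String name : reportersValue.split(",")) {
                String trimmed = name.trim();
                if (!trimmed.isEmpty()) {
                    reporters.add(trimmed);
                }
            }
        }
        return new MetricRegistryConfiguration(delimiter, reporters);
    }
}
```

A registry would then take a `MetricRegistryConfiguration` in its constructor. A test could construct one directly without assembling a global configuration.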