[FLINK-4537] rebase and refine
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34a6854b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34a6854b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34a6854b Branch: refs/heads/flip-6 Commit: 34a6854b44db8e949a02700deff18475d023b7bc Parents: efc7de5 Author: Maximilian Michels <m...@apache.org> Authored: Wed Sep 21 14:13:12 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:40 2016 +0200 ---------------------------------------------------------------------- .../resourcemanager/JobMasterRegistration.java | 52 +++--- .../resourcemanager/ResourceManager.java | 165 ++++++++----------- .../slotmanager/SlotManager.java | 29 +++- 3 files changed, 110 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/34a6854b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java index 7b8ec70..981441f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java @@ -18,59 +18,47 @@ package org.apache.flink.runtime.resourcemanager; -<<<<<<< HEAD import org.apache.flink.api.common.JobID; -======= import org.apache.flink.runtime.jobmaster.JobMasterGateway; ->>>>>>> db98efb... 
rsourceManager registration with JobManager +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import java.io.Serializable; import java.util.UUID; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master */ -public class JobMasterRegistration implements Serializable { +public class JobMasterRegistration implements LeaderRetrievalListener { -<<<<<<< HEAD - private final String address; + private final JobMasterGateway gateway; private final JobID jobID; + private final UUID leaderSessionID; + private LeaderRetrievalListener retriever; - public JobMasterRegistration(String address, JobID jobID) { - this.address = address; + public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) { + this.gateway = gateway; this.jobID = jobID; -======= - private static final long serialVersionUID = -2316627821716999527L; - - private final JobMasterGateway jobMasterGateway; - - private UUID jobMasterLeaderSessionID; - - public JobMasterRegistration(JobMasterGateway jobMasterGateway) { - this.jobMasterGateway = checkNotNull(jobMasterGateway); + this.leaderSessionID = leaderSessionID; } - public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) { - this.jobMasterGateway = checkNotNull(jobMasterGateway); - this.jobMasterLeaderSessionID = jobMasterLeaderSessionID; + public JobMasterGateway getGateway() { + return gateway; } - public JobMasterGateway getJobMasterGateway() { - return jobMasterGateway; + public UUID getLeaderSessionID() { + return leaderSessionID; } - public void setJobMasterLeaderSessionID(UUID leaderSessionID) { - this.jobMasterLeaderSessionID = jobMasterLeaderSessionID; ->>>>>>> db98efb... 
rsourceManager registration with JobManager + public JobID getJobID() { + return jobID; } - public UUID getJobMasterLeaderSessionID() { - return jobMasterLeaderSessionID; + @Override + public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + } - public JobID getJobID() { - return jobID; + @Override + public void handleError(Exception exception) { + } } http://git-wip-us.apache.org/repos/asf/flink/blob/34a6854b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 8be1455..aae4874 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.resourcemanager; -import akka.dispatch.Futures; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -29,26 +28,31 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; -<<<<<<< HEAD -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -======= import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; ->>>>>>> db98efb... 
rsourceManager registration with JobManager +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.concurrent.Future; + +import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -62,17 +66,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> * </ul> */ -<<<<<<< HEAD public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender { private final Logger LOG = LoggerFactory.getLogger(getClass()); private final Map<JobID, JobMasterGateway> jobMasterGateways; -======= -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { - /** the mapping relationship of JobID and JobMasterGateway */ - private final Map<JobID, JobMasterRegistration> jobMasters; ->>>>>>> db98efb... 
rsourceManager registration with JobManager + + private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners; private final HighAvailabilityServices highAvailabilityServices; @@ -88,12 +88,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { SlotManager slotManager) { super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); -<<<<<<< HEAD this.jobMasterGateways = new HashMap<>(); this.slotManager = slotManager; -======= - this.jobMasters = new HashMap<>(16); ->>>>>>> db98efb... rsourceManager registration with JobManager + this.jobMasterLeaderRetrievalListeners = new HashSet<>(); } @Override @@ -113,7 +110,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { public void shutDown() { try { leaderElectionService.stop(); - for(JobID jobID : jobMasters.keySet()) { + for(JobID jobID : jobMasterGateways.keySet()) { highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop(); } super.shutDown(); @@ -142,52 +139,64 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { * @return Future registration response */ @RpcMethod -<<<<<<< HEAD - public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) { - final Future<JobMasterGateway> jobMasterFuture = - getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); - final JobID jobID = jobMasterRegistration.getJobID(); -======= - public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) { + public Future<RegistrationResponse> registerJobMaster( + final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, + final String jobMasterAddress, final JobID jobID) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobMasterAddress); + checkNotNull(jobID); + + // TODO mxm The leader retrieval needs to be split up in an async part which runs outside the 
main execution thread + // The state updates should be performed inside the main thread + + final FlinkCompletableFuture<RegistrationResponse> future = new FlinkCompletableFuture<>(); if(!leaderSessionID.equals(resourceManagerLeaderId)) { - log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}", + log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" + + " did not equal the received leader session ID {}", jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId); - return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId)); + future.complete(new RegistrationResponse.Decline("Invalid leader session id")); + return future; } - Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class); ->>>>>>> db98efb... rsourceManager registration with JobManager + final LeaderConnectionInfo jobMasterLeaderInfo; + try { + jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo( + highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); + } catch (Exception e) { + LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + future.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever")); + return future; + } + + if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) { + LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress); + future.complete(new RegistrationResponse.Decline("JobManager is not leading")); + return future; + } - return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() { + Future<JobMasterGateway> jobMasterGatewayFuture = + getRpcService().connect(jobMasterAddress, JobMasterGateway.class); + + return 
jobMasterGatewayFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() { @Override public RegistrationResponse apply(JobMasterGateway jobMasterGateway) { -<<<<<<< HEAD + + final JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); + try { + LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID); + jobMasterLeaderRetriever.start(jobMasterLeaderListener); + } catch (Exception e) { + LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"); + } + jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener); final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); if (existingGateway != null) { - LOG.info("Replacing existing gateway {} for JobID {} with {}.", - existingGateway, jobID, jobMasterGateway); - } - return new RegistrationResponse(true); -======= - if (jobMasters.containsKey(jobID)) { - JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID()); - jobMasters.put(jobID, jobMasterRegistration); log.info("Replacing gateway for registered JobID {}.", jobID); - } else { - JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway); - jobMasters.put(jobID, jobMasterRegistration); - try { - highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID)); - } catch(Throwable e) { - log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster", - jobID, jobMasterAddress); - return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster"); - } } - return new JobMasterRegistrationSuccess(5000); ->>>>>>> db98efb... 
rsourceManager registration with JobManager } }, getMainThreadExecutor()); } @@ -228,26 +237,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { } -<<<<<<< HEAD // ------------------------------------------------------------------------ // Leader Contender // ------------------------------------------------------------------------ -======= - /** - * Callback method when current resourceManager lose leadership. - */ - @Override - public void revokeLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was revoked leadership.", getAddress()); - jobMasters.clear(); - leaderSessionID = null; - } - }); - } ->>>>>>> db98efb... rsourceManager registration with JobManager /** * Callback method when current resourceManager is granted leadership @@ -263,7 +255,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { // confirming the leader session ID might be blocking, leaderElectionService.confirmLeaderSessionID(leaderSessionID); // notify SlotManager - slotManager.notifyLeaderAddress(getAddress(), leaderSessionID); + slotManager.setLeaderUUID(leaderSessionID); ResourceManager.this.leaderSessionID = leaderSessionID; } }); @@ -279,7 +271,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { public void run() { log.info("ResourceManager {} was revoked leadership.", getAddress()); jobMasterGateways.clear(); - ResourceManager.this.leaderSessionID = null; + slotManager.clearState(); + leaderSessionID = null; } }); } @@ -291,20 +284,15 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { */ @Override public void handleError(final Exception exception) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("ResourceManager received an error from the LeaderElectionService.", exception); - // notify SlotManager - slotManager.handleError(exception); - // terminate ResourceManager in case of an error - 
shutDown(); - } - }); + log.error("ResourceManager received an error from the LeaderElectionService.", exception); + // terminate ResourceManager in case of an error + shutDown(); } - private class JobMasterLeaderListener implements LeaderRetrievalListener { + private static class JobMasterLeaderListener implements LeaderRetrievalListener { + private final JobID jobID; + private UUID leaderID; private JobMasterLeaderListener(JobID jobID) { this.jobID = jobID; @@ -312,25 +300,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID); - // update job master leader session id - JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID); - jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID); - } - }); + this.leaderID = leaderSessionID; } @Override public void handleError(final Exception exception) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception); - } - }); + // TODO } } } http://git-wip-us.apache.org/repos/asf/flink/blob/34a6854b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 97176b2..5d0013c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * </ul> * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. */ -public abstract class SlotManager implements LeaderRetrievalListener { +public abstract class SlotManager { protected final Logger LOG = LoggerFactory.getLogger(getClass()); @@ -514,22 +514,33 @@ public abstract class SlotManager implements LeaderRetrievalListener { public int size() { return allocatedSlots.size(); } + + public void clear() { + allocatedSlots.clear(); + allocatedSlotsByAllocationId.clear(); + } + } + + /** + * Clears the state of the SlotManager after leadership revocation. + */ + public void clearState() { + taskManagerGateways.clear(); + registeredSlots.clear(); + pendingSlotRequests.clear(); + freeSlots.clear(); + allocationMap.clear(); + leaderID = null; } // ------------------------------------------------------------------------ - // High availability + // High availability (called by the ResourceManager) // ------------------------------------------------------------------------ - @Override - public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + public void setLeaderUUID(UUID leaderSessionID) { this.leaderID = leaderSessionID; } - @Override - public void handleError(Exception exception) { - LOG.error("Slot Manager received an error from the leader service", exception); - } - // ------------------------------------------------------------------------ // Testing utilities // ------------------------------------------------------------------------