http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java new file mode 100644 index 0000000..896421b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager. + */ +public class SlotRequest implements Serializable { + + private static final long serialVersionUID = -6586877187990445986L; + + /** The JobID of the slot requested for */ + private final JobID jobId; + + /** The unique identification of this request */ + private final AllocationID allocationId; + + /** The resource profile of the required slot */ + private final ResourceProfile resourceProfile; + + public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) { + this.jobId = checkNotNull(jobId); + this.allocationId = checkNotNull(allocationId); + this.resourceProfile = checkNotNull(resourceProfile); + } + + /** + * Get the JobID of the slot requested for. + * @return The job id + */ + public JobID getJobId() { + return jobId; + } + + /** + * Get the unique identification of this request + * @return the allocation id + */ + public AllocationID getAllocationId() { + return allocationId; + } + + /** + * Get the resource profile of the desired slot + * @return The resource profile + */ + public ResourceProfile getResourceProfile() { + return resourceProfile; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java deleted file mode 100644 index a046cb8..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.jobmaster; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.leaderelection.LeaderContender; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcMethod; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.util.Preconditions; - -import java.util.UUID; - -/** - * JobMaster implementation. The job master is responsible for the execution of a single - * {@link org.apache.flink.runtime.jobgraph.JobGraph}. - * <p> - * It offers the following methods as part of its rpc interface to interact with the JobMaster - * remotely: - * <ul> - * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for - * given task</li> - * </ul> - */ -public class JobMaster extends RpcEndpoint<JobMasterGateway> { - - /** Gateway to connected resource manager, null iff not connected */ - private ResourceManagerGateway resourceManager = null; - - /** Logical representation of the job */ - private final JobGraph jobGraph; - private final JobID jobID; - - /** Configuration of the job */ - private final Configuration configuration; - - /** Service to contend for and retrieve the leadership of JM and RM */ - private final HighAvailabilityServices highAvailabilityServices; - - /** Leader Management */ - private LeaderElectionService leaderElectionService = null; - private UUID leaderSessionID; - - /** - * The JM's Constructor - * - * @param jobGraph The representation of the job's execution plan - * @param configuration The job's configuration - * @param rpcService The RPC service at which the JM serves - * @param highAvailabilityService The cluster's HA service from the JM can elect and retrieve leaders. - */ - public JobMaster( - JobGraph jobGraph, - Configuration configuration, - RpcService rpcService, - HighAvailabilityServices highAvailabilityService) { - - super(rpcService); - - this.jobGraph = Preconditions.checkNotNull(jobGraph); - this.jobID = Preconditions.checkNotNull(jobGraph.getJobID()); - - this.configuration = Preconditions.checkNotNull(configuration); - - this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService); - } - - public ResourceManagerGateway getResourceManager() { - return resourceManager; - } - - //---------------------------------------------------------------------------------------------- - // Initialization methods - //---------------------------------------------------------------------------------------------- - public void start() { - super.start(); - - // register at the election once the JM starts - registerAtElectionService(); - } - - - //---------------------------------------------------------------------------------------------- - // JobMaster Leadership methods - //---------------------------------------------------------------------------------------------- - - /** - * Retrieves the election service and contend for the leadership. - */ - private void registerAtElectionService() { - try { - leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID); - leaderElectionService.start(new JobMasterLeaderContender()); - } catch (Exception e) { - throw new RuntimeException("Fail to register at the election of JobMaster", e); - } - } - - /** - * Start the execution when the leadership is granted. - * - * @param newLeaderSessionID The identifier of the new leadership session - */ - public void grantJobMasterLeadership(final UUID newLeaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID); - - // The operation may be blocking, but since JM is idle before it grants the leadership, it's okay that - // JM waits here for the operation's completeness. - leaderSessionID = newLeaderSessionID; - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); - - // TODO:: execute the job when the leadership is granted. - } - }); - } - - /** - * Stop the execution when the leadership is revoked. - */ - public void revokeJobMasterLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("JobManager {} was revoked leadership.", getAddress()); - - // TODO:: cancel the job's execution and notify all listeners - cancelAndClearEverything(new Exception("JobManager is no longer the leader.")); - - leaderSessionID = null; - } - }); - } - - /** - * Handles error occurring in the leader election service - * - * @param exception Exception thrown in the leader election service - */ - public void onJobMasterElectionError(final Exception exception) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("Received an error from the LeaderElectionService.", exception); - - // TODO:: cancel the job's execution and shutdown the JM - cancelAndClearEverything(exception); - - leaderSessionID = null; - } - }); - - } - - //---------------------------------------------------------------------------------------------- - // RPC methods - //---------------------------------------------------------------------------------------------- - - /** - * Updates the task execution state for a given task. - * - * @param taskExecutionState New task execution state for a given task - * @return Acknowledge the task execution state update - */ - @RpcMethod - public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) { - System.out.println("TaskExecutionState: " + taskExecutionState); - return Acknowledge.get(); - } - - /** - * Triggers the registration of the job master at the resource manager. - * - * @param address Address of the resource manager - */ - @RpcMethod - public void registerAtResourceManager(final String address) { - //TODO:: register at the RM - } - - //---------------------------------------------------------------------------------------------- - // Helper methods - //---------------------------------------------------------------------------------------------- - - /** - * Cancel the current job and notify all listeners the job's cancellation. - * - * @param cause Cause for the cancelling. - */ - private void cancelAndClearEverything(Throwable cause) { - // currently, nothing to do here - } - - // ------------------------------------------------------------------------ - // Utility classes - // ------------------------------------------------------------------------ - private class JobMasterLeaderContender implements LeaderContender { - - @Override - public void grantLeadership(UUID leaderSessionID) { - JobMaster.this.grantJobMasterLeadership(leaderSessionID); - } - - @Override - public void revokeLeadership() { - JobMaster.this.revokeJobMasterLeadership(); - } - - @Override - public String getAddress() { - return JobMaster.this.getAddress(); - } - - @Override - public void handleError(Exception exception) { - onJobMasterElectionError(exception); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java deleted file mode 100644 index 17a4c3a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.jobmaster; - -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import scala.concurrent.Future; - -/** - * {@link JobMaster} rpc gateway interface - */ -public interface JobMasterGateway extends RpcGateway { - - /** - * Updates the task execution state for a given task. - * - * @param taskExecutionState New task execution state for a given task - * @return Future acknowledge of the task execution state update - */ - Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState); - - /** - * Triggers the registration of the job master at the resource manager. - * - * @param address Address of the resource manager - */ - void registerAtResourceManager(final String address); -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java deleted file mode 100644 index 2de560a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.registration; - -import java.io.Serializable; - -/** - * Base class for responses given to registration attempts from {@link RetryingRegistration}. - */ -public abstract class RegistrationResponse implements Serializable { - - private static final long serialVersionUID = 1L; - - // ---------------------------------------------------------------------------- - - /** - * Base class for a successful registration. Concrete registration implementations - * will typically extend this class to attach more information. - */ - public static class Success extends RegistrationResponse { - private static final long serialVersionUID = 1L; - - @Override - public String toString() { - return "Registration Successful"; - } - } - - // ---------------------------------------------------------------------------- - - /** - * A rejected (declined) registration. - */ - public static final class Decline extends RegistrationResponse { - private static final long serialVersionUID = 1L; - - /** the rejection reason */ - private final String reason; - - /** - * Creates a new rejection message. - * - * @param reason The reason for the rejection. - */ - public Decline(String reason) { - this.reason = reason != null ? reason : "(unknown)"; - } - - /** - * Gets the reason for the rejection. - */ - public String getReason() { - return reason; - } - - @Override - public String toString() { - return "Registration Declined (" + reason + ')'; - } - } -} - - - - - - - http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java deleted file mode 100644 index dcb5011..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.registration; - -import akka.dispatch.OnFailure; -import akka.dispatch.OnSuccess; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcService; - -import org.slf4j.Logger; - -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.impl.Promise.DefaultPromise; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - - -/** - * This utility class implements the basis of registering one component at another component, - * for example registering the TaskExecutor at the ResourceManager. - * This {@code RetryingRegistration} implements both the initial address resolution - * and the retries-with-backoff strategy. - * - * <p>The registration gives access to a future that is completed upon successful registration. - * The registration can be canceled, for example when the target where it tries to register - * at looses leader status. - * - * @param <Gateway> The type of the gateway to connect to. - * @param <Success> The type of the successful registration responses. - */ -public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> { - - // ------------------------------------------------------------------------ - // default configuration values - // ------------------------------------------------------------------------ - - /** default value for the initial registration timeout (milliseconds) */ - private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100; - - /** default value for the maximum registration timeout, after exponential back-off (milliseconds) */ - private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000; - - /** The pause (milliseconds) made after an registration attempt caused an exception (other than timeout) */ - private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000; - - /** The pause (milliseconds) made after the registration attempt was refused */ - private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000; - - // ------------------------------------------------------------------------ - // Fields - // ------------------------------------------------------------------------ - - private final Logger log; - - private final RpcService rpcService; - - private final String targetName; - - private final Class<Gateway> targetType; - - private final String targetAddress; - - private final UUID leaderId; - - private final Promise<Tuple2<Gateway, Success>> completionPromise; - - private final long initialRegistrationTimeout; - - private final long maxRegistrationTimeout; - - private final long delayOnError; - - private final long delayOnRefusedRegistration; - - private volatile boolean canceled; - - // ------------------------------------------------------------------------ - - public RetryingRegistration( - Logger log, - RpcService rpcService, - String targetName, - Class<Gateway> targetType, - String targetAddress, - UUID leaderId) { - this(log, rpcService, targetName, targetType, targetAddress, leaderId, - INITIAL_REGISTRATION_TIMEOUT_MILLIS, MAX_REGISTRATION_TIMEOUT_MILLIS, - ERROR_REGISTRATION_DELAY_MILLIS, REFUSED_REGISTRATION_DELAY_MILLIS); - } - - public RetryingRegistration( - Logger log, - RpcService rpcService, - String targetName, - Class<Gateway> targetType, - String targetAddress, - UUID leaderId, - long initialRegistrationTimeout, - long maxRegistrationTimeout, - long delayOnError, - long delayOnRefusedRegistration) { - - checkArgument(initialRegistrationTimeout > 0, "initial registration timeout must be greater than zero"); - checkArgument(maxRegistrationTimeout > 0, "maximum registration timeout must be greater than zero"); - checkArgument(delayOnError >= 0, "delay on error must be non-negative"); - checkArgument(delayOnRefusedRegistration >= 0, "delay on refused registration must be non-negative"); - - this.log = checkNotNull(log); - this.rpcService = checkNotNull(rpcService); - this.targetName = checkNotNull(targetName); - this.targetType = checkNotNull(targetType); - this.targetAddress = checkNotNull(targetAddress); - this.leaderId = checkNotNull(leaderId); - this.initialRegistrationTimeout = initialRegistrationTimeout; - this.maxRegistrationTimeout = maxRegistrationTimeout; - this.delayOnError = delayOnError; - this.delayOnRefusedRegistration = delayOnRefusedRegistration; - - this.completionPromise = new DefaultPromise<>(); - } - - // ------------------------------------------------------------------------ - // completion and cancellation - // ------------------------------------------------------------------------ - - public Future<Tuple2<Gateway, Success>> getFuture() { - return completionPromise.future(); - } - - /** - * Cancels the registration procedure. - */ - public void cancel() { - canceled = true; - } - - /** - * Checks if the registration was canceled. - * @return True if the registration was canceled, false otherwise. - */ - public boolean isCanceled() { - return canceled; - } - - // ------------------------------------------------------------------------ - // registration - // ------------------------------------------------------------------------ - - protected abstract Future<RegistrationResponse> invokeRegistration( - Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception; - - /** - * This method resolves the target address to a callable gateway and starts the - * registration after that. - */ - @SuppressWarnings("unchecked") - public void startRegistration() { - try { - // trigger resolution of the resource manager address to a callable gateway - Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType); - - // upon success, start the registration attempts - resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() { - @Override - public void onSuccess(Gateway result) { - log.info("Resolved {} address, beginning registration", targetName); - register(result, 1, initialRegistrationTimeout); - } - }, rpcService.getExecutionContext()); - - // upon failure, retry, unless this is cancelled - resourceManagerFuture.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) { - if (!isCanceled()) { - log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress); - startRegistration(); - } - } - }, rpcService.getExecutionContext()); - } - catch (Throwable t) { - cancel(); - completionPromise.tryFailure(t); - } - } - - /** - * This method performs a registration attempt and triggers either a success notification or a retry, - * depending on the result. - */ - @SuppressWarnings("unchecked") - private void register(final Gateway gateway, final int attempt, final long timeoutMillis) { - // eager check for canceling to avoid some unnecessary work - if (canceled) { - return; - } - - try { - log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis); - Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); - - // if the registration was successful, let the TaskExecutor know - registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() { - - @Override - public void onSuccess(RegistrationResponse result) throws Throwable { - if (!isCanceled()) { - if (result instanceof RegistrationResponse.Success) { - // registration successful! - Success success = (Success) result; - completionPromise.success(new Tuple2<>(gateway, success)); - } - else { - // registration refused or unknown - if (result instanceof RegistrationResponse.Decline) { - RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result; - log.info("Registration at {} was declined: {}", targetName, decline.getReason()); - } else { - log.error("Received unknown response to registration attempt: " + result); - } - - log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration); - registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration); - } - } - } - }, rpcService.getExecutionContext()); - - // upon failure, retry - registrationFuture.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) { - if (!isCanceled()) { - if (failure instanceof TimeoutException) { - // we simply have not received a response in time. maybe the timeout was - // very low (initial fast registration attempts), maybe the target endpoint is - // currently down. - if (log.isDebugEnabled()) { - log.debug("Registration at {} ({}) attempt {} timed out after {} ms", - targetName, targetAddress, attempt, timeoutMillis); - } - - long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout); - register(gateway, attempt + 1, newTimeoutMillis); - } - else { - // a serious failure occurred. we still should not give up, but keep trying - log.error("Registration at " + targetName + " failed due to an error", failure); - log.info("Pausing and re-attempting registration in {} ms", delayOnError); - - registerLater(gateway, 1, initialRegistrationTimeout, delayOnError); - } - } - } - }, rpcService.getExecutionContext()); - } - catch (Throwable t) { - cancel(); - completionPromise.tryFailure(t); - } - } - - private void registerLater(final Gateway gateway, final int attempt, final long timeoutMillis, long delay) { - rpcService.scheduleRunnable(new Runnable() { - @Override - public void run() { - register(gateway, attempt, timeoutMillis); - } - }, delay, TimeUnit.MILLISECONDS); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java deleted file mode 100644 index 7a2deae..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.resourcemanager; - -import java.io.Serializable; - -public class JobMasterRegistration implements Serializable { - private static final long serialVersionUID = 8411214999193765202L; - - private final String address; - - public JobMasterRegistration(String address) { - this.address = address; - } - - public String getAddress() { - return address; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java deleted file mode 100644 index 8ac9e49..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.resourcemanager; - -import org.apache.flink.runtime.instance.InstanceID; - -import java.io.Serializable; - -public class RegistrationResponse implements Serializable { - private static final long serialVersionUID = -2379003255993119993L; - - private final boolean isSuccess; - private final InstanceID instanceID; - - public RegistrationResponse(boolean isSuccess, InstanceID instanceID) { - this.isSuccess = isSuccess; - this.instanceID = instanceID; - } - - public boolean isSuccess() { - return isSuccess; - } - - public InstanceID getInstanceID() { - return instanceID; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java deleted file mode 100644 index f7147c9..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.resourcemanager; - -import akka.dispatch.Mapper; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.leaderelection.LeaderContender; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.rpc.RpcMethod; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.jobmaster.JobMaster; -import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess; - -import scala.concurrent.Future; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * ResourceManager implementation. The resource manager is responsible for resource de-/allocation - * and bookkeeping. - * - * It offers the following methods as part of its rpc interface to interact with the him remotely: - * <ul> - * <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li> - * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> - * </ul> - */ -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { - private final Map<JobMasterGateway, InstanceID> jobMasterGateways; - private final HighAvailabilityServices highAvailabilityServices; - private LeaderElectionService leaderElectionService = null; - private UUID leaderSessionID = null; - - public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) { - super(rpcService); - this.highAvailabilityServices = checkNotNull(highAvailabilityServices); - this.jobMasterGateways = new HashMap<>(); - } - - @Override - public void start() { - // start a leader - try { - super.start(); - leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); - leaderElectionService.start(new ResourceManagerLeaderContender()); - } catch (Throwable e) { - log.error("A fatal error happened when starting the ResourceManager", e); - throw new RuntimeException("A fatal error happened when starting the ResourceManager", e); - } - } - - @Override - public void shutDown() { - try { - leaderElectionService.stop(); - super.shutDown(); - } catch(Throwable e) { - log.error("A fatal error happened when shutdown the ResourceManager", e); - throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e); - } - } - - /** - * Gets the leader session id of current resourceManager. - * - * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. - */ - @VisibleForTesting - UUID getLeaderSessionID() { - return leaderSessionID; - } - - /** - * Register a {@link JobMaster} at the resource manager. - * - * @param jobMasterRegistration Job master registration information - * @return Future registration response - */ - @RpcMethod - public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) { - Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); - - return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() { - @Override - public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { - InstanceID instanceID; - - if (jobMasterGateways.containsKey(jobMasterGateway)) { - instanceID = jobMasterGateways.get(jobMasterGateway); - } else { - instanceID = new InstanceID(); - jobMasterGateways.put(jobMasterGateway, instanceID); - } - - return new RegistrationResponse(true, instanceID); - } - }, getMainThreadExecutionContext()); - } - - /** - * Requests a slot from the resource manager. - * - * @param slotRequest Slot request - * @return Slot assignment - */ - @RpcMethod - public SlotAssignment requestSlot(SlotRequest slotRequest) { - System.out.println("SlotRequest: " + slotRequest); - return new SlotAssignment(); - } - - - /** - * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * - * @return The response by the ResourceManager. - */ - @RpcMethod - public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID) { - - return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); - } - - private class ResourceManagerLeaderContender implements LeaderContender { - - /** - * Callback method when current resourceManager is granted leadership - * - * @param leaderSessionID unique leadershipID - */ - @Override - public void grantLeadership(final UUID leaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); - ResourceManager.this.leaderSessionID = leaderSessionID; - // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(leaderSessionID); - } - }); - } - - /** - * Callback method when current resourceManager lose leadership. - */ - @Override - public void revokeLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was revoked leadership.", getAddress()); - jobMasterGateways.clear(); - leaderSessionID = null; - } - }); - } - - @Override - public String getAddress() { - return ResourceManager.this.getAddress(); - } - - /** - * Handles error occurring in the leader election service - * - * @param exception Exception being thrown in the leader election service - */ - @Override - public void handleError(final Exception exception) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("ResourceManager received an error from the LeaderElectionService.", exception); - // terminate ResourceManager in case of an error - shutDown(); - } - }); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java deleted file mode 100644 index afddb01..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.resourcemanager; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcTimeout; -import org.apache.flink.runtime.rpc.jobmaster.JobMaster; - -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.UUID; - -/** - * The {@link ResourceManager}'s RPC gateway interface. - */ -public interface ResourceManagerGateway extends RpcGateway { - - /** - * Register a {@link JobMaster} at the resource manager. - * - * @param jobMasterRegistration Job master registration information - * @param timeout Timeout for the future to complete - * @return Future registration response - */ - Future<RegistrationResponse> registerJobMaster( - JobMasterRegistration jobMasterRegistration, - @RpcTimeout FiniteDuration timeout); - - /** - * Register a {@link JobMaster} at the resource manager. - * - * @param jobMasterRegistration Job master registration information - * @return Future registration response - */ - Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration); - - /** - * Requests a slot from the resource manager. - * - * @param slotRequest Slot request - * @return Future slot assignment - */ - Future<SlotAssignment> requestSlot(SlotRequest slotRequest); - - /** - * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * @param timeout The timeout for the response. - * - * @return The future to the response by the ResourceManager. - */ - Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID, - @RpcTimeout FiniteDuration timeout); -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java deleted file mode 100644 index 86cd8b7..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.resourcemanager; - -import java.io.Serializable; - -public class SlotAssignment implements Serializable{ - private static final long serialVersionUID = -6990813455942742322L; -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java deleted file mode 100644 index 74c7c39..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.resourcemanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; - -import java.io.Serializable; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager. - */ -public class SlotRequest implements Serializable { - - private static final long serialVersionUID = -6586877187990445986L; - - /** The JobID of the slot requested for */ - private final JobID jobId; - - /** The unique identification of this request */ - private final AllocationID allocationId; - - /** The resource profile of the required slot */ - private final ResourceProfile resourceProfile; - - public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) { - this.jobId = checkNotNull(jobId); - this.allocationId = checkNotNull(allocationId); - this.resourceProfile = checkNotNull(resourceProfile); - } - - /** - * Get the JobID of the slot requested for. - * @return The job id - */ - public JobID getJobId() { - return jobId; - } - - /** - * Get the unique identification of this request - * @return the allocation id - */ - public AllocationID getAllocationId() { - return allocationId; - } - - /** - * Get the resource profile of the desired slot - * @return The resource profile - */ - public ResourceProfile getResourceProfile() { - return resourceProfile; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java deleted file mode 100644 index c372ecb..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; - -import java.io.Serializable; -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A report about the current status of all slots of the TaskExecutor, describing - * which slots are available and allocated, and what jobs (JobManagers) the allocated slots - * have been allocated to. - */ -public class SlotReport implements Serializable { - - private static final long serialVersionUID = -3150175198722481689L; - - /** The slots status of the TaskManager */ - private final List<SlotStatus> slotsStatus; - - /** The resource id which identifies the TaskManager */ - private final ResourceID resourceID; - - public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) { - this.slotsStatus = checkNotNull(slotsStatus); - this.resourceID = checkNotNull(resourceID); - } - - public List<SlotStatus> getSlotsStatus() { - return slotsStatus; - } - - public ResourceID getResourceID() { - return resourceID; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java deleted file mode 100644 index e8e2084..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotID; - -import java.io.Serializable; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * This describes the slot current status which located in TaskManager. - */ -public class SlotStatus implements Serializable { - - private static final long serialVersionUID = 5099191707339664493L; - - /** slotID to identify a slot */ - private final SlotID slotID; - - /** the resource profile of the slot */ - private final ResourceProfile profiler; - - /** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */ - private final AllocationID allocationID; - - /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */ - private final JobID jobID; - - public SlotStatus(SlotID slotID, ResourceProfile profiler) { - this(slotID, profiler, null, null); - } - - public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) { - this.slotID = checkNotNull(slotID, "slotID cannot be null"); - this.profiler = checkNotNull(profiler, "profile cannot be null"); - this.allocationID = allocationID; - this.jobID = jobID; - } - - /** - * Get the unique identification of this slot - * - * @return The slot id - */ - public SlotID getSlotID() { - return slotID; - } - - /** - * Get the resource profile of this slot - * - * @return The resource profile - */ - public ResourceProfile getProfiler() { - return profiler; - } - - /** - * Get the allocation id of this slot - * - * @return The allocation id if this slot is allocated, otherwise null - */ - public AllocationID getAllocationID() { - return allocationID; - } - - /** - * Get the job id of the slot allocated for - * - * @return The job id if this slot is allocated, otherwise null - */ - public JobID getJobID() { - return jobID; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SlotStatus that = (SlotStatus) o; - - if (!slotID.equals(that.slotID)) { - return false; - } - if (!profiler.equals(that.profiler)) { - return false; - } - if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) { - return false; - } - return jobID != null ? jobID.equals(that.jobID) : that.jobID == null; - - } - - @Override - public int hashCode() { - int result = slotID.hashCode(); - result = 31 * result + profiler.hashCode(); - result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0); - result = 31 * result + (jobID != null ? jobID.hashCode() : 0); - return result; - } - -}