[FLINK-8634] [rest] Introduce job rescaling REST handler Add rescaling REST handler as a sub class of the AbstractAsynchronousOperationHandlers.
This closes #5451. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06acdc19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06acdc19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06acdc19 Branch: refs/heads/master Commit: 06acdc1907300862d5faddc4e882f5f6dd670edb Parents: 4756573 Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Feb 2 11:06:35 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Feb 22 17:32:38 2018 +0100 ---------------------------------------------------------------------- .../async/AsynchronousOperationInfo.java | 64 +++++++++++ .../job/AsynchronousJobOperationKey.java | 74 +++++++++++++ .../rest/handler/job/RescalingHandlers.java | 111 +++++++++++++++++++ .../job/RescalingStatusMessageParameters.java | 39 +++++++ .../job/RescalingTriggerMessageParameters.java | 40 +++++++ .../job/savepoints/SavepointHandlers.java | 60 +--------- .../RescaleParallelismQueryParameter.java | 41 +++++++ .../RescalingParallelismQueryParameter.java | 41 +++++++ 8 files changed, 416 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java new file mode 100644 index 0000000..a46fba9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.async; + +import org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer; +import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer; +import org.apache.flink.util.SerializedThrowable; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +/** + * Basic information object for asynchronous operations. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AsynchronousOperationInfo { + + private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause"; + + @JsonProperty(FIELD_NAME_FAILURE_CAUSE) + @JsonSerialize(using = SerializedThrowableSerializer.class) + @Nullable + private final SerializedThrowable failureCause; + + private AsynchronousOperationInfo( + @JsonProperty(FIELD_NAME_FAILURE_CAUSE) + @JsonDeserialize(using = SerializedThrowableDeserializer.class) + @Nullable SerializedThrowable failureCause) { + this.failureCause = failureCause; + } + + @Nullable + public SerializedThrowable getFailureCause() { + return failureCause; + } + + public static AsynchronousOperationInfo completeExceptional(SerializedThrowable serializedThrowable) { + return new AsynchronousOperationInfo(serializedThrowable); + } + + public static AsynchronousOperationInfo complete() { + return new AsynchronousOperationInfo(null); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java new file mode 100644 index 0000000..4bb473e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; +import org.apache.flink.runtime.rest.handler.async.OperationKey; +import org.apache.flink.runtime.rest.messages.TriggerId; + +import javax.annotation.concurrent.Immutable; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based + * collection. + * + * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache + */ +@Immutable +public class AsynchronousJobOperationKey extends OperationKey { + + private final JobID jobId; + + private AsynchronousJobOperationKey(final TriggerId triggerId, final JobID jobId) { + super(triggerId); + this.jobId = requireNonNull(jobId); + } + + public static AsynchronousJobOperationKey of(final TriggerId triggerId, final JobID jobId) { + return new AsynchronousJobOperationKey(triggerId, jobId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + if (!super.equals(o)) { + return false; + } + + AsynchronousJobOperationKey that = (AsynchronousJobOperationKey) o; + return Objects.equals(jobId, that.jobId); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), jobId); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java new file mode 100644 index 0000000..6f8895a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; +import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RescaleParallelismQueryParameter; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.SerializedThrowable; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Rest handler to trigger and poll the rescaling of a running job. + */ +public class RescalingHandlers extends AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> { + + /** + * Handler which triggers the rescaling of the specified job. + */ + public class RescalingTriggerHandler extends TriggerHandler<RestfulGateway, EmptyRequestBody, RescalingTriggerMessageParameters> { + + protected RescalingTriggerHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, TriggerResponse, RescalingTriggerMessageParameters> messageHeaders) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws RestHandlerException { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final List<Integer> queryParameter = request.getQueryParameter(RescaleParallelismQueryParameter.class); + + if (queryParameter.isEmpty()) { + throw new RestHandlerException("No new parallelism was specified.", HttpResponseStatus.BAD_REQUEST); + } + + final int newParallelism = queryParameter.get(0); + + final CompletableFuture<Acknowledge> rescalingFuture = gateway.rescaleJob(jobId, newParallelism, RescalingBehaviour.STRICT, timeout); + + return rescalingFuture; + } + + @Override + protected AsynchronousJobOperationKey createOperationKey(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request) { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + return AsynchronousJobOperationKey.of(new TriggerId(), jobId); + } + } + + /** + * Handler which reports the status of the rescaling operation. + */ + public class RescalingStatusHandler extends StatusHandler<RestfulGateway, AsynchronousOperationInfo, RescalingStatusMessageParameters> { + + protected RescalingStatusHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, AsynchronousOperationResult<AsynchronousOperationInfo>, RescalingStatusMessageParameters> messageHeaders) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody, RescalingStatusMessageParameters> request) { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); + + return AsynchronousJobOperationKey.of(triggerId, jobId); + } + + @Override + protected AsynchronousOperationInfo exceptionalOperationResultResponse(Throwable throwable) { + return AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(throwable)); + } + + @Override + protected AsynchronousOperationInfo operationResultResponse(Acknowledge operationResult) { + return AsynchronousOperationInfo.complete(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java new file mode 100644 index 0000000..4821b4f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}. + */ +public class RescalingStatusMessageParameters extends JobMessageParameters { + + public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Arrays.asList(jobPathParameter, triggerIdPathParameter); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java new file mode 100644 index 0000000..096baa1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * {@link MessageParameters} for triggering the rescaling of a job. + */ +public class RescalingTriggerMessageParameters extends JobMessageParameters { + + public final RescalingParallelismQueryParameter rescalingParallelismQueryParameter = new RescalingParallelismQueryParameter(); + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.singleton(rescalingParallelismQueryParameter); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java index 5915d72..cb3ff5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java @@ -24,7 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; -import org.apache.flink.runtime.rest.handler.async.OperationKey; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.TriggerId; @@ -43,14 +43,10 @@ import org.apache.flink.util.SerializedThrowable; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import javax.annotation.Nullable; -import javax.annotation.concurrent.Immutable; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; -import static java.util.Objects.requireNonNull; - /** * HTTP handlers for asynchronous triggering of savepoints. * @@ -96,7 +92,7 @@ import static java.util.Objects.requireNonNull; * } * </pre> */ -public class SavepointHandlers extends AbstractAsynchronousOperationHandlers<SavepointHandlers.SavepointKey, String> { +public class SavepointHandlers extends AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, String> { @Nullable private final String defaultSavepointDir; @@ -136,9 +132,9 @@ public class SavepointHandlers extends AbstractAsynchronousOperationHandlers<Sav } @Override - protected SavepointKey createOperationKey(HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request) { + protected AsynchronousJobOperationKey createOperationKey(HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request) { final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - return SavepointKey.of(new TriggerId(), jobId); + return AsynchronousJobOperationKey.of(new TriggerId(), jobId); } } @@ -156,10 +152,10 @@ public class SavepointHandlers extends AbstractAsynchronousOperationHandlers<Sav } @Override - protected SavepointKey getOperationKey(HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request) { + protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request) { final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - return SavepointKey.of(triggerId, jobId); + return AsynchronousJobOperationKey.of(triggerId, jobId); } @Override @@ -172,48 +168,4 @@ public class SavepointHandlers extends AbstractAsynchronousOperationHandlers<Sav return new SavepointInfo(operationResult, null); } } - - /** - * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based - * collection. - * - * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache - */ - @Immutable - public static class SavepointKey extends OperationKey { - - private final JobID jobId; - - private SavepointKey(final TriggerId triggerId, final JobID jobId) { - super(triggerId); - this.jobId = requireNonNull(jobId); - } - - private static SavepointKey of(final TriggerId triggerId, final JobID jobId) { - return new SavepointKey(triggerId, jobId); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - if (!super.equals(o)) { - return false; - } - - SavepointKey that = (SavepointKey) o; - return Objects.equals(jobId, that.jobId); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), jobId); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java new file mode 100644 index 0000000..8058ab9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}. + */ +public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> { + + public static final String KEY = "parallelism"; + + protected RescaleParallelismQueryParameter() { + super(KEY, MessageParameterRequisiteness.MANDATORY); + } + + @Override + public Integer convertValueFromString(String value) { + return Integer.valueOf(value); + } + + @Override + public String convertStringToValue(Integer value) { + return value.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java new file mode 100644 index 0000000..9230d79 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** + * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}. + */ +public class RescalingParallelismQueryParameter extends MessageQueryParameter<Integer> { + + public static final String KEY = "parallelism"; + + public RescalingParallelismQueryParameter() { + super(KEY, MessageParameterRequisiteness.MANDATORY); + } + + @Override + public Integer convertValueFromString(String value) { + return Integer.valueOf(value); + } + + @Override + public String convertStringToValue(Integer value) { + return value.toString(); + } +}