Pochatkin commented on code in PR #1809:
URL: https://github.com/apache/ignite-3/pull/1809#discussion_r1144342707
##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -140,107 +116,76 @@ public CompletableFuture<Boolean> deployAsync(String id,
Version version, Deploy
Objects.requireNonNull(version);
Objects.requireNonNull(deploymentUnit);
- ByteArray key = key(id, version.render());
- UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(),
Collections.emptyList());
-
- Operation put = put(key, UnitMetaSerializer.serialize(meta));
-
- DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+ UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(),
DeploymentStatus.UPLOADING, Collections.emptyList());
+ byte[] unitContent;
try {
- builder.unitContent(deploymentUnit.content().readAllBytes());
+ unitContent = deploymentUnit.content().readAllBytes();
} catch (IOException e) {
LOG.error("Error to read deployment unit content", e);
return CompletableFuture.failedFuture(new
DeploymentUnitReadException(e));
}
- DeployUnitRequest request = builder
- .unitName(deploymentUnit.name())
- .id(id)
- .version(version.render())
- .build();
- return metaStorage.invoke(notExists(key), put, Operations.noop())
+ CompletableFuture<Boolean> result = metastore.putIfNotExist(id,
version, meta)
.thenCompose(success -> {
if (success) {
- return doDeploy(request);
+ return deployer.deploy(id, version.render(),
deploymentUnit.name(), unitContent);
}
LOG.error("Failed to deploy meta of unit " + id + ":" +
version);
return CompletableFuture.failedFuture(
new DeploymentUnitAlreadyExistsException(id,
"Unit " + id + ":" + version + " already
exists"));
})
+ .thenCompose(deployed -> {
+ if (deployed) {
+ return metastore.updateMeta(id, version,
+ meta1 ->
meta1.addConsistentId(clusterService.topologyService().localMember().name()));
+ }
+ return CompletableFuture.completedFuture(false);
+ })
.thenApply(completed -> {
if (completed) {
- startDeployAsyncToCmg(request);
+ messaging.startDeployAsyncToCmg(id, version,
deploymentUnit.name(), unitContent)
+ .thenAccept(v -> metastore.updateMeta(id,
version, meta1 -> meta1.updateStatus(DEPLOYED)));
}
return completed;
});
- }
-
- private void startDeployAsyncToCmg(DeployUnitRequest request) {
- cmgManager.cmgNodes()
- .thenAccept(nodes -> {
- for (String node : nodes) {
- ClusterNode clusterNode =
clusterService.topologyService().getByConsistentId(node);
- if (clusterNode != null) {
-
inFlightFutures.registerFuture(requestDeploy(clusterNode, request));
- }
- }
- });
- }
-
- private CompletableFuture<Boolean> requestDeploy(ClusterNode clusterNode,
DeployUnitRequest request) {
- return clusterService.messagingService()
- .invoke(clusterNode, request, Long.MAX_VALUE)
- .thenCompose(message -> {
- Throwable error = ((DeployUnitResponse) message).error();
- if (error != null) {
- LOG.error("Failed to deploy unit " + request.id() +
":" + request.version()
- + " to node " + clusterNode, error);
- return CompletableFuture.failedFuture(error);
- }
- return CompletableFuture.completedFuture(true);
- });
+ return tracker.track(id, version, result);
}
@Override
public CompletableFuture<Void> undeployAsync(String id, Version version) {
checkId(id);
Objects.requireNonNull(version);
- ByteArray key = key(id, version.render());
-
- return metaStorage.invoke(exists(key), Operations.remove(key),
Operations.noop())
+ return messaging.stopInProgressDeploy(id, version)
+ .thenCompose(v -> metastore.updateMeta(id, version, meta ->
meta.updateStatus(OBSOLETE)))
+ .thenCompose(success -> {
+ if (success) {
+ //TODO: Check unit usages here. If unit used in
compute task we cannot just remove it.
Review Comment:
We don't have a ticket for this yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]