xiaoyuyao commented on a change in pull request #1049:
URL: https://github.com/apache/hadoop-ozone/pull/1049#discussion_r439132593



##########
File path: 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
##########
@@ -310,102 +315,69 @@ public void openPipeline(PipelineID pipelineId) throws 
IOException {
   }
 
   /**
-   * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to
-   * destroy pipeline on the datanodes immediately or after timeout based on 
the
-   * value of onTimeout parameter.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param onTimeout       - if true pipeline is removed and destroyed on
-   *                        datanodes after timeout
-   * @throws IOException
-   */
-  @Override
-  public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
-      throws IOException {
-    LOG.info("Destroying pipeline:{}", pipeline);
-    finalizePipeline(pipeline.getId());
-    if (onTimeout) {
-      long pipelineDestroyTimeoutInMillis =
-          
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
-              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
-              TimeUnit.MILLISECONDS);
-      scheduler.schedule(() -> destroyPipeline(pipeline),
-          pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
-          String.format("Destroy pipeline failed for pipeline:%s", pipeline));
-    } else {
-      destroyPipeline(pipeline);
-    }
-  }
-
-  /**
-   * Moves the pipeline to CLOSED state and sends close container command for
-   * all the containers in the pipeline.
+   * Removes the pipeline from the db and pipeline state map.
    *
-   * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
+   * @param pipeline - pipeline to be removed
    * @throws IOException
    */
-  private void finalizePipeline(PipelineID pipelineId) throws IOException {
+  protected void removePipeline(Pipeline pipeline) throws IOException {
+    pipelineFactory.close(pipeline.getType(), pipeline);
+    PipelineID pipelineID = pipeline.getId();
+    closeContainersForPipeline(pipelineID);
     lock.writeLock().lock();
     try {
-      Pipeline pipeline = stateManager.getPipeline(pipelineId);
-      if (!pipeline.isClosed()) {
-        stateManager.updatePipelineState(pipelineId.getProtobuf(),
-            HddsProtos.PipelineState.PIPELINE_CLOSED);
-        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
-      }
-
-      // TODO fire events to datanodes for closing pipelines
-//      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
-//      for (ContainerID containerID : containerIDs) {
-//        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-//      }
-      metrics.removePipelineMetrics(pipelineId);
+      stateManager.removePipeline(pipelineID.getProtobuf());
+      metrics.incNumPipelineDestroyed();
+    } catch (IOException ex) {
+      metrics.incNumPipelineDestroyFailed();
+      throw ex;
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes for ratis pipelines.
-   *
-   * @param pipeline        - Pipeline to be destroyed
+   * Fire events to close all containers related to the input pipeline.
+   * @param pipelineId - ID of the pipeline.
    * @throws IOException
    */
-  protected void destroyPipeline(Pipeline pipeline) throws IOException {
-    pipelineFactory.close(pipeline.getType(), pipeline);
-    // remove the pipeline from the pipeline manager
-    removePipeline(pipeline.getId());
-    triggerPipelineCreation();
+  protected void closeContainersForPipeline(final PipelineID pipelineId)
+      throws IOException {
+    Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+    for (ContainerID containerID : containerIDs) {
+      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+    }
   }
 
   /**
-   * Removes the pipeline from the db and pipeline state map.
-   *
-   * @param pipelineId - ID of the pipeline to be removed
+   * put pipeline in CLOSED state.
+   * @param pipeline - ID of the pipeline.
+   * @param onTimeout - whether to remove pipeline after some time.
    * @throws IOException
    */
-  protected void removePipeline(PipelineID pipelineId) throws IOException {
+  @Override
+  public void closePipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException {
+    PipelineID pipelineID = pipeline.getId();
     lock.writeLock().lock();
     try {
-      stateManager.removePipeline(pipelineId.getProtobuf());
-      metrics.incNumPipelineDestroyed();
-    } catch (IOException ex) {
-      metrics.incNumPipelineDestroyFailed();
-      throw ex;
+      if (!pipeline.isClosed()) {
+        stateManager.updatePipelineState(pipelineID.getProtobuf(),
+            HddsProtos.PipelineState.PIPELINE_CLOSED);
+        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+      }
+      metrics.removePipelineMetrics(pipelineID);
     } finally {
       lock.writeLock().unlock();
     }
+    if (!onTimeout) {
+      removePipeline(pipeline);
+    }
   }
 
-  @Override
-  public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
-      throws IOException{
-    if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
-      // Only srub pipeline for RATIS THREE pipeline
-      return;
-    }
-    Instant currentTime = Instant.now();
+  private void scrubAllocatedPipeline(

Review comment:
       Is it possible to dedup the code between scrubbing closed pipelines and 
scrubbing allocated pipelines?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to