This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f36aee  Rest endpoint to query compaction status (#1501)
6f36aee is described below

commit 6f36aeea72178014d5aed980f38585726ca588d7
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Apr 6 23:12:17 2018 +0200

    Rest endpoint to query compaction status (#1501)
    
    Currently returns whether compaction has not run, is running, is
    complete or has failed. If it has failed, the last error is returned
    also.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  7 +++
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 14 ++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 15 +++++++
 .../broker/service/persistent/PersistentTopic.java | 24 ++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 36 +++++++++++++++
 .../pulsar/client/admin/PersistentTopics.java      |  8 ++++
 .../admin/internal/PersistentTopicsImpl.java       | 13 ++++++
 .../pulsar/common/compaction/CompactionStatus.java | 52 ++++++++++++++++++++++
 8 files changed, 169 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aa52c64..03a435d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -1086,6 +1087,12 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    /**
+     * Reports the compaction status of the topic addressed by the current request.
+     *
+     * @param authoritative passed through unchanged to {@code validateAdminOperationOnTopic}
+     * @return the value returned by {@code PersistentTopic#compactionStatus()} for that topic
+     */
+    protected CompactionStatus internalCompactionStatus(boolean authoritative) {
+        // Validate first; only then resolve the topic reference and query it.
+        validateAdminOperationOnTopic(authoritative);
+        return ((PersistentTopic) getTopicReference(topicName)).compactionStatus();
+    }
+
     public static CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(PulsarService pulsar,
             String clientAppId, AuthenticationDataSource authenticationData, 
TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f28d229..b03b533 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -40,6 +40,7 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -435,4 +436,17 @@ public class PersistentTopics extends PersistentTopicsBase 
{
         internalTriggerCompaction(authoritative);
     }
 
+    /**
+     * REST endpoint (v1 path, including the cluster) that reports the compaction status of a topic.
+     *
+     * A topic that has never been compacted is answered with a {@code NOT_RUN} status body rather
+     * than a 404, so the documented 404 response only covers a missing topic.
+     *
+     * @param property      first path segment of the topic name
+     * @param cluster       cluster path segment of the topic name
+     * @param namespace     namespace path segment of the topic name
+     * @param encodedTopic  topic path segment, still URL-encoded
+     * @param authoritative forwarded to {@code internalCompactionStatus}
+     * @return the status produced by {@code internalCompactionStatus}
+     */
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/compaction")
+    @ApiOperation(value = "Get the status of a compaction operation for a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not exist") })
+    public CompactionStatus compactionStatus(
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(property, cluster, namespace, encodedTopic);
+        return internalCompactionStatus(authoritative);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3ffdde8..4b88743 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -40,6 +40,7 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -407,4 +408,18 @@ public class PersistentTopics extends PersistentTopicsBase 
{
         validateTopicName(property, namespace, encodedTopic);
         internalTriggerCompaction(authoritative);
     }
+
+    /**
+     * REST endpoint (v2 path, without a cluster segment) that reports the compaction status of a
+     * topic.
+     *
+     * A topic that has never been compacted is answered with a {@code NOT_RUN} status body rather
+     * than a 404, so the documented 404 response only covers a missing topic.
+     *
+     * @param property      first path segment of the topic name
+     * @param namespace     namespace path segment of the topic name
+     * @param encodedTopic  topic path segment, still URL-encoded
+     * @param authoritative forwarded to {@code internalCompactionStatus}
+     * @return the status produced by {@code internalCompactionStatus}
+     */
+    @GET
+    @Path("/{property}/{namespace}/{topic}/compaction")
+    @ApiOperation(value = "Get the status of a compaction operation for a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not exist") })
+    public CompactionStatus compactionStatus(
+            @PathParam("property") String property,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(property, namespace, encodedTopic);
+        return internalCompactionStatus(authoritative);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6f7e4fb..2d63fb6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -84,6 +86,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -1622,6 +1625,27 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         }
     }
 
+
+    /**
+     * Reports the state of the compaction currently tracked for this topic.
+     *
+     * @return {@code RUNNING} while the tracked future has not completed; {@code NOT_RUN} when it
+     *         completed with {@code COMPACTION_NEVER_RUN}; {@code SUCCESS} for any other result;
+     *         or an error status carrying the exception message when the future failed or was
+     *         cancelled
+     */
+    public CompactionStatus compactionStatus() {
+        // Snapshot the field under the topic lock. Everything below works on the local copy,
+        // so the lock is not held for the rest of the method (the method-level synchronized
+        // was redundant with this block).
+        final CompletableFuture<Long> current;
+        synchronized (this) {
+            current = currentCompaction;
+        }
+        if (!current.isDone()) {
+            return CompactionStatus.forStatus(CompactionStatus.Status.RUNNING);
+        }
+        try {
+            // The future is already done, so join() returns or throws without blocking.
+            if (current.join() == COMPACTION_NEVER_RUN) {
+                return CompactionStatus.forStatus(CompactionStatus.Status.NOT_RUN);
+            }
+            return CompactionStatus.forStatus(CompactionStatus.Status.SUCCESS);
+        } catch (CancellationException | CompletionException e) {
+            return CompactionStatus.forError(e.getMessage());
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopic.class);
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 80fbf18..c53db5b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -1927,4 +1928,39 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         verify(compactor, times(2)).compact(topicName);
     }
 
+    @Test
+    public void testCompactionStatus() throws Exception {
+        String topicName = "persistent://prop-xyz/use/ns1/topic1";
+        // Walks one topic through NOT_RUN, RUNNING, SUCCESS and ERROR as seen via the admin API.
+        // create a topic by creating a producer
+        pulsarClient.newProducer().topic(topicName).create().close();
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+        // no compaction has been triggered on the new topic yet
+        
assertEquals(admin.persistentTopics().compactionStatus(topicName).status,
+                     CompactionStatus.Status.NOT_RUN);
+
+        // mock actual compaction, we don't need to really run it
+        CompletableFuture<Long> promise = new CompletableFuture<Long>();
+        Compactor compactor = pulsar.getCompactor();
+        doReturn(promise).when(compactor).compact(topicName);
+        admin.persistentTopics().triggerCompaction(topicName);
+        // the mocked future is still pending at this point
+        
assertEquals(admin.persistentTopics().compactionStatus(topicName).status,
+                     CompactionStatus.Status.RUNNING);
+        // finish the mocked compaction normally
+        promise.complete(1L);
+
+        
assertEquals(admin.persistentTopics().compactionStatus(topicName).status,
+                     CompactionStatus.Status.SUCCESS);
+        // second run: the mocked compaction completes exceptionally
+        CompletableFuture<Long> errorPromise = new CompletableFuture<Long>();
+        doReturn(errorPromise).when(compactor).compact(topicName);
+        admin.persistentTopics().triggerCompaction(topicName);
+        errorPromise.completeExceptionally(new Exception("Failed at 
something"));
+        // the failure must surface as ERROR together with the exception text
+        
assertEquals(admin.persistentTopics().compactionStatus(topicName).status,
+                     CompactionStatus.Status.ERROR);
+        assertTrue(admin.persistentTopics().compactionStatus(topicName)
+                   .lastError.contains("Failed at something"));
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
index 93df290..364fec4 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
@@ -30,6 +30,7 @@ import 
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -909,4 +910,11 @@ public interface PersistentTopics {
      *            The topic on which to trigger compaction
      */
     void triggerCompaction(String topic) throws PulsarAdminException;
+
+    /**
+     * Check the compaction status of a topic; a compaction does not have to be in progress.
+     * @param topic The topic whose compaction status we wish to check
+     * @return the compaction status reported for the topic
+     */
+    CompactionStatus compactionStatus(String topic) throws 
PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index 6193d08..3121739 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
+import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -723,6 +724,18 @@ public class PersistentTopicsImpl extends BaseResource 
implements PersistentTopi
         }
     }
 
+    /**
+     * Fetches the compaction status of {@code topic} from the topic's "compaction" REST path.
+     * Any exception raised along the way is converted with {@code getApiException} and rethrown.
+     */
+    @Override
+    public CompactionStatus compactionStatus(String topic) throws PulsarAdminException {
+        try {
+            // Validation stays inside the try so its failures are converted as well.
+            final TopicName validatedTopic = validateTopic(topic);
+            return request(topicPath(validatedTopic, "compaction")).get(CompactionStatus.class);
+        } catch (Exception failure) {
+            throw getApiException(failure);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2PersistentTopics : 
adminPersistentTopics;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
new file mode 100644
index 0000000..9020c21
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compaction;
+
+/**
+ * Status of compaction for a topic, together with the last error message when it failed.
+ *
+ * Both fields are public and mutable; callers read them directly.
+ */
+public class CompactionStatus {
+    /** The states a topic's compaction can be reported in. */
+    public enum Status {
+        NOT_RUN,
+        RUNNING,
+        SUCCESS,
+        ERROR
+    }
+
+    public Status status;
+    public String lastError;
+
+    /** Creates a status of {@code NOT_RUN} with an empty error message. */
+    public CompactionStatus() {
+        this(Status.NOT_RUN, "");
+    }
+
+    private CompactionStatus(Status status, String lastError) {
+        this.status = status;
+        this.lastError = lastError;
+    }
+
+    /**
+     * @param status the state to report
+     * @return a new instance with the given state and an empty error message
+     */
+    public static CompactionStatus forStatus(Status status) {
+        return new CompactionStatus(status, "");
+    }
+
+    /**
+     * @param lastError the error message to report
+     * @return a new instance in state {@code ERROR} carrying {@code lastError}
+     */
+    public static CompactionStatus forError(String lastError) {
+        return new CompactionStatus(Status.ERROR, lastError);
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to