This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 65be5c2  Make Source/Sink status Source/Sink specific (#3137)
65be5c2 is described below

commit 65be5c2501b9e1f824d1cd0f337aab7df82e4155
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Dec 7 10:39:41 2018 -0800

    Make Source/Sink status Source/Sink specific (#3137)
    
    * Make Source/Sink status Source/Sink specific
    
    * Fix unittest
---
 .../pulsar/common/policies/data/SinkStatus.java      | 17 ++++++++++++++---
 .../pulsar/common/policies/data/SourceStatus.java    | 18 +++++++++++++++---
 .../pulsar/functions/worker/rest/api/SinkImpl.java   |  5 +++--
 .../pulsar/functions/worker/rest/api/SourceImpl.java |  5 +++--
 .../integration/functions/PulsarFunctionsTest.java   | 20 +++++++++++---------
 5 files changed, 46 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
index 72cf736..609f107 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
@@ -27,7 +27,9 @@ import java.util.List;
 
 @Data
 public class SinkStatus {
+    // The total number of sink instances that ought to be running
     public int numInstances;
+    // The number of sink instances that are actually running
     public int numRunning;
     public List<SinkInstanceStatus> instances = new LinkedList<>();
 
@@ -38,20 +40,29 @@ public class SinkStatus {
 
         @Data
         public static class SinkInstanceStatusData {
-
+            // Is this instance running?
             public boolean running;
 
+            // Do we have any error while running this instance?
             public String error;
 
+            // Number of times this instance has restarted
             public long numRestarts;
 
-            public long numReceived;
+            // Number of messages read from Pulsar
+            public long numReadFromPulsar;
 
+            // Number of times there was a system exception handling messages
             public long numSystemExceptions;
 
+            // A list of the most recent system exceptions
             public List<ExceptionInformation> latestSystemExceptions;
 
-            public long lastInvocationTime;
+            // Number of messages written to sink
+            public long numWrittenToSink;
+
+            // When was the last time we received a message from Pulsar
+            public long lastReceivedTime;
 
             public String workerId;
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
index 1ea8a80..4043900 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
@@ -27,7 +27,9 @@ import java.util.List;
 
 @Data
 public class SourceStatus {
+    // The total number of source instances that ought to be running
     public int numInstances;
+    // The number of source instances that are actually running
     public int numRunning;
     public List<SourceInstanceStatus> instances = new LinkedList<>();
 
@@ -38,21 +40,31 @@ public class SourceStatus {
 
         @Data
         public static class SourceInstanceStatusData {
-
+            // Is this instance running?
             public boolean running;
 
+            // Do we have any error while running this instance?
             public String error;
 
+            // Number of times this instance has restarted
             public long numRestarts;
 
-            public long numReceived;
+            // Number of messages received from source
+            public long numReceivedFromSource;
 
+            // Number of times there was a system exception handling messages
             public long numSystemExceptions;
 
+            // A list of the most recent system exceptions
             public List<ExceptionInformation> latestSystemExceptions;
 
-            public long lastInvocationTime;
+            // Number of messages written into Pulsar
+            public long numWritten;
+
+            // When was the last time we received a message from the source
+            public long lastReceivedTime;
 
+            // The worker id on which the source is running
             public String workerId;
         }
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 4ecba12..45bbba4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -60,7 +60,7 @@ public class SinkImpl extends ComponentImpl {
             sinkInstanceStatusData.setRunning(status.getRunning());
             sinkInstanceStatusData.setError(status.getFailureException());
             sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
-            sinkInstanceStatusData.setNumReceived(status.getNumReceived());
+            
sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived());
 
             List<ExceptionInformation> userExceptionInformationList = new 
LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestUserExceptionsList()) {
@@ -82,7 +82,8 @@ public class SinkImpl extends ComponentImpl {
             }
             
sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
 
-            
sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            
sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed());
+            
sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
             sinkInstanceStatusData.setWorkerId(assignedWorkerId);
 
             return sinkInstanceStatusData;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 412019a..f05eea6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -59,7 +59,7 @@ public class SourceImpl extends ComponentImpl {
             sourceInstanceStatusData.setRunning(status.getRunning());
             sourceInstanceStatusData.setError(status.getFailureException());
             sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
-            sourceInstanceStatusData.setNumReceived(status.getNumReceived());
+            
sourceInstanceStatusData.setNumReceivedFromSource(status.getNumReceived());
 
             List<ExceptionInformation> userExceptionInformationList = new 
LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation 
exceptionEntry : status.getLatestUserExceptionsList()) {
@@ -81,7 +81,8 @@ public class SourceImpl extends ComponentImpl {
             }
             
sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
 
-            
sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
+            
sourceInstanceStatusData.setNumWritten(status.getNumSuccessfullyProcessed());
+            
sourceInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
             sourceInstanceStatusData.setWorkerId(assignedWorkerId);
 
             return sourceInstanceStatusData;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5add9a1..f100c21 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -245,7 +245,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "sink",
-            "getstatus",
+            "status",
             "--tenant", tenant,
             "--namespace", namespace,
             "--name", sinkName
@@ -472,7 +472,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
-            "getstatus",
+            "status",
             "--tenant", tenant,
             "--namespace", namespace,
             "--name", sourceName
@@ -524,7 +524,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
-            "getstatus",
+            "status",
             "--tenant", tenant,
             "--namespace", namespace,
             "--name", sourceName
@@ -544,8 +544,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                     assertEquals(sourceStatus.getInstances().size(), 1);
                     
assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0);
                     
assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    
assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastInvocationTime()
 > 0);
-                    
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceived(), 
numMessages);
+                    
assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() 
> 0);
+                    
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(),
 numMessages);
+                    
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), 
numMessages);
                     
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 
0);
                     
assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
 0);
                     return;
@@ -568,7 +569,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         String[] commands = {
                 PulsarCluster.ADMIN_SCRIPT,
                 "sink",
-                "getstatus",
+                "status",
                 "--tenant", tenant,
                 "--namespace", namespace,
                 "--name", sinkName
@@ -588,8 +589,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                     assertEquals(sinkStatus.getInstances().size(), 1);
                     
assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
                     
assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    
assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastInvocationTime() 
> 0);
-                    
assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReceived(), 
numMessages);
+                    
assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 
0);
+                    
assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(),
 numMessages);
+                    
assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(),
 numMessages);
                     
assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
                     
assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
 0);
                     return;
@@ -940,7 +942,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
-            "getstatus",
+            "status",
             "--tenant", "public",
             "--namespace", "default",
             "--name", functionName

Reply via email to