This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 65be5c2 Make Source/Sink status Source/Sink specific (#3137) 65be5c2 is described below commit 65be5c2501b9e1f824d1cd0f337aab7df82e4155 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Dec 7 10:39:41 2018 -0800 Make Source/Sink status Source/Sink specific (#3137) * Make Source/Sink status Source/Sink specific * Fix unittest --- .../pulsar/common/policies/data/SinkStatus.java | 17 ++++++++++++++--- .../pulsar/common/policies/data/SourceStatus.java | 18 +++++++++++++++--- .../pulsar/functions/worker/rest/api/SinkImpl.java | 5 +++-- .../pulsar/functions/worker/rest/api/SourceImpl.java | 5 +++-- .../integration/functions/PulsarFunctionsTest.java | 20 +++++++++++--------- 5 files changed, 46 insertions(+), 19 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java index 72cf736..609f107 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java @@ -27,7 +27,9 @@ import java.util.List; @Data public class SinkStatus { + // The total number of sink instances that ought to be running public int numInstances; + // The number of source instances that are actually running public int numRunning; public List<SinkInstanceStatus> instances = new LinkedList<>(); @@ -38,20 +40,29 @@ public class SinkStatus { @Data public static class SinkInstanceStatusData { - + // Is this instance running? public boolean running; + // Do we have any error while running this instance public String error; + // Number of times this instance has restarted public long numRestarts; - public long numReceived; + // Number of messages read from Pulsar + public long numReadFromPulsar; + // Number of times there was a system exception handling messages public long numSystemExceptions; + // A list of the most recent system exceptions public List<ExceptionInformation> latestSystemExceptions; - public long lastInvocationTime; + // Number of messages written to sink + public long numWrittenToSink; + + // When was the last time we received a message from Pulsar + public long lastReceivedTime; public String workerId; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java index 1ea8a80..4043900 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java @@ -27,7 +27,9 @@ import java.util.List; @Data public class SourceStatus { + // The total number of source instances that ought to be running public int numInstances; + // The number of source instances that are actually running public int numRunning; public List<SourceInstanceStatus> instances = new LinkedList<>(); @@ -38,21 +40,31 @@ public class SourceStatus { @Data public static class SourceInstanceStatusData { - + // Is this instance running? public boolean running; + // Do we have any error while running this instance public String error; + // Number of times this instance has restarted public long numRestarts; - public long numReceived; + // Number of messages received from source + public long numReceivedFromSource; + // Number of times there was a system exception handling messages public long numSystemExceptions; + // A list of the most recent system exceptions public List<ExceptionInformation> latestSystemExceptions; - public long lastInvocationTime; + // Number of messages written into pulsar + public long numWritten; + + // When was the last time we received a message from the source + public long lastReceivedTime; + // The worker id on which the source is running public String workerId; } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java index 4ecba12..45bbba4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java @@ -60,7 +60,7 @@ public class SinkImpl extends ComponentImpl { sinkInstanceStatusData.setRunning(status.getRunning()); sinkInstanceStatusData.setError(status.getFailureException()); sinkInstanceStatusData.setNumRestarts(status.getNumRestarts()); - sinkInstanceStatusData.setNumReceived(status.getNumReceived()); + sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived()); List<ExceptionInformation> userExceptionInformationList = new LinkedList<>(); for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) { @@ -82,7 +82,8 @@ public class SinkImpl extends ComponentImpl { } sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList); - sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime()); + sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed()); + sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime()); sinkInstanceStatusData.setWorkerId(assignedWorkerId); return sinkInstanceStatusData; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java index 412019a..f05eea6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java @@ -59,7 +59,7 @@ public class SourceImpl extends ComponentImpl { sourceInstanceStatusData.setRunning(status.getRunning()); sourceInstanceStatusData.setError(status.getFailureException()); sourceInstanceStatusData.setNumRestarts(status.getNumRestarts()); - sourceInstanceStatusData.setNumReceived(status.getNumReceived()); + sourceInstanceStatusData.setNumReceivedFromSource(status.getNumReceived()); List<ExceptionInformation> userExceptionInformationList = new LinkedList<>(); for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) { @@ -81,7 +81,8 @@ public class SourceImpl extends ComponentImpl { } sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList); - sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime()); + sourceInstanceStatusData.setNumWritten(status.getNumSuccessfullyProcessed()); + sourceInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime()); sourceInstanceStatusData.setWorkerId(assignedWorkerId); return sourceInstanceStatusData; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 5add9a1..f100c21 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -245,7 +245,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sink", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sinkName @@ -472,7 +472,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "source", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName @@ -524,7 +524,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "source", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName @@ -544,8 +544,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { assertEquals(sourceStatus.getInstances().size(), 1); assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0); assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true); - assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0); - assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages); + assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0); + assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(), numMessages); + assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), numMessages); assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0); assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0); return; @@ -568,7 +569,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sink", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sinkName @@ -588,8 +589,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { assertEquals(sinkStatus.getInstances().size(), 1); assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0); assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true); - assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0); - assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages); + assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0); + assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(), numMessages); + assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(), numMessages); assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0); assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0); return; @@ -940,7 +942,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", - "getstatus", + "status", "--tenant", "public", "--namespace", "default", "--name", functionName