This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 866bbec adding CmdFunctions unit tests and clean up (#1869) 866bbec is described below commit 866bbece0cc57894bdc70dd2fb99c23abf0f0bef Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu May 31 14:11:07 2018 -0700 adding CmdFunctions unit tests and clean up (#1869) * adding CmdFunctions unit tests and clean up * remove unnecessary import --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 309 ++++++++++++++++++--- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 68 +---- 2 files changed, 290 insertions(+), 87 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 1cc239d..0826956 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -18,29 +18,10 @@ */ package org.apache.pulsar.admin.cli; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mockStatic; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; - import com.google.gson.Gson; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; import org.apache.bookkeeper.clients.StorageClientBuilder; @@ -50,7 +31,6 @@ import org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction; import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction; import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction; import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions; -import org.apache.pulsar.admin.cli.CmdFunctions.LocalRunner; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -68,9 +48,34 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.ObjectFactory; import org.testng.annotations.Test; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.testng.Assert.assertEquals; + /** * Unit test of {@link CmdFunctions}. */ +@Slf4j @PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class }) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" }) public class CmdFunctionsTest { @@ -124,16 +129,16 @@ public class CmdFunctionsTest { when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class)); } - @Test - public void testLocalRunnerCmdNoArguments() throws Exception { - cmd.run(new String[] { "run" }); - - LocalRunner runner = cmd.getLocalRunner(); - assertNull(runner.getFunctionName()); - assertNull(runner.getInputs()); - assertNull(runner.getOutput()); - assertNull(runner.getFnConfigFile()); - } +// @Test +// public void testLocalRunnerCmdNoArguments() throws Exception { +// cmd.run(new String[] { "run" }); +// +// LocalRunner runner = cmd.getLocalRunner(); +// assertNull(runner.getFunctionName()); +// assertNull(runner.getInputs()); +// assertNull(runner.getOutput()); +// assertNull(runner.getFnConfigFile()); +// } /* TODO(sijie):- Can we fix this? @@ -347,8 +352,6 @@ public class CmdFunctionsTest { String inputTopicName = TEST_NAME + "-input-topic"; String outputTopicName = TEST_NAME + "-output-topic"; - - cmd.run(new String[] { "update", "--name", fnName, @@ -425,4 +428,244 @@ public class CmdFunctionsTest { "test-key", new String(ByteBufUtil.getBytes(keyHolder.get()), UTF_8)); } + + private static final String fnName = TEST_NAME + "-function"; + private static final String inputTopicName = TEST_NAME + "-input-topic"; + private static final String outputTopicName = TEST_NAME + "-output-topic"; + + private void testValidateFunctionsConfigs(String[] correctArgs, String[] incorrectArgs, + String errMessageCheck) throws Exception { + + String[] cmds = {"create", "update", "localrun"}; + + for (String type : cmds) { + List<String> correctArgList = new LinkedList<>(); + List<String> incorrectArgList = new LinkedList<>(); + correctArgList.add(type); + incorrectArgList.add(type); + + correctArgList.addAll(Arrays.asList(correctArgs)); + incorrectArgList.addAll(Arrays.asList(incorrectArgs)); + cmd.run(correctArgList.toArray(new String[correctArgList.size()])); + + if (type.equals("create")) { + CreateFunction creater = cmd.getCreater(); + assertEquals(fnName, creater.getFunctionName()); + assertEquals(inputTopicName, creater.getInputs()); + assertEquals(outputTopicName, creater.getOutput()); + } else if (type.equals("update")){ + UpdateFunction updater = cmd.getUpdater(); + assertEquals(fnName, updater.getFunctionName()); + assertEquals(inputTopicName, updater.getInputs()); + assertEquals(outputTopicName, updater.getOutput()); + } else { + CmdFunctions.LocalRunner localRunner = cmd.getLocalRunner(); + assertEquals(fnName, localRunner.getFunctionName()); + assertEquals(inputTopicName, localRunner.getInputs()); + assertEquals(outputTopicName, localRunner.getOutput()); + } + + if (type.equals("create")) { + verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString()); + } else if (type.equals("update")) { + verify(functions, times(1)).updateFunction(any(FunctionDetails.class), anyString()); + } + + setup(); + ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); + consoleOutputCapturer.start(); + cmd.run(incorrectArgList.toArray(new String[incorrectArgList.size()])); + + consoleOutputCapturer.stop(); + String output = consoleOutputCapturer.getStderr(); + assertEquals(output.replace("\n", ""), errMessageCheck); + } + } + + @Test + public void TestCreateFunctionParallelism() throws Exception{ + + String[] correctArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + "--parallelism", "1" + }; + + String[] incorrectArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + "--parallelism", "-1" + }; + + testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Field 'parallelism' must be a Positive Number"); + + } + + @Test + public void TestCreateTopicName() throws Exception { + + String[] correctArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }; + + String wrongOutputTopicName = TEST_NAME + "-output-topic/test:"; + String[] incorrectArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", wrongOutputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }; + + testValidateFunctionsConfigs(correctArgs, incorrectArgs, "The topic name " + wrongOutputTopicName + " is invalid for field 'output'"); + } + + @Test + public void TestCreateClassName() throws Exception { + + String[] correctArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }; + + String cannotLoadClass = "com.test.Function"; + String[] incorrectArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", cannotLoadClass, + }; + + testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Cannot find/load class " + cannotLoadClass); + } + + @Test + public void TestCreateSameInOutTopic() throws Exception { + + String[] correctArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", outputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }; + + String[] incorrectArgs = new String[]{ + "--name", fnName, + "--inputs", inputTopicName, + "--output", inputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }; + + testValidateFunctionsConfigs(correctArgs, incorrectArgs, + "Output topic " + inputTopicName + + " is also being used as an input topic (topics must be one or the other)"); + + } + + + public static class ConsoleOutputCapturer { + private ByteArrayOutputStream stdout; + private ByteArrayOutputStream stderr; + private PrintStream previous; + private boolean capturing; + + public void start() { + if (capturing) { + return; + } + + capturing = true; + previous = System.out; + stdout = new ByteArrayOutputStream(); + stderr = new ByteArrayOutputStream(); + + OutputStream outputStreamCombinerstdout = + new OutputStreamCombiner(Arrays.asList(previous, stdout)); + PrintStream stdoutStream = new PrintStream(outputStreamCombinerstdout); + + OutputStream outputStreamCombinerStderr = + new OutputStreamCombiner(Arrays.asList(previous, stderr)); + PrintStream stderrStream = new PrintStream(outputStreamCombinerStderr); + + System.setOut(stdoutStream); + System.setErr(stderrStream); + } + + public void stop() { + if (!capturing) { + return; + } + + System.setOut(previous); + + previous = null; + capturing = false; + } + + public String getStdout() { + return stdout.toString(); + } + + public String getStderr() { + return stderr.toString(); + } + + private static class OutputStreamCombiner extends OutputStream { + private List<OutputStream> outputStreams; + + public OutputStreamCombiner(List<OutputStream> outputStreams) { + this.outputStreams = outputStreams; + } + + public void write(int b) throws IOException { + for (OutputStream os : outputStreams) { + os.write(b); + } + } + + public void flush() throws IOException { + for (OutputStream os : outputStreams) { + os.flush(); + } + } + + public void close() throws IOException { + for (OutputStream os : outputStreams) { + os.close(); + } + } + } + } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 3181290..9b45f3c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -34,7 +34,6 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import net.jodah.typetools.TypeResolver; import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; import org.apache.bookkeeper.api.kv.result.KeyValue; @@ -45,7 +44,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; @@ -65,11 +63,9 @@ import org.apache.pulsar.functions.windowing.WindowUtils; import java.io.File; import java.io.IOException; -import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -359,17 +355,14 @@ public class CmdFunctions extends CmdBase { functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON); userCodeFile = pyFile; } else { - throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function"); + throw new ParameterException("Either a Java jar or a Python file needs to be specified for the function"); } // infer default vaues inferMissingArguments(functionConfig); - - // check if function configs are valid - validateFunctionConfigs(functionConfig); } - private void validateFunctionConfigs(FunctionConfig functionConfig) { + protected void validateFunctionConfigs(FunctionConfig functionConfig) { if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { File file = new File(jarFile); @@ -377,7 +370,7 @@ public class CmdFunctions extends CmdBase { try { userJarLoader = Reflections.loadJar(file); } catch (MalformedURLException e) { - throw new RuntimeException("Failed to load user jar " + file, e); + throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage()); } // make sure the function class loader is accessible thread-locally Thread.currentThread().setContextClassLoader(userJarLoader); @@ -415,7 +408,7 @@ public class CmdFunctions extends CmdBase { // set auto ack to false since windowing framework is responsible // for acking and not the function framework if (autoAck != null && autoAck == true) { - throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality"); + throw new ParameterException("Cannot enable auto ack when using windowing functionality"); } functionConfig.setAutoAck(false); } @@ -423,7 +416,7 @@ public class CmdFunctions extends CmdBase { private void inferMissingFunctionName(FunctionConfig functionConfig) { if (isNull(functionConfig.getClassName())) { - throw new IllegalArgumentException("You must specify a class name for the function"); + throw new ParameterException("You must specify a class name for the function"); } String [] domains = functionConfig.getClassName().split("\\."); @@ -469,13 +462,7 @@ public class CmdFunctions extends CmdBase { Class<?>[] typeArgs = null; if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - - File file = new File(jarFile); - try { - Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new RuntimeException("Failed to load user jar " + file, e); - } + // Assuming any external jars are already loaded typeArgs = Utils.getFunctionTypes(functionConfig); } @@ -588,7 +575,8 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { - checkRequiredFields(functionConfig); + // check if function configs are valid + validateFunctionConfigs(functionConfig); CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), brokerServiceUrl, userCodeFile, admin); } @@ -598,7 +586,8 @@ public class CmdFunctions extends CmdBase { class CreateFunction extends FunctionDetailsCommand { @Override void runCmd() throws Exception { - checkRequiredFields(functionConfig); + // check if function configs are valid + validateFunctionConfigs(functionConfig); admin.functions().createFunction(convert(functionConfig), userCodeFile); print("Created successfully"); } @@ -637,7 +626,8 @@ public class CmdFunctions extends CmdBase { class UpdateFunction extends FunctionDetailsCommand { @Override void runCmd() throws Exception { - checkRequiredFields(functionConfig); + // check if function configs are valid + validateFunctionConfigs(functionConfig); admin.functions().updateFunction(convert(functionConfig), userCodeFile); print("Updated successfully"); } @@ -720,7 +710,7 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { if (triggerFile == null && triggerValue == null) { - throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified"); + throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified"); } String retval = admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile); System.out.println(retval); @@ -850,36 +840,6 @@ public class CmdFunctions extends CmdBase { return mapper.readValue(file, FunctionConfig.class); } - private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException { - if (inputTopics.contains(outputTopic)) { - throw new IllegalArgumentException( - String.format("Output topic %s is also being used as an input topic (topics must be one or the other)", - outputTopic)); - } - } - - private static void checkRequiredFields(FunctionConfig config) throws IllegalArgumentException { - if (isNull(config.getTenant())) { - throw new IllegalArgumentException("You must specify a tenant for the function"); - } - - if (isNull(config.getNamespace())) { - throw new IllegalArgumentException("You must specify a namespace for the function"); - } - - if (isNull(config.getName())) { - throw new IllegalArgumentException("You must specify a name for the function"); - } - - if (isNull(config.getClassName())) { - throw new IllegalArgumentException("You must specify a class name for the function"); - } - - if (config.getInputs().isEmpty() && config.getCustomSerdeInputs().isEmpty()) { - throw new IllegalArgumentException("You must specify one or more input topics for the function"); - } - } - private static FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) { for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) { if (type.name().equals(runtime.name())) { @@ -912,7 +872,7 @@ public class CmdFunctions extends CmdBase { private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) { String[] args = fqfn.split("/"); if (args.length != 3) { - throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name"); + throw new ParameterException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name"); } else { functionConfig.setTenant(args[0]); functionConfig.setNamespace(args[1]); -- To stop receiving notification emails like this one, please contact jerryp...@apache.org.