This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a2ca79d fixing problem of functions that have an array as input or output (#1837) a2ca79d is described below commit a2ca79dad28ecee4e9123678e5d9ed4652e9e13c Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed May 23 12:57:38 2018 -0700 fixing problem of functions that have an array as input or output (#1837) * fixing problem of functions that have an array as input or output * add additional null checks --- pom.xml | 7 +++++++ pulsar-functions/instance/pom.xml | 5 +++++ .../functions/instance/JavaInstanceRunnable.java | 21 +++++++++++++-------- .../apache/pulsar/functions/sink/PulsarSink.java | 11 +++++++---- .../pulsar/functions/source/PulsarSource.java | 9 +++++++-- pulsar-functions/runtime-shaded/pom.xml | 3 +++ 6 files changed, 42 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 22506ca..28e769b 100644 --- a/pom.xml +++ b/pom.xml @@ -139,6 +139,7 @@ flexible messaging model and an intuitive client API.</description> <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version> <dockerfile-maven.version>1.3.7</dockerfile-maven.version> <typetools.version>0.5.0</typetools.version> + <jboss-reflect.version>2.2.1.SP1</jboss-reflect.version> <protobuf2.version>2.4.1</protobuf2.version> <protobuf3.version>3.5.1</protobuf3.version> <grpc.version>1.5.0</grpc.version> @@ -690,6 +691,12 @@ flexible messaging model and an intuitive client API.</description> </dependency> <dependency> + <groupId>org.jboss</groupId> + <artifactId>jboss-reflect</artifactId> + <version>${jboss-reflect.version}</version> + </dependency> + + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-all</artifactId> <version>${grpc.version}</version> diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index bcdf780..b6dfb74 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -84,6 +84,11 @@ <artifactId>typetools</artifactId> </dependency> + <dependency> + <groupId>org.jboss</groupId> + <artifactId>jboss-reflect</artifactId> + </dependency> + </dependencies> </project> diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 5b5e943..a96c703 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -341,16 +341,21 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Override public void close() { - try { - source.close(); - } catch (Exception e) { - log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); + if (source != null) { + try { + source.close(); + } catch (Exception e) { + log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); + + } } - try { - sink.close(); - } catch (Exception e) { - log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); + if (sink != null) { + try { + sink.close(); + } catch (Exception e) { + log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); + } } if (null != javaInstance) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 4fccb54..8b246cf 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -40,6 +40,7 @@ import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.io.core.RecordContext; import org.apache.pulsar.io.core.Sink; +import org.jboss.util.Classes; import java.util.Base64; import java.util.Map; @@ -232,14 +233,16 @@ public class PulsarSink<T> implements Sink<T> { @Override public void close() throws Exception { - this.pulsarSinkProcessor.close(); - + if (this.pulsarSinkProcessor != null) { + this.pulsarSinkProcessor.close(); + } } @VisibleForTesting void setupSerDe() throws ClassNotFoundException { - Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass( - this.pulsarSinkConfig.getTypeClassName()); + + Class<?> typeArg = Classes.loadClass(this.pulsarSinkConfig.getTypeClassName(), + Thread.currentThread().getContextClassLoader()); if (!Void.class.equals(typeArg)) { // return type is not `Void.class` if (this.pulsarSinkConfig.getSerDeClassName() == null diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 5cae902..25b0874 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -32,6 +32,7 @@ import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.io.core.Record; import org.apache.pulsar.io.core.Source; +import org.jboss.util.Classes; import java.util.ArrayList; import java.util.HashMap; @@ -124,13 +125,17 @@ public class PulsarSource<T> implements Source<T> { @Override public void close() throws Exception { - this.inputConsumer.close(); + if (this.inputConsumer != null) { + this.inputConsumer.close(); + } } @VisibleForTesting void setupSerDe() throws ClassNotFoundException { - Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarSourceConfig.getTypeClassName()); + Class<?> typeArg = Classes.loadClass(this.pulsarSourceConfig.getTypeClassName(), + Thread.currentThread().getContextClassLoader()); + if (Void.class.equals(typeArg)) { throw new RuntimeException("Input type of Pulsar Function cannot be Void"); } diff --git a/pulsar-functions/runtime-shaded/pom.xml b/pulsar-functions/runtime-shaded/pom.xml index 1536e21..f50e3d9 100644 --- a/pulsar-functions/runtime-shaded/pom.xml +++ b/pulsar-functions/runtime-shaded/pom.xml @@ -132,6 +132,9 @@ <include>com.google.googlejavaformat:google-java-format</include> <include>com.google.errorprone:javac</include> <include>net.jodah:typetools</include> + <include>org.jboss:jboss-reflect</include> + <include>org.jboss.logging:jboss-logging-spi</include> + <include>org.jboss:jboss-common-core</include> <include>com.beust:jcommander</include> <include>com.fasterxml.jackson.dataformat:jackson-dataformat-yaml</include> <include>org.yaml:snakeyaml</include> -- To stop receiving notification emails like this one, please contact si...@apache.org.