This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e2b95f2 Allow ability to specify resources to functions (#1751) e2b95f2 is described below commit e2b95f2560f3d524d39a459a59682717d51d65f0 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu May 10 22:46:31 2018 -0700 Allow ability to specify resources to functions (#1751) * Added ability to specify resources to functions * Corrected arguments * Address comments * Take feedback into account --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 25 +++++++++++++++++++ .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 28 ++++++++++++++++++++- .../org/apache/pulsar/admin/cli/CmdSources.java | 29 +++++++++++++++++++++- .../proto/src/main/proto/Function.proto | 7 ++++++ .../pulsar/functions/runtime/ProcessRuntime.java | 7 ++++++ .../pulsar/functions/utils/FunctionConfig.java | 1 + .../utils/{SinkConfig.java => Resources.java} | 20 +++++---------- .../apache/pulsar/functions/utils/SinkConfig.java | 1 + .../pulsar/functions/utils/SourceConfig.java | 1 + 9 files changed, 103 insertions(+), 16 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 3e6201e..e535506 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -63,6 +63,7 @@ import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled; import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec; import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.Resources; import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType; import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; @@ -219,6 +220,12 @@ public class CmdFunctions extends CmdBase { protected String userConfigString; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)") protected String parallelism; + @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") + protected Double cpu; + @Parameter(names = "--ram", description = "The ram in bytes that need to be allocated per function instance(applicable only to process/docker runtime)") + protected Long ram; + @Parameter(names = "--disk", description = "The disk in bytes that need to be allocated per function instance(applicable only to docker runtime)") + protected Long disk; protected FunctionConfig functionConfig; protected String userCodeFile; @@ -316,6 +323,11 @@ public class CmdFunctions extends CmdBase { functionConfig.setParallelism(num); } + com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0, "The cpu allocation for the function must be positive"); + com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0, "The ram allocation for the function must be positive"); + com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0, "The disk allocation for the function must be positive"); + functionConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); + if (functionConfig.getSubscriptionType() != null && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER && functionConfig.getProcessingGuarantees() != null @@ -616,6 +628,19 @@ public class CmdFunctions extends CmdBase { } functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck()); functionDetailsBuilder.setParallelism(functionConfig.getParallelism()); + if (functionConfig.getResources() != null) { + Resources.Builder bldr = Resources.newBuilder(); + if (functionConfig.getResources().getCpu() != null) { + bldr.setCpu(functionConfig.getResources().getCpu()); + } + if (functionConfig.getResources().getRam() != null) { + bldr.setRam(functionConfig.getResources().getRam()); + } + if (functionConfig.getResources().getDisk() != null) { + bldr.setDisk(functionConfig.getResources().getDisk()); + } + functionDetailsBuilder.setResources(bldr.build()); + } return functionDetailsBuilder.build(); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 36fdf46..bac98d9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.shaded.proto.Function; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.Resources; import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec; import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; @@ -133,6 +134,12 @@ public class CmdSinks extends CmdBase { @Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the " + "sink's configuration") protected String sinkConfigFile; + @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") + protected Double cpu; + @Parameter(names = "--ram", description = "The ram in bytes that need to be allocated per function instance(applicable only to process/docker runtime)") + protected Long ram; + @Parameter(names = "--disk", description = "The disk in bytes that need to be allocated per function instance(applicable only to docker runtime)") + protected Long disk; @Parameter(names = "--sinkConfig", description = "Sink config key/values") protected String sinkConfigString; @@ -202,6 +209,11 @@ public class CmdSinks extends CmdBase { throw new IllegalArgumentException("Connector JAR not specfied"); } + com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0, "The cpu allocation for the sink must be positive"); + com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0, "The ram allocation for the sink must be positive"); + com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0, "The disk allocation for the sink must be positive"); + sinkConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); + if (null != sinkConfigString) { Type type = new TypeToken<Map<String, String>>(){}.getType(); Map<String, Object> sinkConfigMap = new Gson().fromJson(sinkConfigString, type); @@ -290,6 +302,20 @@ public class CmdSinks extends CmdBase { sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); sinkSpecBuilder.setTypeClassName(typeArg.getName()); functionDetailsBuilder.setSink(sinkSpecBuilder); + + if (sinkConfig.getResources() != null) { + Resources.Builder bldr = Resources.newBuilder(); + if (sinkConfig.getResources().getCpu() != null) { + bldr.setCpu(sinkConfig.getResources().getCpu()); + } + if (sinkConfig.getResources().getRam() != null) { + bldr.setRam(sinkConfig.getResources().getRam()); + } + if (sinkConfig.getResources().getDisk() != null) { + bldr.setDisk(sinkConfig.getResources().getDisk()); + } + functionDetailsBuilder.setResources(bldr.build()); + } return functionDetailsBuilder.build(); } } @@ -355,4 +381,4 @@ public class CmdSinks extends CmdBase { } throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name()); } -} \ No newline at end of file +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 79dbb50..267673c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.shaded.proto.Function; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.Resources; import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec; import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; @@ -133,6 +134,12 @@ public class CmdSources extends CmdBase { @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the " + "source's configuration") protected String sourceConfigFile; + @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") + protected Double cpu; + @Parameter(names = "--ram", description = "The ram in bytes that need to be allocated per function instance(applicable only to process/docker runtime)") + protected Long ram; + @Parameter(names = "--disk", description = "The disk in bytes that need to be allocated per function instance(applicable only to docker runtime)") + protected Long disk; @Parameter(names = "--sourceConfig", description = "Source config key/values") protected String sourceConfigString; @@ -187,6 +194,11 @@ public class CmdSources extends CmdBase { throw new IllegalArgumentException("Connector JAR not specfied"); } + com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0, "The cpu allocation for the source must be positive"); + com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0, "The ram allocation for the source must be positive"); + com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0, "The disk allocation for the source must be positive"); + sourceConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); + if (null != sourceConfigString) { Type type = new TypeToken<Map<String, String>>(){}.getType(); Map<String, Object> sourceConfigMap = new Gson().fromJson(sourceConfigString, type); @@ -277,6 +289,21 @@ public class CmdSources extends CmdBase { sinkSpecBuilder.setTypeClassName(typeArg.getName()); functionDetailsBuilder.setSink(sinkSpecBuilder); + + if (sourceConfig.getResources() != null) { + Resources.Builder bldr = Resources.newBuilder(); + if (sourceConfig.getResources().getCpu() != null) { + bldr.setCpu(sourceConfig.getResources().getCpu()); + } + if (sourceConfig.getResources().getRam() != null) { + bldr.setRam(sourceConfig.getResources().getRam()); + } + if (sourceConfig.getResources().getDisk() != null) { + bldr.setDisk(sourceConfig.getResources().getDisk()); + } + functionDetailsBuilder.setResources(bldr.build()); + } + return functionDetailsBuilder.build(); } } @@ -337,4 +364,4 @@ public class CmdSources extends CmdBase { } throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name()); } -} \ No newline at end of file +} diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index e73a5bb..4f44f9b 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -34,6 +34,12 @@ enum SubscriptionType { FAILOVER = 1; } +message Resources { + double cpu = 1; + int64 ram = 2; + int64 disk = 3; +} + message FunctionDetails { enum Runtime { JAVA = 0; @@ -51,6 +57,7 @@ message FunctionDetails { int32 parallelism = 10; SourceSpec source = 11; SinkSpec sink = 12; + Resources resources = 13; } message SourceSpec { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 5fba569..568c51f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -81,6 +81,12 @@ class ProcessRuntime implements Runtime { args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml"); args.add("-Dpulsar.log.dir=" + logDirectory); args.add("-Dpulsar.log.file=" + instanceConfig.getFunctionDetails().getName()); + if (instanceConfig.getFunctionDetails().getResources() != null) { + Function.Resources resources = instanceConfig.getFunctionDetails().getResources(); + if (resources.getRam() != 0) { + args.add("-Xmx" + String.valueOf(resources.getRam())); + } + } args.add(JavaInstanceMain.class.getName()); args.add("--jar"); args.add(codeFile); @@ -93,6 +99,7 @@ class ProcessRuntime implements Runtime { args.add(logDirectory); args.add("--logging_file"); args.add(instanceConfig.getFunctionDetails().getName()); + // TODO:- Find a platform independent way of controlling memory for a python application } args.add("--instance_id"); args.add(instanceConfig.getInstanceId()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index cb7d0a2..5baecfb 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -79,5 +79,6 @@ public class FunctionConfig { private Runtime runtime; private boolean autoAck; private int parallelism; + private Resources resources; private String fqfn; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java similarity index 66% copy from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java copy to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java index bfc9a43..2a1ac8e 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java @@ -18,11 +18,7 @@ */ package org.apache.pulsar.functions.utils; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import java.util.HashMap; import java.util.Map; @@ -32,13 +28,9 @@ import java.util.Map; @Data @EqualsAndHashCode @ToString -public class SinkConfig { - private String tenant; - private String namespace; - private String name; - private String className; - private Map<String, String> topicToSerdeClassName; - private Map<String, Object> configs = new HashMap<>(); - private int parallelism = 1; - private FunctionConfig.ProcessingGuarantees processingGuarantees; +@AllArgsConstructor +public class Resources { + private Double cpu; + private Long ram; + private Long disk; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index bfc9a43..3f838cf 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -41,4 +41,5 @@ public class SinkConfig { private Map<String, Object> configs = new HashMap<>(); private int parallelism = 1; private FunctionConfig.ProcessingGuarantees processingGuarantees; + private Resources resources; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java index faf1f95..89e3c80 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java @@ -42,4 +42,5 @@ public class SourceConfig { private Map<String, Object> configs = new HashMap<>(); private int parallelism = 1; private FunctionConfig.ProcessingGuarantees processingGuarantees; + private Resources resources; } -- To stop receiving notification emails like this one, please contact mme...@apache.org.