This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e2b95f2  Allow ability to specify resources to functions (#1751)
e2b95f2 is described below

commit e2b95f2560f3d524d39a459a59682717d51d65f0
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu May 10 22:46:31 2018 -0700

    Allow ability to specify resources to functions (#1751)
    
    * Added ability to specify resources to functions
    
    * Corrected arguments
    
    * Address comments
    
    * Take feedback into account
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 25 +++++++++++++++++++
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 28 ++++++++++++++++++++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 29 +++++++++++++++++++++-
 .../proto/src/main/proto/Function.proto            |  7 ++++++
 .../pulsar/functions/runtime/ProcessRuntime.java   |  7 ++++++
 .../pulsar/functions/utils/FunctionConfig.java     |  1 +
 .../utils/{SinkConfig.java => Resources.java}      | 20 +++++----------
 .../apache/pulsar/functions/utils/SinkConfig.java  |  1 +
 .../pulsar/functions/utils/SourceConfig.java       |  1 +
 9 files changed, 103 insertions(+), 16 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3e6201e..e535506 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -63,6 +63,7 @@ import 
org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.Resources;
 import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
@@ -219,6 +220,12 @@ public class CmdFunctions extends CmdBase {
         protected String userConfigString;
         @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
         protected String parallelism;
+        @Parameter(names = "--cpu", description = "The cpu in cores that need 
to be allocated per function instance(applicable only to docker runtime)")
+        protected Double cpu;
+        @Parameter(names = "--ram", description = "The ram in bytes that need 
to be allocated per function instance(applicable only to process/docker 
runtime)")
+        protected Long ram;
+        @Parameter(names = "--disk", description = "The disk in bytes that 
need to be allocated per function instance(applicable only to docker runtime)")
+        protected Long disk;
 
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
@@ -316,6 +323,11 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setParallelism(num);
             }
 
+            com.google.common.base.Preconditions.checkArgument(cpu == null || 
cpu > 0, "The cpu allocation for the function must be positive");
+            com.google.common.base.Preconditions.checkArgument(ram == null || 
ram > 0, "The ram allocation for the function must be positive");
+            com.google.common.base.Preconditions.checkArgument(disk == null || 
disk > 0, "The disk allocation for the function must be positive");
+            functionConfig.setResources(new 
org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
+
             if (functionConfig.getSubscriptionType() != null
                     && functionConfig.getSubscriptionType() != 
FunctionConfig.SubscriptionType.FAILOVER
                     && functionConfig.getProcessingGuarantees() != null
@@ -616,6 +628,19 @@ public class CmdFunctions extends CmdBase {
             }
             functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
             
functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
+            if (functionConfig.getResources() != null) {
+                Resources.Builder bldr = Resources.newBuilder();
+                if (functionConfig.getResources().getCpu() != null) {
+                    bldr.setCpu(functionConfig.getResources().getCpu());
+                }
+                if (functionConfig.getResources().getRam() != null) {
+                    bldr.setRam(functionConfig.getResources().getRam());
+                }
+                if (functionConfig.getResources().getDisk() != null) {
+                    bldr.setDisk(functionConfig.getResources().getDisk());
+                }
+                functionDetailsBuilder.setResources(bldr.build());
+            }
             return functionDetailsBuilder.build();
         }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 36fdf46..bac98d9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.shaded.proto.Function;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.Resources;
 import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
@@ -133,6 +134,12 @@ public class CmdSinks extends CmdBase {
         @Parameter(names = "--sinkConfigFile", description = "The path to a 
YAML config file specifying the "
                 + "sink's configuration")
         protected String sinkConfigFile;
+        @Parameter(names = "--cpu", description = "The cpu in cores that need 
to be allocated per function instance(applicable only to docker runtime)")
+        protected Double cpu;
+        @Parameter(names = "--ram", description = "The ram in bytes that need 
to be allocated per function instance(applicable only to process/docker 
runtime)")
+        protected Long ram;
+        @Parameter(names = "--disk", description = "The disk in bytes that 
need to be allocated per function instance(applicable only to docker runtime)")
+        protected Long disk;
         @Parameter(names = "--sinkConfig", description = "Sink config 
key/values")
         protected String sinkConfigString;
 
@@ -202,6 +209,11 @@ public class CmdSinks extends CmdBase {
                 throw new IllegalArgumentException("Connector JAR not 
specfied");
             }
 
+            com.google.common.base.Preconditions.checkArgument(cpu == null || 
cpu > 0, "The cpu allocation for the sink must be positive");
+            com.google.common.base.Preconditions.checkArgument(ram == null || 
ram > 0, "The ram allocation for the sink must be positive");
+            com.google.common.base.Preconditions.checkArgument(disk == null || 
disk > 0, "The disk allocation for the sink must be positive");
+            sinkConfig.setResources(new 
org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
+
             if (null != sinkConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, Object> sinkConfigMap = new 
Gson().fromJson(sinkConfigString, type);
@@ -290,6 +302,20 @@ public class CmdSinks extends CmdBase {
             sinkSpecBuilder.setConfigs(new 
Gson().toJson(sinkConfig.getConfigs()));
             sinkSpecBuilder.setTypeClassName(typeArg.getName());
             functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+            if (sinkConfig.getResources() != null) {
+                Resources.Builder bldr = Resources.newBuilder();
+                if (sinkConfig.getResources().getCpu() != null) {
+                    bldr.setCpu(sinkConfig.getResources().getCpu());
+                }
+                if (sinkConfig.getResources().getRam() != null) {
+                    bldr.setRam(sinkConfig.getResources().getRam());
+                }
+                if (sinkConfig.getResources().getDisk() != null) {
+                    bldr.setDisk(sinkConfig.getResources().getDisk());
+                }
+                functionDetailsBuilder.setResources(bldr.build());
+            }
             return functionDetailsBuilder.build();
         }
     }
@@ -355,4 +381,4 @@ public class CmdSinks extends CmdBase {
         }
         throw new RuntimeException("Unrecognized processing guarantee: " + 
processingGuarantees.name());
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 79dbb50..267673c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.shaded.proto.Function;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.Resources;
 import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
@@ -133,6 +134,12 @@ public class CmdSources extends CmdBase {
         @Parameter(names = "--sourceConfigFile", description = "The path to a 
YAML config file specifying the "
                 + "source's configuration")
         protected String sourceConfigFile;
+        @Parameter(names = "--cpu", description = "The cpu in cores that need 
to be allocated per function instance(applicable only to docker runtime)")
+        protected Double cpu;
+        @Parameter(names = "--ram", description = "The ram in bytes that need 
to be allocated per function instance(applicable only to process/docker 
runtime)")
+        protected Long ram;
+        @Parameter(names = "--disk", description = "The disk in bytes that 
need to be allocated per function instance(applicable only to docker runtime)")
+        protected Long disk;
         @Parameter(names = "--sourceConfig", description = "Source config 
key/values")
         protected String sourceConfigString;
 
@@ -187,6 +194,11 @@ public class CmdSources extends CmdBase {
                 throw new IllegalArgumentException("Connector JAR not 
specfied");
             }
 
+            com.google.common.base.Preconditions.checkArgument(cpu == null || 
cpu > 0, "The cpu allocation for the source must be positive");
+            com.google.common.base.Preconditions.checkArgument(ram == null || 
ram > 0, "The ram allocation for the source must be positive");
+            com.google.common.base.Preconditions.checkArgument(disk == null || 
disk > 0, "The disk allocation for the source must be positive");
+            sourceConfig.setResources(new 
org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
+
             if (null != sourceConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, Object> sourceConfigMap = new 
Gson().fromJson(sourceConfigString, type);
@@ -277,6 +289,21 @@ public class CmdSources extends CmdBase {
             sinkSpecBuilder.setTypeClassName(typeArg.getName());
 
             functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+            if (sourceConfig.getResources() != null) {
+                Resources.Builder bldr = Resources.newBuilder();
+                if (sourceConfig.getResources().getCpu() != null) {
+                    bldr.setCpu(sourceConfig.getResources().getCpu());
+                }
+                if (sourceConfig.getResources().getRam() != null) {
+                    bldr.setRam(sourceConfig.getResources().getRam());
+                }
+                if (sourceConfig.getResources().getDisk() != null) {
+                    bldr.setDisk(sourceConfig.getResources().getDisk());
+                }
+                functionDetailsBuilder.setResources(bldr.build());
+            }
+
             return functionDetailsBuilder.build();
         }
     }
@@ -337,4 +364,4 @@ public class CmdSources extends CmdBase {
         }
         throw new RuntimeException("Unrecognized processing guarantee: " + 
processingGuarantees.name());
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index e73a5bb..4f44f9b 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -34,6 +34,12 @@ enum SubscriptionType {
     FAILOVER = 1;
 }
 
+message Resources {
+    double cpu = 1;
+    int64 ram = 2;
+    int64 disk = 3;
+}
+
 message FunctionDetails {
     enum Runtime {
         JAVA = 0;
@@ -51,6 +57,7 @@ message FunctionDetails {
     int32 parallelism = 10;
     SourceSpec source = 11;
     SinkSpec sink = 12;
+    Resources resources = 13;
 }
 
 message SourceSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 5fba569..568c51f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -81,6 +81,12 @@ class ProcessRuntime implements Runtime {
             args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml");
             args.add("-Dpulsar.log.dir=" + logDirectory);
             args.add("-Dpulsar.log.file=" + 
instanceConfig.getFunctionDetails().getName());
+            if (instanceConfig.getFunctionDetails().getResources() != null) {
+                Function.Resources resources = 
instanceConfig.getFunctionDetails().getResources();
+                if (resources.getRam() != 0) {
+                    args.add("-Xmx" + String.valueOf(resources.getRam()));
+                }
+            }
             args.add(JavaInstanceMain.class.getName());
             args.add("--jar");
             args.add(codeFile);
@@ -93,6 +99,7 @@ class ProcessRuntime implements Runtime {
             args.add(logDirectory);
             args.add("--logging_file");
             args.add(instanceConfig.getFunctionDetails().getName());
+            // TODO:- Find a platform independent way of controlling memory 
for a python application
         }
         args.add("--instance_id");
         args.add(instanceConfig.getInstanceId());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index cb7d0a2..5baecfb 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -79,5 +79,6 @@ public class FunctionConfig {
     private Runtime runtime;
     private boolean autoAck;
     private int parallelism;
+    private Resources resources;
     private String fqfn;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
similarity index 66%
copy from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
copy to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
index bfc9a43..2a1ac8e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
@@ -18,11 +18,7 @@
  */
 package org.apache.pulsar.functions.utils;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.*;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -32,13 +28,9 @@ import java.util.Map;
 @Data
 @EqualsAndHashCode
 @ToString
-public class SinkConfig {
-    private String tenant;
-    private String namespace;
-    private String name;
-    private String className;
-    private Map<String, String> topicToSerdeClassName;
-    private Map<String, Object> configs = new HashMap<>();
-    private int parallelism = 1;
-    private FunctionConfig.ProcessingGuarantees processingGuarantees;
+@AllArgsConstructor
+public class Resources {
+    private Double cpu;
+    private Long ram;
+    private Long disk;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index bfc9a43..3f838cf 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -41,4 +41,5 @@ public class SinkConfig {
     private Map<String, Object> configs = new HashMap<>();
     private int parallelism = 1;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
+    private Resources resources;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
index faf1f95..89e3c80 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
@@ -42,4 +42,5 @@ public class SourceConfig {
     private Map<String, Object> configs = new HashMap<>();
     private int parallelism = 1;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
+    private Resources resources;
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to