This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3a66808 Added ability to specify compression while publishing (#2781) 3a66808 is described below commit 3a668084f74b9d4467ae22f38502ee1b9e3c76e9 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Oct 12 06:03:29 2018 -0700 Added ability to specify compression while publishing (#2781) --- pulsar-client-cpp/python/pulsar/functions/context.py | 2 +- pulsar-functions/instance/src/main/python/contextimpl.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py index 2510bad..47e86f9 100644 --- a/pulsar-client-cpp/python/pulsar/functions/context.py +++ b/pulsar-client-cpp/python/pulsar/functions/context.py @@ -104,7 +104,7 @@ class Context(object): pass @abstractmethod - def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None): + def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None): """Publishes message to topic_name by first serializing the message using serde_class_name serde The message will have properties specified if any""" pass diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index b855ec0..a02724f 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -118,17 +118,21 @@ class ContextImpl(pulsar.Context): def get_output_serde_class_name(self): return self.instance_config.function_details.outputSerdeClassName - def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None): + def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None): # Just make sure that user supplied values are properly typed topic_name = str(topic_name) serde_class_name = str(serde_class_name) + pulsar_compression_type = pulsar._pulsar.CompressionType.NONE + if compression_type is not None: + pulsar_compression_type = compression_type if topic_name not in self.publish_producers: self.publish_producers[topic_name] = self.pulsar_client.create_producer( topic_name, block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=1, - max_pending_messages=100000 + max_pending_messages=100000, + compression_type=compression_type ) if serde_class_name not in self.publish_serializers: @@ -164,4 +168,4 @@ class ContextImpl(pulsar.Context): m.max = accumulated_metric.max m.min = accumulated_metric.min metrics.metrics[metric_name] = m - return metrics \ No newline at end of file + return metrics