This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 591df49 Fix IdentitySerde (#2762) 591df49 is described below commit 591df495f5a247a05e99b6c9f342c58b700e2375 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Oct 10 17:05:15 2018 -0700 Fix IdentitySerde (#2762) * Apply bytes only if the output of serializer is string * Change IdentitySerde * Fixed deser logic * Changed the name of IdentitySerDe to StringSerDe to better reflect what it is * Change doc comments * Reverted all changes * Made IdentitySerde mimic Json Schema * Reverted any changes * IdentitySerde will only work with primitive types --- pulsar-client-cpp/python/pulsar/functions/serde.py | 16 +++++++++++++--- .../instance/src/main/python/python_instance.py | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py b/pulsar-client-cpp/python/pulsar/functions/serde.py index c31cb1e..ee6366c 100644 --- a/pulsar-client-cpp/python/pulsar/functions/serde.py +++ b/pulsar-client-cpp/python/pulsar/functions/serde.py @@ -68,9 +68,19 @@ class PickleSerDe(SerDe): return pickle.loads(input_bytes) class IdentitySerDe(SerDe): - """Pickle based serializer""" + """Simple Serde that just conversion to string and back""" + def __init__(self): + self._types = [int, float, complex, str] + def serialize(self, input): - return input + if type(input) in self._types: + return str(input).encode('utf-8') + raise TypeError def deserialize(self, input_bytes): - return input_bytes \ No newline at end of file + for typ in self._types: + try: + return typ(input_bytes.decode('utf-8')) + except: + pass + raise TypeError diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 5a1bff5..03dafae 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -294,7 +294,7 @@ class PythonInstance(object): if self.producer is None: self.setup_producer() try: - output_bytes = bytes(self.output_serde.serialize(output)) + output_bytes = self.output_serde.serialize(output) except: self.current_stats.nserialization_exceptions += 1 self.total_stats.nserialization_exceptions += 1