This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 591df49  Fix IdentitySerde (#2762)
591df49 is described below

commit 591df495f5a247a05e99b6c9f342c58b700e2375
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Wed Oct 10 17:05:15 2018 -0700

    Fix IdentitySerde (#2762)
    
    * Apply bytes only if the output of serializer is string
    
    * Change IdentitySerde
    
    * Fixed deser logic
    
    * Changed the name of IdentitySerDe to StringSerDe to better reflect what it is
    
    * Change doc comments
    
    * Reverted all changes
    
    * Made IdentitySerde mimic Json Schema
    
    * Reverted any changes
    
    * IdentitySerde will only work with primitive types
---
 pulsar-client-cpp/python/pulsar/functions/serde.py       | 16 +++++++++++++---
 .../instance/src/main/python/python_instance.py          |  2 +-
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py b/pulsar-client-cpp/python/pulsar/functions/serde.py
index c31cb1e..ee6366c 100644
--- a/pulsar-client-cpp/python/pulsar/functions/serde.py
+++ b/pulsar-client-cpp/python/pulsar/functions/serde.py
@@ -68,9 +68,19 @@ class PickleSerDe(SerDe):
       return pickle.loads(input_bytes)
 
 class IdentitySerDe(SerDe):
-  """Pickle based serializer"""
+  """Simple Serde that just conversion to string and back"""
+  def __init__(self):
+    self._types = [int, float, complex, str]
+
   def serialize(self, input):
-    return input
+    if type(input) in self._types:
+      return str(input).encode('utf-8')
+    raise TypeError
 
   def deserialize(self, input_bytes):
-    return input_bytes
\ No newline at end of file
+    for typ in self._types:
+      try:
+        return typ(input_bytes.decode('utf-8'))
+      except:
+        pass
+    raise TypeError
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 5a1bff5..03dafae 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -294,7 +294,7 @@ class PythonInstance(object):
       if self.producer is None:
         self.setup_producer()
       try:
-        output_bytes = bytes(self.output_serde.serialize(output))
+        output_bytes = self.output_serde.serialize(output)
       except:
         self.current_stats.nserialization_exceptions += 1
         self.total_stats.nserialization_exceptions += 1

Reply via email to