This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a6d195  add flatbuffer option to serialize kinesis-message in 
KinesisSink (#2108)
2a6d195 is described below

commit 2a6d1950954e9d0b1ec33c7fc738ebbdab0a4b9e
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Jul 12 16:59:00 2018 -0700

    add flatbuffer option to serialize kinesis-message in KinesisSink (#2108)
    
    * add flatbuffer option to serialize kinesis-message
    
    * add flatbuffer license
    
    * fix license
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml                                            |  12 ++
 .../pulsar/common/api/EncryptionContext.java       |   1 -
 pulsar-io/kinesis/pom.xml                          |   6 +
 .../kinesis/src/main/fb/KinesisMessageApi.fbs      |  59 +++++++
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |  10 +-
 .../pulsar/io/kinesis/KinesisSinkConfig.java       |   6 +-
 .../java/org/apache/pulsar/io/kinesis/Utils.java   | 129 ++++++++++++++--
 .../pulsar/io/kinesis/fbs/CompressionType.java     |  15 ++
 .../pulsar/io/kinesis/fbs/EncryptionCtx.java       |  68 +++++++++
 .../pulsar/io/kinesis/fbs/EncryptionKey.java       |  52 +++++++
 .../org/apache/pulsar/io/kinesis/fbs/KeyValue.java |  41 +++++
 .../org/apache/pulsar/io/kinesis/fbs/Message.java  |  53 +++++++
 .../org/apache/pulsar/io/kinesis/UtilsTest.java    | 170 ++++++++++++++++-----
 14 files changed, 572 insertions(+), 52 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 763f62c..33b9748 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -459,6 +459,8 @@ The Apache Software License, Version 2.0
     - org.inferred-freebuilder-1.14.9.jar
   * Snappy Java
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
+  * Flatbuffers Java
+    - com.google.flatbuffers-flatbuffers-java-1.9.0.jar
 
 
 BSD 3-clause "New" or "Revised" License
diff --git a/pom.xml b/pom.xml
index ba705a7..8a0da98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -946,6 +946,11 @@ flexible messaging model and an intuitive client 
API.</description>
             
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude>
             <exclude>**/ByteBufCodedInputStream.java</exclude>
             <exclude>**/ByteBufCodedOutputStream.java</exclude>
             <exclude>bin/proto/*</exclude>
@@ -1050,6 +1055,13 @@ flexible messaging model and an intuitive client 
API.</description>
             
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>bin/proto/MLDataFormats_pb2.py</exclude>
+            
+            <!-- pulasr-io-connector kinesis : auto generated files from 
flatbuffer schema -->
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude>
 
             <!-- These files are BSD licensed files -->
             
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java</exclude>
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
index 98eaad7..ff359c5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
@@ -34,7 +34,6 @@ public class EncryptionContext {
 
     private Map<String, EncryptionKey> keys;
     private byte[] param;
-    private Map<String, String> metadata;
     private String algorithm;
     private CompressionType compressionType;
     private int uncompressedMessageSize;
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 08c3004..f0c2776 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -81,6 +81,12 @@
       <version>0.12.8</version>
     </dependency>
        <!-- /kinesis dependencies -->
+       
+    <dependency>
+      <groupId>com.google.flatbuffers</groupId>
+      <artifactId>flatbuffers-java</artifactId>
+      <version>1.9.0</version>
+    </dependency>
 
   </dependencies>
 
diff --git a/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs 
b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs
new file mode 100644
index 0000000..f7cc030
--- /dev/null
+++ b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+ /**
+ * Instrunction to generate fb-source
+ * export PULSAR_HOME=<Path where you cloned the pulsar repo>
+ * export KINESIS_IO_MAIN=${PULSAR_HOME}/pulsar-io/kinesis/src/main
+ * Command to build java src:  flatc --java  -o ${KINESIS_IO_MAIN}/java 
${KINESIS_IO_MAIN}/fb/KinesisMessageApi.fbs 
+ * flatc version 1.9.0 (pip install flatbuffers)
+ */
+
+namespace org.apache.pulsar.io.kinesis.fbs;
+
+table KeyValue {
+  key:string;
+  value:string;
+}
+
+table EncryptionKey {
+  key:string;
+  value:[byte];
+  metadata:[KeyValue];
+}
+
+enum CompressionType : byte { NONE, LZ4, ZLIB }
+
+table EncryptionCtx {
+  keys:[EncryptionKey];
+  param:[byte];
+  algo:string;
+  compressionType:CompressionType;
+  uncompressedMessageSize:int;
+  batchSize:int;
+  isBatchMessage:bool=false;
+}
+
+table Message {
+  encryptionCtx:EncryptionCtx;
+  properties:[KeyValue];
+  payload:[byte];
+}
+
+root_type Message;
\ No newline at end of file
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 5387a66..dbe1e75 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -103,7 +103,7 @@ public class KinesisSink implements Sink<byte[]> {
                 : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey,
-                
ByteBuffer.wrap(createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
inputRecordContext, value)));
+                createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
inputRecordContext, value));
         addCallback(addRecordResult,
                 ProducerSendCallback.create(this.streamName, 
inputRecordContext, System.nanoTime()), directExecutor());
         if (LOG.isDebugEnabled()) {
@@ -274,12 +274,14 @@ public class KinesisSink implements Sink<byte[]> {
         };
     }
 
-    public static byte[] createKinesisMessage(MessageFormat msgFormat, 
RecordContext recordCtx, byte[] data) {
+    public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, 
RecordContext recordCtx, byte[] data) {
         if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
-            return Utils.serializeRecordToJson(recordCtx, data).getBytes();
+            return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx, 
data).getBytes());
+        } else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
+            return Utils.serializeRecordToFlatBuffer(recordCtx, data);
         } else {
             // send raw-message
-            return data;
+            return ByteBuffer.wrap(data);
         }
     }
     
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index e3c8fde..bf5f2ea 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -76,6 +76,10 @@ public class KinesisSinkConfig implements Serializable {
          * 
          * 
          */
-        FULL_MESSAGE_IN_JSON;
+        FULL_MESSAGE_IN_JSON,
+        /**
+         * Kinesis sink sends message serialized in flat-buffer.
+         */
+        FULL_MESSAGE_IN_FB;
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index 8236080..469151e 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -19,13 +19,22 @@
 
 package org.apache.pulsar.io.kinesis;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Base64.getEncoder;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
 
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
+import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
+import org.apache.pulsar.io.kinesis.fbs.KeyValue;
+import org.apache.pulsar.io.kinesis.fbs.Message;
 
+import com.google.flatbuffers.FlatBufferBuilder;
 import com.google.gson.JsonObject;
 
 public class Utils {
@@ -34,13 +43,121 @@ public class Utils {
     private static final String PROPERTIES_FIELD = "properties";
     private static final String KEY_MAP_FIELD = "keysMapBase64";
     private static final String KEY_METADATA_MAP_FIELD = "keysMetadataMap";
-    private static final String METADATA_FIELD = "metadata";
     private static final String ENCRYPTION_PARAM_FIELD = "encParamBase64";
     private static final String ALGO_FIELD = "algorithm";
     private static final String COMPRESSION_TYPE_FIELD = "compressionType";
     private static final String UNCPRESSED_MSG_SIZE_FIELD = 
"uncompressedMessageSize";
     private static final String BATCH_SIZE_FIELD = "batchSize";
     private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
+    
+    private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new 
FlatBufferBuilder(0);
+
+    /**
+     * Serialize record to flat-buffer. it's not a thread-safe method.
+     * 
+     * @param inputRecordContext
+     * @param data
+     * @return
+     */
+    public static ByteBuffer serializeRecordToFlatBuffer(RecordContext 
inputRecordContext, byte[] data) {
+        DEFAULT_FB_BUILDER.clear();
+        return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, 
inputRecordContext, data);
+    }
+    
+    public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder 
builder, RecordContext inputRecordContext, byte[] data) {
+        checkNotNull(inputRecordContext, "record-context can't be null");
+        Optional<EncryptionContext> encryptionCtx = 
inputRecordContext.getEncryptionCtx();
+        Map<String, String> properties = inputRecordContext.getProperties();
+
+        int encryptionCtxOffset = -1;
+        int propertiesOffset = -1;
+
+        if (properties != null && !properties.isEmpty()) {
+            int[] propertiesOffsetArray = new int[properties.size()];
+            int i = 0;
+            for (Entry<String, String> property : properties.entrySet()) {
+                propertiesOffsetArray[i++] = KeyValue.createKeyValue(builder, 
builder.createString(property.getKey()),
+                        builder.createString(property.getValue()));
+            }
+            propertiesOffset = Message.createPropertiesVector(builder, 
propertiesOffsetArray);
+        }
+
+        if (encryptionCtx.isPresent()) {
+            encryptionCtxOffset = createEncryptionCtxOffset(builder, 
encryptionCtx);
+        }
+
+        int payloadOffset = Message.createPayloadVector(builder, data);
+        Message.startMessage(builder);
+        Message.addPayload(builder, payloadOffset);
+        if (encryptionCtxOffset != -1) {
+            Message.addEncryptionCtx(builder, encryptionCtxOffset);
+        }
+        if (propertiesOffset != -1) {
+            Message.addProperties(builder, propertiesOffset);
+        }
+        int endMessage = Message.endMessage(builder);
+        builder.finish(endMessage);
+        ByteBuffer bb = builder.dataBuffer();
+        
+        // to avoid copying of data, use same byte[] wrapped by ByteBuffer. 
But, ByteBuffer.array() returns entire array
+        // so, it requires to read from offset:
+        // builder.sizedByteArray()=>copies buffer: sizedByteArray(space, 
bb.capacity() - space)
+        int space = bb.capacity() - builder.offset(); 
+        return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space);
+    }
+
+    private static int createEncryptionCtxOffset(final FlatBufferBuilder 
builder, Optional<EncryptionContext> encryptionCtx) {
+        if (!encryptionCtx.isPresent()) {
+            return -1;
+        }
+        // Message.addEncryptionCtx(builder, encryptionCtxOffset);
+        EncryptionContext ctx = encryptionCtx.get();
+        int[] keysOffsets = new int[ctx.getKeys().size()];
+        int keyIndex = 0;
+        for (Entry<String, 
org.apache.pulsar.common.api.EncryptionContext.EncryptionKey> entry : 
ctx.getKeys()
+                .entrySet()) {
+            int key = builder.createString(entry.getKey());
+            int value = EncryptionKey.createValueVector(builder, 
entry.getValue().getKeyValue());
+            Map<String, String> metadata = entry.getValue().getMetadata();
+            int[] metadataOffsets = new int[metadata.size()];
+            int i = 0;
+            for (Entry<String, String> m : metadata.entrySet()) {
+                metadataOffsets[i++] = KeyValue.createKeyValue(builder, 
builder.createString(m.getKey()),
+                        builder.createString(m.getValue()));
+            }
+            int metadataOffset = -1;
+            if (metadata.size() > 0) {
+                metadataOffset = EncryptionKey.createMetadataVector(builder, 
metadataOffsets);
+            }
+            EncryptionKey.startEncryptionKey(builder);
+            EncryptionKey.addKey(builder, key);
+            EncryptionKey.addValue(builder, value);
+            if(metadataOffset!=-1) {
+                EncryptionKey.addMetadata(builder, metadataOffset);            
    
+            }
+            keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder);
+        }
+
+        int keysOffset = EncryptionCtx.createKeysVector(builder, keysOffsets);
+        int param = EncryptionCtx.createParamVector(builder, ctx.getParam());
+        int algo = builder.createString(ctx.getAlgorithm());
+        int batchSize = ctx.getBatchSize().isPresent() ? 
ctx.getBatchSize().get() : 1;
+        byte compressionType;
+        switch (ctx.getCompressionType()) {
+        case LZ4:
+            compressionType = 
org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4;
+            break;
+        case ZLIB:
+            compressionType = 
org.apache.pulsar.io.kinesis.fbs.CompressionType.ZLIB;
+            break;
+        default:
+            compressionType = 
org.apache.pulsar.io.kinesis.fbs.CompressionType.NONE;
+
+        }
+        return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param, 
algo, compressionType,
+                ctx.getUncompressedMessageSize(), batchSize, 
ctx.getBatchSize().isPresent());
+    
+    }
 
     /**
      * Serializes sink-record into json format. It encodes encryption-keys, 
encryption-param and payload in base64
@@ -51,9 +168,7 @@ public class Utils {
      * @return
      */
     public static String serializeRecordToJson(RecordContext 
inputRecordContext, byte[] data) {
-        if (inputRecordContext == null) {
-            return null;
-        }
+        checkNotNull(inputRecordContext, "record-context can't be null");
         JsonObject result = new JsonObject();
         result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
         if (inputRecordContext.getProperties() != null) {
@@ -79,12 +194,6 @@ public class Utils {
             });
             encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map);
             encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap);
-            Map<String, String> metadataMap = encryptionCtx.getMetadata();
-            if (metadataMap != null && !metadataMap.isEmpty()) {
-                JsonObject metadata = new JsonObject();
-                encryptionCtx.getMetadata().entrySet().forEach(m -> 
metadata.addProperty(m.getKey(), m.getValue()));
-                encryptionCtxJson.add(METADATA_FIELD, metadata);
-            }
             encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD,
                     getEncoder().encodeToString(encryptionCtx.getParam()));
             encryptionCtxJson.addProperty(ALGO_FIELD, 
encryptionCtx.getAlgorithm());
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java
new file mode 100644
index 0000000..0a90a22
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java
@@ -0,0 +1,15 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+public final class CompressionType {
+  private CompressionType() { }
+  public static final byte NONE = 0;
+  public static final byte LZ4 = 1;
+  public static final byte ZLIB = 2;
+
+  public static final String[] names = { "NONE", "LZ4", "ZLIB", };
+
+  public static String name(int e) { return names[e]; }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java
new file mode 100644
index 0000000..e6dff72
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java
@@ -0,0 +1,68 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class EncryptionCtx extends Table {
+  public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb) { return 
getRootAsEncryptionCtx(_bb, new EncryptionCtx()); }
+  public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb, 
EncryptionCtx obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public EncryptionCtx __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); 
return this; }
+
+  public EncryptionKey keys(int j) { return keys(new EncryptionKey(), j); }
+  public EncryptionKey keys(EncryptionKey obj, int j) { int o = __offset(4); 
return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int keysLength() { int o = __offset(4); return o != 0 ? 
__vector_len(o) : 0; }
+  public byte param(int j) { int o = __offset(6); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; }
+  public int paramLength() { int o = __offset(6); return o != 0 ? 
__vector_len(o) : 0; }
+  public ByteBuffer paramAsByteBuffer() { return __vector_as_bytebuffer(6, 1); 
}
+  public ByteBuffer paramInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 6, 1); }
+  public String algo() { int o = __offset(8); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer algoAsByteBuffer() { return __vector_as_bytebuffer(8, 1); }
+  public ByteBuffer algoInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 8, 1); }
+  public byte compressionType() { int o = __offset(10); return o != 0 ? 
bb.get(o + bb_pos) : 0; }
+  public int uncompressedMessageSize() { int o = __offset(12); return o != 0 ? 
bb.getInt(o + bb_pos) : 0; }
+  public int batchSize() { int o = __offset(14); return o != 0 ? bb.getInt(o + 
bb_pos) : 0; }
+  public boolean isBatchMessage() { int o = __offset(16); return o != 0 ? 
0!=bb.get(o + bb_pos) : false; }
+
+  public static int createEncryptionCtx(FlatBufferBuilder builder,
+      int keysOffset,
+      int paramOffset,
+      int algoOffset,
+      byte compressionType,
+      int uncompressedMessageSize,
+      int batchSize,
+      boolean isBatchMessage) {
+    builder.startObject(7);
+    EncryptionCtx.addBatchSize(builder, batchSize);
+    EncryptionCtx.addUncompressedMessageSize(builder, uncompressedMessageSize);
+    EncryptionCtx.addAlgo(builder, algoOffset);
+    EncryptionCtx.addParam(builder, paramOffset);
+    EncryptionCtx.addKeys(builder, keysOffset);
+    EncryptionCtx.addIsBatchMessage(builder, isBatchMessage);
+    EncryptionCtx.addCompressionType(builder, compressionType);
+    return EncryptionCtx.endEncryptionCtx(builder);
+  }
+
+  public static void startEncryptionCtx(FlatBufferBuilder builder) { 
builder.startObject(7); }
+  public static void addKeys(FlatBufferBuilder builder, int keysOffset) { 
builder.addOffset(0, keysOffset, 0); }
+  public static int createKeysVector(FlatBufferBuilder builder, int[] data) { 
builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; 
i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startKeysVector(FlatBufferBuilder builder, int numElems) 
{ builder.startVector(4, numElems, 4); }
+  public static void addParam(FlatBufferBuilder builder, int paramOffset) { 
builder.addOffset(1, paramOffset, 0); }
+  public static int createParamVector(FlatBufferBuilder builder, byte[] data) 
{ builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; 
i--) builder.addByte(data[i]); return builder.endVector(); }
+  public static void startParamVector(FlatBufferBuilder builder, int numElems) 
{ builder.startVector(1, numElems, 1); }
+  public static void addAlgo(FlatBufferBuilder builder, int algoOffset) { 
builder.addOffset(2, algoOffset, 0); }
+  public static void addCompressionType(FlatBufferBuilder builder, byte 
compressionType) { builder.addByte(3, compressionType, 0); }
+  public static void addUncompressedMessageSize(FlatBufferBuilder builder, int 
uncompressedMessageSize) { builder.addInt(4, uncompressedMessageSize, 0); }
+  public static void addBatchSize(FlatBufferBuilder builder, int batchSize) { 
builder.addInt(5, batchSize, 0); }
+  public static void addIsBatchMessage(FlatBufferBuilder builder, boolean 
isBatchMessage) { builder.addBoolean(6, isBatchMessage, false); }
+  public static int endEncryptionCtx(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java
new file mode 100644
index 0000000..44d74f4
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java
@@ -0,0 +1,52 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class EncryptionKey extends Table {
+  public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb) { return 
getRootAsEncryptionKey(_bb, new EncryptionKey()); }
+  public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb, 
EncryptionKey obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public EncryptionKey __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); 
return this; }
+
+  public String key() { int o = __offset(4); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
+  public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 4, 1); }
+  public byte value(int j) { int o = __offset(6); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; }
+  public int valueLength() { int o = __offset(6); return o != 0 ? 
__vector_len(o) : 0; }
+  public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); 
}
+  public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 6, 1); }
+  public KeyValue metadata(int j) { return metadata(new KeyValue(), j); }
+  public KeyValue metadata(KeyValue obj, int j) { int o = __offset(8); return 
o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int metadataLength() { int o = __offset(8); return o != 0 ? 
__vector_len(o) : 0; }
+
+  public static int createEncryptionKey(FlatBufferBuilder builder,
+      int keyOffset,
+      int valueOffset,
+      int metadataOffset) {
+    builder.startObject(3);
+    EncryptionKey.addMetadata(builder, metadataOffset);
+    EncryptionKey.addValue(builder, valueOffset);
+    EncryptionKey.addKey(builder, keyOffset);
+    return EncryptionKey.endEncryptionKey(builder);
+  }
+
+  public static void startEncryptionKey(FlatBufferBuilder builder) { 
builder.startObject(3); }
+  public static void addKey(FlatBufferBuilder builder, int keyOffset) { 
builder.addOffset(0, keyOffset, 0); }
+  public static void addValue(FlatBufferBuilder builder, int valueOffset) { 
builder.addOffset(1, valueOffset, 0); }
+  public static int createValueVector(FlatBufferBuilder builder, byte[] data) 
{ builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; 
i--) builder.addByte(data[i]); return builder.endVector(); }
+  public static void startValueVector(FlatBufferBuilder builder, int numElems) 
{ builder.startVector(1, numElems, 1); }
+  public static void addMetadata(FlatBufferBuilder builder, int 
metadataOffset) { builder.addOffset(2, metadataOffset, 0); }
+  public static int createMetadataVector(FlatBufferBuilder builder, int[] 
data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i 
>= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startMetadataVector(FlatBufferBuilder builder, int 
numElems) { builder.startVector(4, numElems, 4); }
+  public static int endEncryptionKey(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java
new file mode 100644
index 0000000..8cb53b4
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java
@@ -0,0 +1,41 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class KeyValue extends Table {
+  public static KeyValue getRootAsKeyValue(ByteBuffer _bb) { return 
getRootAsKeyValue(_bb, new KeyValue()); }
+  public static KeyValue getRootAsKeyValue(ByteBuffer _bb, KeyValue obj) { 
_bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public KeyValue __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return 
this; }
+
+  public String key() { int o = __offset(4); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
+  public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 4, 1); }
+  public String value() { int o = __offset(6); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); 
}
+  public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 6, 1); }
+
+  public static int createKeyValue(FlatBufferBuilder builder,
+      int keyOffset,
+      int valueOffset) {
+    builder.startObject(2);
+    KeyValue.addValue(builder, valueOffset);
+    KeyValue.addKey(builder, keyOffset);
+    return KeyValue.endKeyValue(builder);
+  }
+
+  public static void startKeyValue(FlatBufferBuilder builder) { 
builder.startObject(2); }
+  public static void addKey(FlatBufferBuilder builder, int keyOffset) { 
builder.addOffset(0, keyOffset, 0); }
+  public static void addValue(FlatBufferBuilder builder, int valueOffset) { 
builder.addOffset(1, valueOffset, 0); }
+  public static int endKeyValue(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java
new file mode 100644
index 0000000..d29171c
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java
@@ -0,0 +1,53 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class Message extends Table {
+  public static Message getRootAsMessage(ByteBuffer _bb) { return 
getRootAsMessage(_bb, new Message()); }
+  public static Message getRootAsMessage(ByteBuffer _bb, Message obj) { 
_bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public Message __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return 
this; }
+
+  public EncryptionCtx encryptionCtx() { return encryptionCtx(new 
EncryptionCtx()); }
+  public EncryptionCtx encryptionCtx(EncryptionCtx obj) { int o = __offset(4); 
return o != 0 ? obj.__assign(__indirect(o + bb_pos), bb) : null; }
+  public KeyValue properties(int j) { return properties(new KeyValue(), j); }
+  public KeyValue properties(KeyValue obj, int j) { int o = __offset(6); 
return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int propertiesLength() { int o = __offset(6); return o != 0 ? 
__vector_len(o) : 0; }
+  public byte payload(int j) { int o = __offset(8); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; }
+  public int payloadLength() { int o = __offset(8); return o != 0 ? 
__vector_len(o) : 0; }
+  public ByteBuffer payloadAsByteBuffer() { return __vector_as_bytebuffer(8, 
1); }
+  public ByteBuffer payloadInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 8, 1); }
+
+  public static int createMessage(FlatBufferBuilder builder,
+      int encryptionCtxOffset,
+      int propertiesOffset,
+      int payloadOffset) {
+    builder.startObject(3);
+    Message.addPayload(builder, payloadOffset);
+    Message.addProperties(builder, propertiesOffset);
+    Message.addEncryptionCtx(builder, encryptionCtxOffset);
+    return Message.endMessage(builder);
+  }
+
+  public static void startMessage(FlatBufferBuilder builder) { 
builder.startObject(3); }
+  public static void addEncryptionCtx(FlatBufferBuilder builder, int 
encryptionCtxOffset) { builder.addOffset(0, encryptionCtxOffset, 0); }
+  public static void addProperties(FlatBufferBuilder builder, int 
propertiesOffset) { builder.addOffset(1, propertiesOffset, 0); }
+  public static int createPropertiesVector(FlatBufferBuilder builder, int[] 
data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i 
>= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startPropertiesVector(FlatBufferBuilder builder, int 
numElems) { builder.startVector(4, numElems, 4); }
+  public static void addPayload(FlatBufferBuilder builder, int payloadOffset) 
{ builder.addOffset(2, payloadOffset, 0); }
+  public static int createPayloadVector(FlatBufferBuilder builder, byte[] 
data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i 
>= 0; i--) builder.addByte(data[i]); return builder.endVector(); }
+  public static void startPayloadVector(FlatBufferBuilder builder, int 
numElems) { builder.startVector(1, numElems, 1); }
+  public static int endMessage(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+  public static void finishMessageBuffer(FlatBufferBuilder builder, int 
offset) { builder.finish(offset); }
+  public static void finishSizePrefixedMessageBuffer(FlatBufferBuilder 
builder, int offset) { builder.finishSizePrefixed(offset); }
+}
+
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index e3f1160..b9f5c8a 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -20,15 +20,20 @@ package org.apache.pulsar.io.kinesis;
 
 import static java.util.Base64.getDecoder;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
 import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.kinesis.fbs.KeyValue;
+import org.apache.pulsar.io.kinesis.fbs.Message;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
@@ -43,71 +48,166 @@ import lombok.ToString;
  */
 public class UtilsTest {
 
+    @DataProvider(name = "encryption")
+    public Object[][] encryptionProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @Test
     public void testJsonSerialization() throws Exception {
 
-        final String key1 = "key1";
-        final String key2 = "key2";
+        final String[] keyNames = { "key1", "key2" };
         final String key1Value = "test1";
         final String key2Value = "test2";
+        final byte[][] keyValues = { key1Value.getBytes(), 
key2Value.getBytes() };
         final String param = "param";
         final String algo = "algo";
+        int batchSize = 10;
+        int compressionMsgSize = 10;
 
-        // prepare encryption-ctx
-        EncryptionContext ctx = new EncryptionContext();
-        ctx.setAlgorithm(algo);
-        ctx.setBatchSize(Optional.of(10));
-        ctx.setCompressionType(CompressionType.LZ4);
-        ctx.setUncompressedMessageSize(10);
-        Map<String, EncryptionKey> keys = Maps.newHashMap();
-        EncryptionKey encKeyVal = new EncryptionKey();
-        encKeyVal.setKeyValue(key1Value.getBytes());
+        // serialize to json
+        byte[] data = "payload".getBytes();
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("prop1", "value");
         Map<String, String> metadata1 = Maps.newHashMap();
         metadata1.put("version", "v1");
         metadata1.put("ckms", "cmks-1");
-        encKeyVal.setMetadata(metadata1);
-        EncryptionKey encKeyVal2 = new EncryptionKey();
-        encKeyVal2.setKeyValue(key2Value.getBytes());
         Map<String, String> metadata2 = Maps.newHashMap();
         metadata2.put("version", "v2");
         metadata2.put("ckms", "cmks-2");
-        encKeyVal2.setMetadata(metadata2);
-        keys.put(key1, encKeyVal);
-        keys.put(key2, encKeyVal2);
-        ctx.setKeys(keys);
-        ctx.setMetadata(metadata1);
-        ctx.setParam(param.getBytes());
-
-        // serialize to json
-        byte[] data = "payload".getBytes();
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put("prop1", "value");
-        RecordContext recordCtx = new RecordContextImpl(properties, ctx);
+        RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1, metadata2,
+                batchSize, compressionMsgSize, properties, true);
         String json = Utils.serializeRecordToJson(recordCtx, data);
-        System.out.println(json);
 
         // deserialize from json and assert
         KinesisMessageResponse kinesisJsonResponse = 
deSerializeRecordFromJson(json);
         Assert.assertEquals(data, 
getDecoder().decode(kinesisJsonResponse.getPayloadBase64()));
         EncryptionCtx encryptionCtxDeser = 
kinesisJsonResponse.getEncryptionCtx();
-        Assert.assertEquals(key1Value.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key1)));
-        Assert.assertEquals(key2Value.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key2)));
+        Assert.assertEquals(key1Value.getBytes(),
+                
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[0])));
+        Assert.assertEquals(key2Value.getBytes(),
+                
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[1])));
         Assert.assertEquals(param.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getEncParamBase64()));
         Assert.assertEquals(algo, encryptionCtxDeser.getAlgorithm());
-        Assert.assertEquals(metadata1, 
encryptionCtxDeser.getKeysMetadataMap().get(key1));
-        Assert.assertEquals(metadata2, 
encryptionCtxDeser.getKeysMetadataMap().get(key2));
-        Assert.assertEquals(metadata1, encryptionCtxDeser.getMetadata());
+        Assert.assertEquals(metadata1, 
encryptionCtxDeser.getKeysMetadataMap().get(keyNames[0]));
+        Assert.assertEquals(metadata2, 
encryptionCtxDeser.getKeysMetadataMap().get(keyNames[1]));
         Assert.assertEquals(properties, kinesisJsonResponse.getProperties());
 
     }
 
+    @Test(dataProvider="encryption")
+    public void testFbSerialization(boolean isEncryption) throws Exception {
+
+        final String[] keyNames = { "key1", "key2" };
+        final String param = "param";
+        final String algo = "algo";
+        int batchSize = 10;
+        int compressionMsgSize = 10;
+
+        for (int k = 0; k < 5; k++) {
+            String payloadString = RandomStringUtils.random(142342 * k, 
String.valueOf(System.currentTimeMillis()));
+            final String key1Value = payloadString + "test1";
+            final String key2Value = payloadString + "test2";
+            final byte[][] keyValues = { key1Value.getBytes(), 
key2Value.getBytes() };
+            byte[] data = payloadString.getBytes();
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put("prop1", payloadString);
+            Map<String, String> metadata1 = Maps.newHashMap();
+            metadata1.put("version", "v1");
+            metadata1.put("ckms", "cmks-1");
+            Map<String, String> metadata2 = Maps.newHashMap();
+            metadata2.put("version", "v2");
+            metadata2.put("ckms", "cmks-2");
+            RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1,
+                    metadata2, batchSize, compressionMsgSize, properties, 
isEncryption);
+            ByteBuffer flatBuffer = 
Utils.serializeRecordToFlatBuffer(recordCtx, data);
+
+            Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer);
+            byte[] fbPayloadBytes = new 
byte[kinesisJsonResponse.payloadLength()];
+            kinesisJsonResponse.payloadAsByteBuffer().get(fbPayloadBytes);
+            Assert.assertEquals(data, fbPayloadBytes);
+
+            if(isEncryption) {
+                org.apache.pulsar.io.kinesis.fbs.EncryptionCtx 
encryptionCtxDeser = kinesisJsonResponse.encryptionCtx();
+                byte compressionType = encryptionCtxDeser.compressionType();
+                int fbBatchSize = encryptionCtxDeser.batchSize();
+                boolean isBathcMessage = encryptionCtxDeser.isBatchMessage();
+                int fbCompressionMsgSize = 
encryptionCtxDeser.uncompressedMessageSize();
+                int totalKeys = encryptionCtxDeser.keysLength();
+                Map<String, Map<String, String>> fbKeyMetadataResult = 
Maps.newHashMap();
+                Map<String, byte[]> fbKeyValueResult = Maps.newHashMap();
+                for (int i = 0; i < encryptionCtxDeser.keysLength(); i++) {
+                    org.apache.pulsar.io.kinesis.fbs.EncryptionKey 
encryptionKey = encryptionCtxDeser.keys(i);
+                    String keyName = encryptionKey.key();
+                    byte[] keyValueBytes = new 
byte[encryptionKey.valueLength()];
+                    encryptionKey.valueAsByteBuffer().get(keyValueBytes);
+                    fbKeyValueResult.put(keyName, keyValueBytes);
+                    Map<String, String> fbMetadata = Maps.newHashMap();
+                    for (int j = 0; j < encryptionKey.metadataLength(); j++) {
+                        KeyValue encMtdata = encryptionKey.metadata(j);
+                        fbMetadata.put(encMtdata.key(), encMtdata.value());
+                    }
+                    fbKeyMetadataResult.put(keyName, fbMetadata);
+                }
+                byte[] paramBytes = new byte[encryptionCtxDeser.paramLength()];
+                encryptionCtxDeser.paramAsByteBuffer().get(paramBytes);
+
+                Assert.assertEquals(totalKeys, 2);
+                Assert.assertEquals(batchSize, fbBatchSize);
+                Assert.assertEquals(isBathcMessage, true);
+                Assert.assertEquals(compressionMsgSize, fbCompressionMsgSize);
+                Assert.assertEquals(keyValues[0], 
fbKeyValueResult.get(keyNames[0]));
+                Assert.assertEquals(keyValues[1], 
fbKeyValueResult.get(keyNames[1]));
+                Assert.assertEquals(metadata1, 
fbKeyMetadataResult.get(keyNames[0]));
+                Assert.assertEquals(metadata2, 
fbKeyMetadataResult.get(keyNames[1]));
+                Assert.assertEquals(compressionType, 
org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4);
+                Assert.assertEquals(param.getBytes(), paramBytes);
+                Assert.assertEquals(algo, encryptionCtxDeser.algo());
+            }
+            
+            Map<String, String> fbproperties = Maps.newHashMap();
+            for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) {
+                KeyValue property = kinesisJsonResponse.properties(i);
+                fbproperties.put(property.key(), property.value());
+            }
+            Assert.assertEquals(properties, fbproperties);
+
+        }
+    }
+
+    private RecordContext createRecordContext(String algo, String[] keyNames, 
byte[][] keyValues, byte[] param,
+            Map<String, String> metadata1, Map<String, String> metadata2, int 
batchSize, int compressionMsgSize,
+            Map<String, String> properties, boolean isEncryption) {
+        EncryptionContext ctx = null;
+        if(isEncryption) {
+            ctx = new EncryptionContext();
+            ctx.setAlgorithm(algo);
+            ctx.setBatchSize(Optional.of(batchSize));
+            ctx.setCompressionType(CompressionType.LZ4);
+            ctx.setUncompressedMessageSize(compressionMsgSize);
+            Map<String, EncryptionKey> keys = Maps.newHashMap();
+            EncryptionKey encKeyVal = new EncryptionKey();
+            encKeyVal.setKeyValue(keyValues[0]);
+
+            encKeyVal.setMetadata(metadata1);
+            EncryptionKey encKeyVal2 = new EncryptionKey();
+            encKeyVal2.setKeyValue(keyValues[1]);
+            encKeyVal2.setMetadata(metadata2);
+            keys.put(keyNames[0], encKeyVal);
+            keys.put(keyNames[1], encKeyVal2);
+            ctx.setKeys(keys);
+            ctx.setParam(param);
+        }
+        return new RecordContextImpl(properties, Optional.ofNullable(ctx)); 
+    }
+
     class RecordContextImpl implements RecordContext {
         Map<String, String> properties;
         Optional<EncryptionContext> ectx;
 
-        public RecordContextImpl(Map<String, String> properties, 
EncryptionContext ectx) {
+        public RecordContextImpl(Map<String, String> properties, 
Optional<EncryptionContext> ectx) {
             this.properties = properties;
-            this.ectx = Optional.of(ectx);
+            this.ectx = ectx;
         }
 
         public Map<String, String> getProperties() {
@@ -146,8 +246,6 @@ public class UtilsTest {
         private Map<String, String> keysMapBase64;
         // map of encryption-key metadata
         private Map<String, Map<String, String>> keysMetadataMap;
-        // encryption-ctx metadata
-        private Map<String, String> metadata;
         // encryption param which is base64 encoded
         private String encParamBase64;
         // encryption algorithm

Reply via email to