This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da8981c  Kinesis sink publish full json message (#2079)
da8981c is described below

commit da8981c7becab94443cdac41a9495654943570ef
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Jul 6 11:59:29 2018 -0700

    Kinesis sink publish full json message (#2079)
    
    * Kinesis sink publish full json message
    
    * fix pulsar typo
---
 pulsar-io/core/pom.xml                             |  13 ++
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |  23 ++-
 .../pulsar/io/kinesis/KinesisSinkConfig.java       |  26 +++-
 .../java/org/apache/pulsar/io/kinesis/Utils.java   | 103 +++++++++++++
 .../org/apache/pulsar/io/kinesis/UtilsTest.java    | 162 +++++++++++++++++++++
 5 files changed, 324 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 30da3d5..003e6d8 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -47,6 +47,19 @@
         </exclusion>
       </exclusions>
     </dependency>
+    
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>protobuf-shaded</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 5a43312..eaa2b91 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.pulsar.io.core.RecordContext;
 import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,9 +67,17 @@ import io.netty.util.Recycler.Handle;
  *      which accepts json-map of credentials in awsCredentialPluginParam 
  *      eg: awsCredentialPluginParam = 
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
  * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link 
AwsCredentialProviderPlugin}
- * 
+ * 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
+ *   a. ONLY_RAW_PAYLOAD:     publishes raw payload to stream
+ *   b. FULL_MESSAGE_IN_JSON: publishes full message (encryptionCtx + properties + payload) in json format
+ *   json-schema:
+ *   
{"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]},"
 [...]
+ *   Example:
+ *   
{"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
  * </pre>
  * 
+ * 
+ * 
  */
 public class KinesisSink implements Sink<byte[]> {
 
@@ -92,7 +101,8 @@ public class KinesisSink implements Sink<byte[]> {
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
-                partitionedKey, ByteBuffer.wrap(value));
+                partitionedKey,
+                
ByteBuffer.wrap(createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
inputRecordContext, value)));
         addCallback(addRecordResult,
                 ProducerSendCallback.create(this.streamName, 
inputRecordContext, System.nanoTime()), directExecutor());
         if (LOG.isDebugEnabled()) {
@@ -263,4 +273,13 @@ public class KinesisSink implements Sink<byte[]> {
         };
     }
 
+    public static byte[] createKinesisMessage(MessageFormat msgFormat, 
RecordContext recordCtx, byte[] data) {
+        if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
+            return Utils.serializeRecordToJson(recordCtx, data).getBytes();
+        } else {
+            // send raw-message
+            return data;
+        }
+    }
+    
 }
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index b7dbad4..e3c8fde 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -44,6 +44,7 @@ public class KinesisSinkConfig implements Serializable {
     private String awsKinesisStreamName;
     private String awsCredentialPluginName;
     private String awsCredentialPluginParam;
+    private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // 
default : ONLY_RAW_PAYLOAD
 
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
@@ -53,5 +54,28 @@ public class KinesisSinkConfig implements Serializable {
     public static KinesisSinkConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
KinesisSinkConfig.class);
-    }    
+    }
+    
+    /**
+     * Message format in which kinesis-sink converts pulsar-message and 
publishes to kinesis stream.
+     *
+     */
+    public static enum MessageFormat {
+        /**
+         * Kinesis sink directly publishes pulsar-payload as a message into 
the kinesis-stream
+         */
+        ONLY_RAW_PAYLOAD,
+        /**
+         * Kinesis sink creates a json payload with message-payload, 
properties and encryptionCtx and publishes json
+         * payload to kinesis stream.
+         * 
+         * schema: 
+         * 
{"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLI
 [...]
+         * Example:
+         * 
{"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
+         * 
+         * 
+         */
+        FULL_MESSAGE_IN_JSON;
+    }
 }
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
new file mode 100644
index 0000000..8236080
--- /dev/null
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kinesis;
+
+import static java.util.Base64.getEncoder;
+
+import java.util.Map;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.io.core.RecordContext;
+
+import com.google.gson.JsonObject;
+
+public class Utils {
+
+    private static final String PAYLOAD_FIELD = "payloadBase64";
+    private static final String PROPERTIES_FIELD = "properties";
+    private static final String KEY_MAP_FIELD = "keysMapBase64";
+    private static final String KEY_METADATA_MAP_FIELD = "keysMetadataMap";
+    private static final String METADATA_FIELD = "metadata";
+    private static final String ENCRYPTION_PARAM_FIELD = "encParamBase64";
+    private static final String ALGO_FIELD = "algorithm";
+    private static final String COMPRESSION_TYPE_FIELD = "compressionType";
+    private static final String UNCPRESSED_MSG_SIZE_FIELD = 
"uncompressedMessageSize";
+    private static final String BATCH_SIZE_FIELD = "batchSize";
+    private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
+
+    /**
+     * Serializes sink-record into json format. It encodes encryption-keys, encryption-param and payload in base64
+     * format so that they can be sent in json.
+     * 
+     * @param inputRecordContext
+     * @param data
+     * @return
+     */
+    public static String serializeRecordToJson(RecordContext 
inputRecordContext, byte[] data) {
+        if (inputRecordContext == null) {
+            return null;
+        }
+        JsonObject result = new JsonObject();
+        result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
+        if (inputRecordContext.getProperties() != null) {
+            JsonObject properties = new JsonObject();
+            inputRecordContext.getProperties().entrySet()
+                    .forEach(e -> properties.addProperty(e.getKey(), 
e.getValue()));
+            result.add(PROPERTIES_FIELD, properties);
+        }
+        if (inputRecordContext.getEncryptionCtx().isPresent()) {
+            EncryptionContext encryptionCtx = 
inputRecordContext.getEncryptionCtx().get();
+            JsonObject encryptionCtxJson = new JsonObject();
+            JsonObject keyBase64Map = new JsonObject();
+            JsonObject keyMetadataMap = new JsonObject();
+            encryptionCtx.getKeys().entrySet().forEach(entry -> {
+                keyBase64Map.addProperty(entry.getKey(), 
getEncoder().encodeToString(entry.getValue().getKeyValue()));
+                Map<String, String> keyMetadata = 
entry.getValue().getMetadata();
+                if (keyMetadata != null && !keyMetadata.isEmpty()) {
+                    JsonObject metadata = new JsonObject();
+                    entry.getValue().getMetadata().entrySet()
+                            .forEach(m -> metadata.addProperty(m.getKey(), 
m.getValue()));
+                    keyMetadataMap.add(entry.getKey(), metadata);
+                }
+            });
+            encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map);
+            encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap);
+            Map<String, String> metadataMap = encryptionCtx.getMetadata();
+            if (metadataMap != null && !metadataMap.isEmpty()) {
+                JsonObject metadata = new JsonObject();
+                encryptionCtx.getMetadata().entrySet().forEach(m -> 
metadata.addProperty(m.getKey(), m.getValue()));
+                encryptionCtxJson.add(METADATA_FIELD, metadata);
+            }
+            encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD,
+                    getEncoder().encodeToString(encryptionCtx.getParam()));
+            encryptionCtxJson.addProperty(ALGO_FIELD, 
encryptionCtx.getAlgorithm());
+            if (encryptionCtx.getCompressionType() != null) {
+                encryptionCtxJson.addProperty(COMPRESSION_TYPE_FIELD, 
encryptionCtx.getCompressionType().name());
+                encryptionCtxJson.addProperty(UNCPRESSED_MSG_SIZE_FIELD, 
encryptionCtx.getUncompressedMessageSize());
+            }
+            if (encryptionCtx.getBatchSize().isPresent()) {
+                encryptionCtxJson.addProperty(BATCH_SIZE_FIELD, 
encryptionCtx.getBatchSize().get());
+            }
+            result.add(ENCRYPTION_CTX_FIELD, encryptionCtxJson);
+        }
+        return result.toString();
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
new file mode 100644
index 0000000..e3f1160
--- /dev/null
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kinesis;
+
+import static java.util.Base64.getDecoder;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+import org.apache.pulsar.io.core.RecordContext;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.google.gson.Gson;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Unit test of {@link Utils}.
+ */
+public class UtilsTest {
+
+    @Test
+    public void testJsonSerialization() throws Exception {
+
+        final String key1 = "key1";
+        final String key2 = "key2";
+        final String key1Value = "test1";
+        final String key2Value = "test2";
+        final String param = "param";
+        final String algo = "algo";
+
+        // prepare encryption-ctx
+        EncryptionContext ctx = new EncryptionContext();
+        ctx.setAlgorithm(algo);
+        ctx.setBatchSize(Optional.of(10));
+        ctx.setCompressionType(CompressionType.LZ4);
+        ctx.setUncompressedMessageSize(10);
+        Map<String, EncryptionKey> keys = Maps.newHashMap();
+        EncryptionKey encKeyVal = new EncryptionKey();
+        encKeyVal.setKeyValue(key1Value.getBytes());
+        Map<String, String> metadata1 = Maps.newHashMap();
+        metadata1.put("version", "v1");
+        metadata1.put("ckms", "cmks-1");
+        encKeyVal.setMetadata(metadata1);
+        EncryptionKey encKeyVal2 = new EncryptionKey();
+        encKeyVal2.setKeyValue(key2Value.getBytes());
+        Map<String, String> metadata2 = Maps.newHashMap();
+        metadata2.put("version", "v2");
+        metadata2.put("ckms", "cmks-2");
+        encKeyVal2.setMetadata(metadata2);
+        keys.put(key1, encKeyVal);
+        keys.put(key2, encKeyVal2);
+        ctx.setKeys(keys);
+        ctx.setMetadata(metadata1);
+        ctx.setParam(param.getBytes());
+
+        // serialize to json
+        byte[] data = "payload".getBytes();
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("prop1", "value");
+        RecordContext recordCtx = new RecordContextImpl(properties, ctx);
+        String json = Utils.serializeRecordToJson(recordCtx, data);
+        System.out.println(json);
+
+        // deserialize from json and assert
+        KinesisMessageResponse kinesisJsonResponse = 
deSerializeRecordFromJson(json);
+        Assert.assertEquals(data, 
getDecoder().decode(kinesisJsonResponse.getPayloadBase64()));
+        EncryptionCtx encryptionCtxDeser = 
kinesisJsonResponse.getEncryptionCtx();
+        Assert.assertEquals(key1Value.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key1)));
+        Assert.assertEquals(key2Value.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key2)));
+        Assert.assertEquals(param.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getEncParamBase64()));
+        Assert.assertEquals(algo, encryptionCtxDeser.getAlgorithm());
+        Assert.assertEquals(metadata1, 
encryptionCtxDeser.getKeysMetadataMap().get(key1));
+        Assert.assertEquals(metadata2, 
encryptionCtxDeser.getKeysMetadataMap().get(key2));
+        Assert.assertEquals(metadata1, encryptionCtxDeser.getMetadata());
+        Assert.assertEquals(properties, kinesisJsonResponse.getProperties());
+
+    }
+
+    class RecordContextImpl implements RecordContext {
+        Map<String, String> properties;
+        Optional<EncryptionContext> ectx;
+
+        public RecordContextImpl(Map<String, String> properties, 
EncryptionContext ectx) {
+            this.properties = properties;
+            this.ectx = Optional.of(ectx);
+        }
+
+        public Map<String, String> getProperties() {
+            return properties;
+        }
+
+        public Optional<EncryptionContext> getEncryptionCtx() {
+            return ectx;
+        }
+    }
+
+    public static KinesisMessageResponse deSerializeRecordFromJson(String 
jsonRecord) {
+        if (StringUtils.isNotBlank(jsonRecord)) {
+            return new Gson().fromJson(jsonRecord, 
KinesisMessageResponse.class);
+        }
+        return null;
+    }
+
+    @ToString
+    @Setter
+    @Getter
+    public static class KinesisMessageResponse {
+        // Encryption-context if message has been encrypted
+        private EncryptionCtx encryptionCtx;
+        // user-properties
+        private Map<String, String> properties;
+        // base64 encoded payload
+        private String payloadBase64;
+    }
+
+    @ToString
+    @Setter
+    @Getter
+    public static class EncryptionCtx {
+        // map of encryption-key value. (key-value is base64 encoded)
+        private Map<String, String> keysMapBase64;
+        // map of encryption-key metadata
+        private Map<String, Map<String, String>> keysMetadataMap;
+        // encryption-ctx metadata
+        private Map<String, String> metadata;
+        // encryption param which is base64 encoded
+        private String encParamBase64;
+        // encryption algorithm
+        private String algorithm;
+        // compression type if message is compressed
+        private CompressionType compressionType;
+        private int uncompressedMessageSize;
+        // number of messages in the batch if msg is batched message
+        private Integer batchSize;
+    }
+
+}

Reply via email to