This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2a6d195 add flatbuffer option to serialize kinesis-message in KinesisSink (#2108) 2a6d195 is described below commit 2a6d1950954e9d0b1ec33c7fc738ebbdab0a4b9e Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Jul 12 16:59:00 2018 -0700 add flatbuffer option to serialize kinesis-message in KinesisSink (#2108) * add flatbuffer option to serialize kinesis-message * add flatbuffer license * fix license --- distribution/server/src/assemble/LICENSE.bin.txt | 2 + pom.xml | 12 ++ .../pulsar/common/api/EncryptionContext.java | 1 - pulsar-io/kinesis/pom.xml | 6 + .../kinesis/src/main/fb/KinesisMessageApi.fbs | 59 +++++++ .../org/apache/pulsar/io/kinesis/KinesisSink.java | 10 +- .../pulsar/io/kinesis/KinesisSinkConfig.java | 6 +- .../java/org/apache/pulsar/io/kinesis/Utils.java | 129 ++++++++++++++-- .../pulsar/io/kinesis/fbs/CompressionType.java | 15 ++ .../pulsar/io/kinesis/fbs/EncryptionCtx.java | 68 +++++++++ .../pulsar/io/kinesis/fbs/EncryptionKey.java | 52 +++++++ .../org/apache/pulsar/io/kinesis/fbs/KeyValue.java | 41 +++++ .../org/apache/pulsar/io/kinesis/fbs/Message.java | 53 +++++++ .../org/apache/pulsar/io/kinesis/UtilsTest.java | 170 ++++++++++++++++----- 14 files changed, 572 insertions(+), 52 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 763f62c..33b9748 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -459,6 +459,8 @@ The Apache Software License, Version 2.0 - org.inferred-freebuilder-1.14.9.jar * Snappy Java - org.xerial.snappy-snappy-java-1.1.1.3.jar + * Flatbuffers Java + - com.google.flatbuffers-flatbuffers-java-1.9.0.jar BSD 3-clause "New" or "Revised" License diff --git a/pom.xml b/pom.xml index ba705a7..8a0da98 100644 --- a/pom.xml +++ b/pom.xml @@ -946,6 +946,11 @@ flexible messaging model and an intuitive client 
API.</description> <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude> <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude> <exclude>**/ByteBufCodedInputStream.java</exclude> <exclude>**/ByteBufCodedOutputStream.java</exclude> <exclude>bin/proto/*</exclude> @@ -1050,6 +1055,13 @@ flexible messaging model and an intuitive client API.</description> <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude> <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>bin/proto/MLDataFormats_pb2.py</exclude> + + <!-- pulsar-io-connector kinesis : auto-generated files from flatbuffer schema --> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude> <!-- These files are BSD licensed files --> <exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java</exclude> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java index 98eaad7..ff359c5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java @@ -34,7 +34,6 @@ public class EncryptionContext { private Map<String, EncryptionKey> keys; private byte[] param; - private Map<String, String> metadata; private String algorithm; private CompressionType compressionType; private int uncompressedMessageSize; diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml index 08c3004..f0c2776 100644 --- a/pulsar-io/kinesis/pom.xml +++ b/pulsar-io/kinesis/pom.xml @@ -81,6 +81,12 @@ <version>0.12.8</version> </dependency> <!-- /kinesis dependencies --> + + <dependency> + <groupId>com.google.flatbuffers</groupId> + <artifactId>flatbuffers-java</artifactId> + <version>1.9.0</version> + </dependency> </dependencies> diff --git a/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs new file mode 100644 index 0000000..f7cc030 --- /dev/null +++ b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + /** + * Instructions to generate fb-source + * export PULSAR_HOME=<Path where you cloned the pulsar repo> + * export KINESIS_IO_MAIN=${PULSAR_HOME}/pulsar-io/kinesis/src/main + * Command to build java src: flatc --java -o ${KINESIS_IO_MAIN}/java ${KINESIS_IO_MAIN}/fb/KinesisMessageApi.fbs + * flatc version 1.9.0 (pip install flatbuffers) + */ + +namespace org.apache.pulsar.io.kinesis.fbs; + +table KeyValue { + key:string; + value:string; +} + +table EncryptionKey { + key:string; + value:[byte]; + metadata:[KeyValue]; +} + +enum CompressionType : byte { NONE, LZ4, ZLIB } + +table EncryptionCtx { + keys:[EncryptionKey]; + param:[byte]; + algo:string; + compressionType:CompressionType; + uncompressedMessageSize:int; + batchSize:int; + isBatchMessage:bool=false; +} + +table Message { + encryptionCtx:EncryptionCtx; + properties:[KeyValue]; + payload:[byte]; +} + +root_type Message; \ No newline at end of file diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 5387a66..dbe1e75 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -103,7 +103,7 @@ public class KinesisSink implements Sink<byte[]> { : partitionedKey; // partitionedKey Length must be at least one, and at most 256 ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName, partitionedKey, - ByteBuffer.wrap(createKinesisMessage(kinesisSinkConfig.getMessageFormat(), inputRecordContext, value))); + createKinesisMessage(kinesisSinkConfig.getMessageFormat(), inputRecordContext, value)); addCallback(addRecordResult, ProducerSendCallback.create(this.streamName, inputRecordContext, System.nanoTime()),
directExecutor()); if (LOG.isDebugEnabled()) { @@ -274,12 +274,14 @@ public class KinesisSink implements Sink<byte[]> { }; } - public static byte[] createKinesisMessage(MessageFormat msgFormat, RecordContext recordCtx, byte[] data) { + public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, RecordContext recordCtx, byte[] data) { if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) { - return Utils.serializeRecordToJson(recordCtx, data).getBytes(); + return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx, data).getBytes()); + } else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) { + return Utils.serializeRecordToFlatBuffer(recordCtx, data); } else { // send raw-message - return data; + return ByteBuffer.wrap(data); } } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index e3c8fde..bf5f2ea 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -76,6 +76,10 @@ public class KinesisSinkConfig implements Serializable { * * */ - FULL_MESSAGE_IN_JSON; + FULL_MESSAGE_IN_JSON, + /** + * Kinesis sink sends message serialized in flat-buffer. 
+ */ + FULL_MESSAGE_IN_FB; } } \ No newline at end of file diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java index 8236080..469151e 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java @@ -19,13 +19,22 @@ package org.apache.pulsar.io.kinesis; +import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Base64.getEncoder; +import java.nio.ByteBuffer; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.io.core.RecordContext; +import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx; +import org.apache.pulsar.io.kinesis.fbs.EncryptionKey; +import org.apache.pulsar.io.kinesis.fbs.KeyValue; +import org.apache.pulsar.io.kinesis.fbs.Message; +import com.google.flatbuffers.FlatBufferBuilder; import com.google.gson.JsonObject; public class Utils { @@ -34,13 +43,121 @@ public class Utils { private static final String PROPERTIES_FIELD = "properties"; private static final String KEY_MAP_FIELD = "keysMapBase64"; private static final String KEY_METADATA_MAP_FIELD = "keysMetadataMap"; - private static final String METADATA_FIELD = "metadata"; private static final String ENCRYPTION_PARAM_FIELD = "encParamBase64"; private static final String ALGO_FIELD = "algorithm"; private static final String COMPRESSION_TYPE_FIELD = "compressionType"; private static final String UNCPRESSED_MSG_SIZE_FIELD = "uncompressedMessageSize"; private static final String BATCH_SIZE_FIELD = "batchSize"; private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx"; + + private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new FlatBufferBuilder(0); + + /** + * Serialize record to flat-buffer. It's not a thread-safe method.
+ * + * @param inputRecordContext + * @param data + * @return + */ + public static ByteBuffer serializeRecordToFlatBuffer(RecordContext inputRecordContext, byte[] data) { + DEFAULT_FB_BUILDER.clear(); + return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, inputRecordContext, data); + } + + public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder builder, RecordContext inputRecordContext, byte[] data) { + checkNotNull(inputRecordContext, "record-context can't be null"); + Optional<EncryptionContext> encryptionCtx = inputRecordContext.getEncryptionCtx(); + Map<String, String> properties = inputRecordContext.getProperties(); + + int encryptionCtxOffset = -1; + int propertiesOffset = -1; + + if (properties != null && !properties.isEmpty()) { + int[] propertiesOffsetArray = new int[properties.size()]; + int i = 0; + for (Entry<String, String> property : properties.entrySet()) { + propertiesOffsetArray[i++] = KeyValue.createKeyValue(builder, builder.createString(property.getKey()), + builder.createString(property.getValue())); + } + propertiesOffset = Message.createPropertiesVector(builder, propertiesOffsetArray); + } + + if (encryptionCtx.isPresent()) { + encryptionCtxOffset = createEncryptionCtxOffset(builder, encryptionCtx); + } + + int payloadOffset = Message.createPayloadVector(builder, data); + Message.startMessage(builder); + Message.addPayload(builder, payloadOffset); + if (encryptionCtxOffset != -1) { + Message.addEncryptionCtx(builder, encryptionCtxOffset); + } + if (propertiesOffset != -1) { + Message.addProperties(builder, propertiesOffset); + } + int endMessage = Message.endMessage(builder); + builder.finish(endMessage); + ByteBuffer bb = builder.dataBuffer(); + + // to avoid copying of data, use same byte[] wrapped by ByteBuffer. 
But, ByteBuffer.array() returns entire array + // so, it requires to read from offset: + // builder.sizedByteArray()=>copies buffer: sizedByteArray(space, bb.capacity() - space) + int space = bb.capacity() - builder.offset(); + return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space); + } + + private static int createEncryptionCtxOffset(final FlatBufferBuilder builder, Optional<EncryptionContext> encryptionCtx) { + if (!encryptionCtx.isPresent()) { + return -1; + } + // Message.addEncryptionCtx(builder, encryptionCtxOffset); + EncryptionContext ctx = encryptionCtx.get(); + int[] keysOffsets = new int[ctx.getKeys().size()]; + int keyIndex = 0; + for (Entry<String, org.apache.pulsar.common.api.EncryptionContext.EncryptionKey> entry : ctx.getKeys() + .entrySet()) { + int key = builder.createString(entry.getKey()); + int value = EncryptionKey.createValueVector(builder, entry.getValue().getKeyValue()); + Map<String, String> metadata = entry.getValue().getMetadata(); + int[] metadataOffsets = new int[metadata.size()]; + int i = 0; + for (Entry<String, String> m : metadata.entrySet()) { + metadataOffsets[i++] = KeyValue.createKeyValue(builder, builder.createString(m.getKey()), + builder.createString(m.getValue())); + } + int metadataOffset = -1; + if (metadata.size() > 0) { + metadataOffset = EncryptionKey.createMetadataVector(builder, metadataOffsets); + } + EncryptionKey.startEncryptionKey(builder); + EncryptionKey.addKey(builder, key); + EncryptionKey.addValue(builder, value); + if(metadataOffset!=-1) { + EncryptionKey.addMetadata(builder, metadataOffset); + } + keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder); + } + + int keysOffset = EncryptionCtx.createKeysVector(builder, keysOffsets); + int param = EncryptionCtx.createParamVector(builder, ctx.getParam()); + int algo = builder.createString(ctx.getAlgorithm()); + int batchSize = ctx.getBatchSize().isPresent() ? 
ctx.getBatchSize().get() : 1; + byte compressionType; + switch (ctx.getCompressionType()) { + case LZ4: + compressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4; + break; + case ZLIB: + compressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.ZLIB; + break; + default: + compressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.NONE; + + } + return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param, algo, compressionType, + ctx.getUncompressedMessageSize(), batchSize, ctx.getBatchSize().isPresent()); + + } /** * Serializes sink-record into json format. It encodes encryption-keys, encryption-param and payload in base64 @@ -51,9 +168,7 @@ public class Utils { * @return */ public static String serializeRecordToJson(RecordContext inputRecordContext, byte[] data) { - if (inputRecordContext == null) { - return null; - } + checkNotNull(inputRecordContext, "record-context can't be null"); JsonObject result = new JsonObject(); result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data)); if (inputRecordContext.getProperties() != null) { @@ -79,12 +194,6 @@ public class Utils { }); encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map); encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap); - Map<String, String> metadataMap = encryptionCtx.getMetadata(); - if (metadataMap != null && !metadataMap.isEmpty()) { - JsonObject metadata = new JsonObject(); - encryptionCtx.getMetadata().entrySet().forEach(m -> metadata.addProperty(m.getKey(), m.getValue())); - encryptionCtxJson.add(METADATA_FIELD, metadata); - } encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD, getEncoder().encodeToString(encryptionCtx.getParam())); encryptionCtxJson.addProperty(ALGO_FIELD, encryptionCtx.getAlgorithm()); diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java new file mode 100644 index 0000000..0a90a22 --- 
/dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java @@ -0,0 +1,15 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +public final class CompressionType { + private CompressionType() { } + public static final byte NONE = 0; + public static final byte LZ4 = 1; + public static final byte ZLIB = 2; + + public static final String[] names = { "NONE", "LZ4", "ZLIB", }; + + public static String name(int e) { return names[e]; } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java new file mode 100644 index 0000000..e6dff72 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java @@ -0,0 +1,68 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class EncryptionCtx extends Table { + public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb) { return getRootAsEncryptionCtx(_bb, new EncryptionCtx()); } + public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb, EncryptionCtx obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public EncryptionCtx __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public EncryptionKey keys(int j) { return keys(new EncryptionKey(), j); } + public EncryptionKey keys(EncryptionKey obj, int j) { int o = __offset(4); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int keysLength() { int o = __offset(4); return o != 0 ? 
__vector_len(o) : 0; } + public byte param(int j) { int o = __offset(6); return o != 0 ? bb.get(__vector(o) + j * 1) : 0; } + public int paramLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; } + public ByteBuffer paramAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } + public ByteBuffer paramInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } + public String algo() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; } + public ByteBuffer algoAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } + public ByteBuffer algoInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } + public byte compressionType() { int o = __offset(10); return o != 0 ? bb.get(o + bb_pos) : 0; } + public int uncompressedMessageSize() { int o = __offset(12); return o != 0 ? bb.getInt(o + bb_pos) : 0; } + public int batchSize() { int o = __offset(14); return o != 0 ? bb.getInt(o + bb_pos) : 0; } + public boolean isBatchMessage() { int o = __offset(16); return o != 0 ? 
0!=bb.get(o + bb_pos) : false; } + + public static int createEncryptionCtx(FlatBufferBuilder builder, + int keysOffset, + int paramOffset, + int algoOffset, + byte compressionType, + int uncompressedMessageSize, + int batchSize, + boolean isBatchMessage) { + builder.startObject(7); + EncryptionCtx.addBatchSize(builder, batchSize); + EncryptionCtx.addUncompressedMessageSize(builder, uncompressedMessageSize); + EncryptionCtx.addAlgo(builder, algoOffset); + EncryptionCtx.addParam(builder, paramOffset); + EncryptionCtx.addKeys(builder, keysOffset); + EncryptionCtx.addIsBatchMessage(builder, isBatchMessage); + EncryptionCtx.addCompressionType(builder, compressionType); + return EncryptionCtx.endEncryptionCtx(builder); + } + + public static void startEncryptionCtx(FlatBufferBuilder builder) { builder.startObject(7); } + public static void addKeys(FlatBufferBuilder builder, int keysOffset) { builder.addOffset(0, keysOffset, 0); } + public static int createKeysVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static void startKeysVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static void addParam(FlatBufferBuilder builder, int paramOffset) { builder.addOffset(1, paramOffset, 0); } + public static int createParamVector(FlatBufferBuilder builder, byte[] data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; i--) builder.addByte(data[i]); return builder.endVector(); } + public static void startParamVector(FlatBufferBuilder builder, int numElems) { builder.startVector(1, numElems, 1); } + public static void addAlgo(FlatBufferBuilder builder, int algoOffset) { builder.addOffset(2, algoOffset, 0); } + public static void addCompressionType(FlatBufferBuilder builder, byte compressionType) { builder.addByte(3, compressionType, 0); } + public static void 
addUncompressedMessageSize(FlatBufferBuilder builder, int uncompressedMessageSize) { builder.addInt(4, uncompressedMessageSize, 0); } + public static void addBatchSize(FlatBufferBuilder builder, int batchSize) { builder.addInt(5, batchSize, 0); } + public static void addIsBatchMessage(FlatBufferBuilder builder, boolean isBatchMessage) { builder.addBoolean(6, isBatchMessage, false); } + public static int endEncryptionCtx(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java new file mode 100644 index 0000000..44d74f4 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java @@ -0,0 +1,52 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class EncryptionKey extends Table { + public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb) { return getRootAsEncryptionKey(_bb, new EncryptionKey()); } + public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb, EncryptionKey obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public EncryptionKey __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public String key() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; } + public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); } + public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); } + public byte value(int j) { int o = __offset(6); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; } + public int valueLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; } + public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } + public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } + public KeyValue metadata(int j) { return metadata(new KeyValue(), j); } + public KeyValue metadata(KeyValue obj, int j) { int o = __offset(8); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int metadataLength() { int o = __offset(8); return o != 0 ? __vector_len(o) : 0; } + + public static int createEncryptionKey(FlatBufferBuilder builder, + int keyOffset, + int valueOffset, + int metadataOffset) { + builder.startObject(3); + EncryptionKey.addMetadata(builder, metadataOffset); + EncryptionKey.addValue(builder, valueOffset); + EncryptionKey.addKey(builder, keyOffset); + return EncryptionKey.endEncryptionKey(builder); + } + + public static void startEncryptionKey(FlatBufferBuilder builder) { builder.startObject(3); } + public static void addKey(FlatBufferBuilder builder, int keyOffset) { builder.addOffset(0, keyOffset, 0); } + public static void addValue(FlatBufferBuilder builder, int valueOffset) { builder.addOffset(1, valueOffset, 0); } + public static int createValueVector(FlatBufferBuilder builder, byte[] data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; i--) builder.addByte(data[i]); return builder.endVector(); } + public static void startValueVector(FlatBufferBuilder builder, int numElems) { builder.startVector(1, numElems, 1); } + public static void addMetadata(FlatBufferBuilder builder, int metadataOffset) { builder.addOffset(2, metadataOffset, 0); } + public static int createMetadataVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static 
void startMetadataVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static int endEncryptionKey(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java new file mode 100644 index 0000000..8cb53b4 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java @@ -0,0 +1,41 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class KeyValue extends Table { + public static KeyValue getRootAsKeyValue(ByteBuffer _bb) { return getRootAsKeyValue(_bb, new KeyValue()); } + public static KeyValue getRootAsKeyValue(ByteBuffer _bb, KeyValue obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public KeyValue __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public String key() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; } + public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); } + public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); } + public String value() { int o = __offset(6); return o != 0 ? 
__string(o + bb_pos) : null; } + public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } + public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } + + public static int createKeyValue(FlatBufferBuilder builder, + int keyOffset, + int valueOffset) { + builder.startObject(2); + KeyValue.addValue(builder, valueOffset); + KeyValue.addKey(builder, keyOffset); + return KeyValue.endKeyValue(builder); + } + + public static void startKeyValue(FlatBufferBuilder builder) { builder.startObject(2); } + public static void addKey(FlatBufferBuilder builder, int keyOffset) { builder.addOffset(0, keyOffset, 0); } + public static void addValue(FlatBufferBuilder builder, int valueOffset) { builder.addOffset(1, valueOffset, 0); } + public static int endKeyValue(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java new file mode 100644 index 0000000..d29171c --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java @@ -0,0 +1,53 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class Message extends Table { + public static Message getRootAsMessage(ByteBuffer _bb) { return getRootAsMessage(_bb, new Message()); } + public static Message getRootAsMessage(ByteBuffer _bb, Message obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public Message __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public EncryptionCtx 
encryptionCtx() { return encryptionCtx(new EncryptionCtx()); } + public EncryptionCtx encryptionCtx(EncryptionCtx obj) { int o = __offset(4); return o != 0 ? obj.__assign(__indirect(o + bb_pos), bb) : null; } + public KeyValue properties(int j) { return properties(new KeyValue(), j); } + public KeyValue properties(KeyValue obj, int j) { int o = __offset(6); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int propertiesLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; } + public byte payload(int j) { int o = __offset(8); return o != 0 ? bb.get(__vector(o) + j * 1) : 0; } + public int payloadLength() { int o = __offset(8); return o != 0 ? __vector_len(o) : 0; } + public ByteBuffer payloadAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } + public ByteBuffer payloadInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } + + public static int createMessage(FlatBufferBuilder builder, + int encryptionCtxOffset, + int propertiesOffset, + int payloadOffset) { + builder.startObject(3); + Message.addPayload(builder, payloadOffset); + Message.addProperties(builder, propertiesOffset); + Message.addEncryptionCtx(builder, encryptionCtxOffset); + return Message.endMessage(builder); + } + + public static void startMessage(FlatBufferBuilder builder) { builder.startObject(3); } + public static void addEncryptionCtx(FlatBufferBuilder builder, int encryptionCtxOffset) { builder.addOffset(0, encryptionCtxOffset, 0); } + public static void addProperties(FlatBufferBuilder builder, int propertiesOffset) { builder.addOffset(1, propertiesOffset, 0); } + public static int createPropertiesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static void startPropertiesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static 
void addPayload(FlatBufferBuilder builder, int payloadOffset) { builder.addOffset(2, payloadOffset, 0); } + public static int createPayloadVector(FlatBufferBuilder builder, byte[] data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; i--) builder.addByte(data[i]); return builder.endVector(); } + public static void startPayloadVector(FlatBufferBuilder builder, int numElems) { builder.startVector(1, numElems, 1); } + public static int endMessage(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } + public static void finishMessageBuffer(FlatBufferBuilder builder, int offset) { builder.finish(offset); } + public static void finishSizePrefixedMessageBuffer(FlatBufferBuilder builder, int offset) { builder.finishSizePrefixed(offset); } +} + diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java index e3f1160..b9f5c8a 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java @@ -20,15 +20,20 @@ package org.apache.pulsar.io.kinesis; import static java.util.Base64.getDecoder; +import java.nio.ByteBuffer; import java.util.Map; import java.util.Optional; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType; import org.apache.pulsar.io.core.RecordContext; +import org.apache.pulsar.io.kinesis.fbs.KeyValue; +import org.apache.pulsar.io.kinesis.fbs.Message; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.testng.collections.Maps; @@ -43,71 +48,166 @@ import lombok.ToString; */ public class UtilsTest { + 
@DataProvider(name = "encryption") + public Object[][] encryptionProvider() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + @Test public void testJsonSerialization() throws Exception { - final String key1 = "key1"; - final String key2 = "key2"; + final String[] keyNames = { "key1", "key2" }; final String key1Value = "test1"; final String key2Value = "test2"; + final byte[][] keyValues = { key1Value.getBytes(), key2Value.getBytes() }; final String param = "param"; final String algo = "algo"; + int batchSize = 10; + int compressionMsgSize = 10; - // prepare encryption-ctx - EncryptionContext ctx = new EncryptionContext(); - ctx.setAlgorithm(algo); - ctx.setBatchSize(Optional.of(10)); - ctx.setCompressionType(CompressionType.LZ4); - ctx.setUncompressedMessageSize(10); - Map<String, EncryptionKey> keys = Maps.newHashMap(); - EncryptionKey encKeyVal = new EncryptionKey(); - encKeyVal.setKeyValue(key1Value.getBytes()); + // serialize to json + byte[] data = "payload".getBytes(); + Map<String, String> properties = Maps.newHashMap(); + properties.put("prop1", "value"); Map<String, String> metadata1 = Maps.newHashMap(); metadata1.put("version", "v1"); metadata1.put("ckms", "cmks-1"); - encKeyVal.setMetadata(metadata1); - EncryptionKey encKeyVal2 = new EncryptionKey(); - encKeyVal2.setKeyValue(key2Value.getBytes()); Map<String, String> metadata2 = Maps.newHashMap(); metadata2.put("version", "v2"); metadata2.put("ckms", "cmks-2"); - encKeyVal2.setMetadata(metadata2); - keys.put(key1, encKeyVal); - keys.put(key2, encKeyVal2); - ctx.setKeys(keys); - ctx.setMetadata(metadata1); - ctx.setParam(param.getBytes()); - - // serialize to json - byte[] data = "payload".getBytes(); - Map<String, String> properties = Maps.newHashMap(); - properties.put("prop1", "value"); - RecordContext recordCtx = new RecordContextImpl(properties, ctx); + RecordContext recordCtx = createRecordContext(algo, keyNames, keyValues, param.getBytes(), metadata1, metadata2, + batchSize, 
compressionMsgSize, properties, true); String json = Utils.serializeRecordToJson(recordCtx, data); - System.out.println(json); // deserialize from json and assert KinesisMessageResponse kinesisJsonResponse = deSerializeRecordFromJson(json); Assert.assertEquals(data, getDecoder().decode(kinesisJsonResponse.getPayloadBase64())); EncryptionCtx encryptionCtxDeser = kinesisJsonResponse.getEncryptionCtx(); - Assert.assertEquals(key1Value.getBytes(), getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key1))); - Assert.assertEquals(key2Value.getBytes(), getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key2))); + Assert.assertEquals(key1Value.getBytes(), + getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[0]))); + Assert.assertEquals(key2Value.getBytes(), + getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[1]))); Assert.assertEquals(param.getBytes(), getDecoder().decode(encryptionCtxDeser.getEncParamBase64())); Assert.assertEquals(algo, encryptionCtxDeser.getAlgorithm()); - Assert.assertEquals(metadata1, encryptionCtxDeser.getKeysMetadataMap().get(key1)); - Assert.assertEquals(metadata2, encryptionCtxDeser.getKeysMetadataMap().get(key2)); - Assert.assertEquals(metadata1, encryptionCtxDeser.getMetadata()); + Assert.assertEquals(metadata1, encryptionCtxDeser.getKeysMetadataMap().get(keyNames[0])); + Assert.assertEquals(metadata2, encryptionCtxDeser.getKeysMetadataMap().get(keyNames[1])); Assert.assertEquals(properties, kinesisJsonResponse.getProperties()); } + @Test(dataProvider="encryption") + public void testFbSerialization(boolean isEncryption) throws Exception { + + final String[] keyNames = { "key1", "key2" }; + final String param = "param"; + final String algo = "algo"; + int batchSize = 10; + int compressionMsgSize = 10; + + for (int k = 0; k < 5; k++) { + String payloadString = RandomStringUtils.random(142342 * k, String.valueOf(System.currentTimeMillis())); + final String key1Value = payloadString + 
"test1"; + final String key2Value = payloadString + "test2"; + final byte[][] keyValues = { key1Value.getBytes(), key2Value.getBytes() }; + byte[] data = payloadString.getBytes(); + Map<String, String> properties = Maps.newHashMap(); + properties.put("prop1", payloadString); + Map<String, String> metadata1 = Maps.newHashMap(); + metadata1.put("version", "v1"); + metadata1.put("ckms", "cmks-1"); + Map<String, String> metadata2 = Maps.newHashMap(); + metadata2.put("version", "v2"); + metadata2.put("ckms", "cmks-2"); + RecordContext recordCtx = createRecordContext(algo, keyNames, keyValues, param.getBytes(), metadata1, + metadata2, batchSize, compressionMsgSize, properties, isEncryption); + ByteBuffer flatBuffer = Utils.serializeRecordToFlatBuffer(recordCtx, data); + + Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer); + byte[] fbPayloadBytes = new byte[kinesisJsonResponse.payloadLength()]; + kinesisJsonResponse.payloadAsByteBuffer().get(fbPayloadBytes); + Assert.assertEquals(data, fbPayloadBytes); + + if(isEncryption) { + org.apache.pulsar.io.kinesis.fbs.EncryptionCtx encryptionCtxDeser = kinesisJsonResponse.encryptionCtx(); + byte compressionType = encryptionCtxDeser.compressionType(); + int fbBatchSize = encryptionCtxDeser.batchSize(); + boolean isBathcMessage = encryptionCtxDeser.isBatchMessage(); + int fbCompressionMsgSize = encryptionCtxDeser.uncompressedMessageSize(); + int totalKeys = encryptionCtxDeser.keysLength(); + Map<String, Map<String, String>> fbKeyMetadataResult = Maps.newHashMap(); + Map<String, byte[]> fbKeyValueResult = Maps.newHashMap(); + for (int i = 0; i < encryptionCtxDeser.keysLength(); i++) { + org.apache.pulsar.io.kinesis.fbs.EncryptionKey encryptionKey = encryptionCtxDeser.keys(i); + String keyName = encryptionKey.key(); + byte[] keyValueBytes = new byte[encryptionKey.valueLength()]; + encryptionKey.valueAsByteBuffer().get(keyValueBytes); + fbKeyValueResult.put(keyName, keyValueBytes); + Map<String, String> fbMetadata = 
Maps.newHashMap(); + for (int j = 0; j < encryptionKey.metadataLength(); j++) { + KeyValue encMtdata = encryptionKey.metadata(j); + fbMetadata.put(encMtdata.key(), encMtdata.value()); + } + fbKeyMetadataResult.put(keyName, fbMetadata); + } + byte[] paramBytes = new byte[encryptionCtxDeser.paramLength()]; + encryptionCtxDeser.paramAsByteBuffer().get(paramBytes); + + Assert.assertEquals(totalKeys, 2); + Assert.assertEquals(batchSize, fbBatchSize); + Assert.assertEquals(isBathcMessage, true); + Assert.assertEquals(compressionMsgSize, fbCompressionMsgSize); + Assert.assertEquals(keyValues[0], fbKeyValueResult.get(keyNames[0])); + Assert.assertEquals(keyValues[1], fbKeyValueResult.get(keyNames[1])); + Assert.assertEquals(metadata1, fbKeyMetadataResult.get(keyNames[0])); + Assert.assertEquals(metadata2, fbKeyMetadataResult.get(keyNames[1])); + Assert.assertEquals(compressionType, org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4); + Assert.assertEquals(param.getBytes(), paramBytes); + Assert.assertEquals(algo, encryptionCtxDeser.algo()); + } + + Map<String, String> fbproperties = Maps.newHashMap(); + for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) { + KeyValue property = kinesisJsonResponse.properties(i); + fbproperties.put(property.key(), property.value()); + } + Assert.assertEquals(properties, fbproperties); + + } + } + + private RecordContext createRecordContext(String algo, String[] keyNames, byte[][] keyValues, byte[] param, + Map<String, String> metadata1, Map<String, String> metadata2, int batchSize, int compressionMsgSize, + Map<String, String> properties, boolean isEncryption) { + EncryptionContext ctx = null; + if(isEncryption) { + ctx = new EncryptionContext(); + ctx.setAlgorithm(algo); + ctx.setBatchSize(Optional.of(batchSize)); + ctx.setCompressionType(CompressionType.LZ4); + ctx.setUncompressedMessageSize(compressionMsgSize); + Map<String, EncryptionKey> keys = Maps.newHashMap(); + EncryptionKey encKeyVal = new EncryptionKey(); + 
encKeyVal.setKeyValue(keyValues[0]); + + encKeyVal.setMetadata(metadata1); + EncryptionKey encKeyVal2 = new EncryptionKey(); + encKeyVal2.setKeyValue(keyValues[1]); + encKeyVal2.setMetadata(metadata2); + keys.put(keyNames[0], encKeyVal); + keys.put(keyNames[1], encKeyVal2); + ctx.setKeys(keys); + ctx.setParam(param); + } + return new RecordContextImpl(properties, Optional.ofNullable(ctx)); + } + class RecordContextImpl implements RecordContext { Map<String, String> properties; Optional<EncryptionContext> ectx; - public RecordContextImpl(Map<String, String> properties, EncryptionContext ectx) { + public RecordContextImpl(Map<String, String> properties, Optional<EncryptionContext> ectx) { this.properties = properties; - this.ectx = Optional.of(ectx); + this.ectx = ectx; } public Map<String, String> getProperties() { @@ -146,8 +246,6 @@ public class UtilsTest { private Map<String, String> keysMapBase64; // map of encryption-key metadata private Map<String, Map<String, String>> keysMetadataMap; - // encryption-ctx metadata - private Map<String, String> metadata; // encryption param which is base64 encoded private String encParamBase64; // encryption algorithm