This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 56ea8a887178167f7a437d62a26489fb6ea8e436 Author: Sijie Guo <si...@apache.org> AuthorDate: Thu Jul 16 05:53:30 2020 -0700 [PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540) *Motivation* The code generation for `repeated long` is not handled properly. (I am not sure how changes were made to PulsarApi.proto) *Modification* This pull request adds the code to handle generating code for `repeated long`. *Test* Add unit test to ensure `repeated long` is processed. Add test cases to cover both packed and non-package serialization for `repeated long`. See more details about packed serialization: https://developers.google.com/protocol-buffers/docs/encoding#optional (cherry picked from commit 4e358ef9f3fa8c6164286d6e80f6e75f66c31eab) --- pom.xml | 2 + .../apache/pulsar/common/api/proto/PulsarApi.java | 18 + .../util/protobuf/ByteBufCodedInputStream.java | 30 + .../apache/pulsar/common/api/proto/TestApi.java | 641 +++++++++++++++++++++ .../common/protocol/RepeatedLongNonPackedTest.java | 65 +++ .../common/protocol/RepeatedLongPackedTest.java | 65 +++ pulsar-common/src/test/proto/TestApi.proto | 32 + 7 files changed, 853 insertions(+) diff --git a/pom.xml b/pom.xml index 822dd65..246edd6 100644 --- a/pom.xml +++ b/pom.xml @@ -1272,6 +1272,7 @@ flexible messaging model and an intuitive client API.</description> <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude> <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude> + <exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude> <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude> <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude> <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude> @@ -1347,6 +1348,7 @@ flexible messaging model and an intuitive client API.</description> and are included in source tree for convenience --> <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude> <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude> + <exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude> <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude> <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>bin/proto/MLDataFormats_pb2.py</exclude> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 4d1babf..2f822e8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -1578,6 +1578,15 @@ public final class PulsarApi { ackSet_.add(input.readInt64()); break; } + case 42: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + while (input.getBytesUntilLimit() > 0) { + addAckSet(input.readInt64()); + } + input.popLimit(limit); + break; + } } } } @@ -18860,6 +18869,15 @@ public final class PulsarApi { ackSet_.add(input.readInt64()); break; } + case 34: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + while (input.getBytesUntilLimit() > 0) { + addAckSet(input.readInt64()); + } + input.popLimit(limit); + break; + } } } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java index e4dad0c..caac6d6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java @@ -346,4 +346,34 @@ public class ByteBufCodedInputStream { buf.readerIndex(buf.readerIndex() + size); } + + public int pushLimit(int byteLimit) throws InvalidProtocolBufferException { + if (byteLimit < 0) { + throw new InvalidProtocolBufferException("CodedInputStream encountered an embedded string or message" + + " which claimed to have negative size."); + } + + byteLimit += buf.readerIndex(); + final int oldLimit = buf.writerIndex(); + if (byteLimit > oldLimit) { + throw new InvalidProtocolBufferException("While parsing a protocol message, the input ended unexpectedly" + + " in the middle of a field. This could mean either than the input has been truncated or that an" + + " embedded message misreported its own length."); + } + buf.writerIndex(byteLimit); + return oldLimit; + } + + /** + * Discards the current limit, returning to the previous limit. + * + * @param oldLimit The old limit, as returned by {@code pushLimit}. + */ + public void popLimit(final int oldLimit) { + buf.writerIndex(oldLimit); + } + + public int getBytesUntilLimit() { + return buf.readableBytes(); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java new file mode 100644 index 0000000..f165a94 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/proto/TestApi.java @@ -0,0 +1,641 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: src/test/proto/TestApi.proto + +package org.apache.pulsar.common.api.proto; + +public final class TestApi { + private TestApi() {} + public static void registerAllExtensions( + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite registry) { + } + public interface MessageIdDataOrBuilder + extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder { + + // required uint64 ledgerId = 1; + boolean hasLedgerId(); + long getLedgerId(); + + // required uint64 entryId = 2; + boolean hasEntryId(); + long getEntryId(); + + // optional int32 partition = 3 [default = -1]; + boolean hasPartition(); + int getPartition(); + + // optional int32 batch_index = 4 [default = -1]; + boolean hasBatchIndex(); + int getBatchIndex(); + + // repeated int64 ack_set = 5 [packed = true]; + java.util.List<java.lang.Long> getAckSetList(); + int getAckSetCount(); + long getAckSet(int index); + } + public static final class MessageIdData extends + org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite + implements MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage { + // Use MessageIdData.newBuilder() to construct. + private io.netty.util.Recycler.Handle handle; + private MessageIdData(io.netty.util.Recycler.Handle handle) { + this.handle = handle; + } + + private static final io.netty.util.Recycler<MessageIdData> RECYCLER = new io.netty.util.Recycler<MessageIdData>() { + protected MessageIdData newObject(Handle handle) { + return new MessageIdData(handle); + } + }; + + public void recycle() { + this.initFields(); + this.memoizedIsInitialized = -1; + this.bitField0_ = 0; + this.memoizedSerializedSize = -1; + if (handle != null) { RECYCLER.recycle(this, handle); } + } + + private MessageIdData(boolean noInit) {} + + private static final MessageIdData defaultInstance; + public static MessageIdData getDefaultInstance() { + return defaultInstance; + } + + public MessageIdData getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // required uint64 ledgerId = 1; + public static final int LEDGERID_FIELD_NUMBER = 1; + private long ledgerId_; + public boolean hasLedgerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getLedgerId() { + return ledgerId_; + } + + // required uint64 entryId = 2; + public static final int ENTRYID_FIELD_NUMBER = 2; + private long entryId_; + public boolean hasEntryId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getEntryId() { + return entryId_; + } + + // optional int32 partition = 3 [default = -1]; + public static final int PARTITION_FIELD_NUMBER = 3; + private int partition_; + public boolean hasPartition() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getPartition() { + return partition_; + } + + // optional int32 batch_index = 4 [default = -1]; + public static final int BATCH_INDEX_FIELD_NUMBER = 4; + private int batchIndex_; + public boolean hasBatchIndex() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public int getBatchIndex() { + return batchIndex_; + } + + // repeated int64 ack_set = 5 [packed = true]; + public static final int ACK_SET_FIELD_NUMBER = 5; + private java.util.List<java.lang.Long> ackSet_; + public java.util.List<java.lang.Long> + getAckSetList() { + return ackSet_; + } + public int getAckSetCount() { + return ackSet_.size(); + } + public long getAckSet(int index) { + return ackSet_.get(index); + } + private int ackSetMemoizedSerializedSize = -1; + + private void initFields() { + ledgerId_ = 0L; + entryId_ = 0L; + partition_ = -1; + batchIndex_ = -1; + ackSet_ = java.util.Collections.emptyList();; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasLedgerId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasEntryId()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output) + throws java.io.IOException { + throw new RuntimeException("Cannot use CodedOutputStream"); + } + + public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt64(1, ledgerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(2, entryId_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt32(3, partition_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeInt32(4, batchIndex_); + } + if (getAckSetList().size() > 0) { + output.writeRawVarint32(42); + output.writeRawVarint32(ackSetMemoizedSerializedSize); + } + for (int i = 0; i < ackSet_.size(); i++) { + output.writeInt64NoTag(ackSet_.get(i)); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(1, ledgerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(2, entryId_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeInt32Size(3, partition_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeInt32Size(4, batchIndex_); + } + { + int dataSize = 0; + for (int i = 0; i < ackSet_.size(); i++) { + dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeInt64SizeNoTag(ackSet_.get(i)); + } + size += dataSize; + if (!getAckSetList().isEmpty()) { + size += 1; + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeInt32SizeNoTag(dataSize); + } + ackSetMemoizedSerializedSize = dataSize; + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(byte[] data) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom( + byte[] data, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom( + java.io.InputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseDelimitedFrom( + java.io.InputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.TestApi.MessageIdData parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.pulsar.common.api.proto.TestApi.MessageIdData prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder< + org.apache.pulsar.common.api.proto.TestApi.MessageIdData, Builder> + implements org.apache.pulsar.common.api.proto.TestApi.MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder { + // Construct using org.apache.pulsar.common.api.proto.TestApi.MessageIdData.newBuilder() + private final io.netty.util.Recycler.Handle handle; + private Builder(io.netty.util.Recycler.Handle handle) { + this.handle = handle; + maybeForceBuilderInitialization(); + } + private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() { + protected Builder newObject(io.netty.util.Recycler.Handle handle) { + return new Builder(handle); + } + }; + + public void recycle() { + clear(); + if (handle != null) {RECYCLER.recycle(this, handle);} + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return RECYCLER.get(); + } + + public Builder clear() { + super.clear(); + ledgerId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + entryId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); + partition_ = -1; + bitField0_ = (bitField0_ & ~0x00000004); + batchIndex_ = -1; + bitField0_ = (bitField0_ & ~0x00000008); + ackSet_ = java.util.Collections.emptyList();; + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public org.apache.pulsar.common.api.proto.TestApi.MessageIdData getDefaultInstanceForType() { + return org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance(); + } + + public org.apache.pulsar.common.api.proto.TestApi.MessageIdData build() { + org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildParsed() + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.pulsar.common.api.proto.TestApi.MessageIdData buildPartial() { + org.apache.pulsar.common.api.proto.TestApi.MessageIdData result = org.apache.pulsar.common.api.proto.TestApi.MessageIdData.RECYCLER.get(); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.ledgerId_ = ledgerId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.entryId_ = entryId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.partition_ = partition_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.batchIndex_ = batchIndex_; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + ackSet_ = java.util.Collections.unmodifiableList(ackSet_); + bitField0_ = (bitField0_ & ~0x00000010); + } + result.ackSet_ = ackSet_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(org.apache.pulsar.common.api.proto.TestApi.MessageIdData other) { + if (other == org.apache.pulsar.common.api.proto.TestApi.MessageIdData.getDefaultInstance()) return this; + if (other.hasLedgerId()) { + setLedgerId(other.getLedgerId()); + } + if (other.hasEntryId()) { + setEntryId(other.getEntryId()); + } + if (other.hasPartition()) { + setPartition(other.getPartition()); + } + if (other.hasBatchIndex()) { + setBatchIndex(other.getBatchIndex()); + } + if (!other.ackSet_.isEmpty()) { + if (ackSet_.isEmpty()) { + ackSet_ = other.ackSet_; + bitField0_ = (bitField0_ & ~0x00000010); + } else { + ensureAckSetIsMutable(); + ackSet_.addAll(other.ackSet_); + } + + } + return this; + } + + public final boolean isInitialized() { + if (!hasLedgerId()) { + + return false; + } + if (!hasEntryId()) { + + return false; + } + return true; + } + + public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + throw new java.io.IOException("Merge from CodedInputStream is disabled"); + } + public Builder mergeFrom( + org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!input.skipField(tag)) { + + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + ledgerId_ = input.readUInt64(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + entryId_ = input.readUInt64(); + break; + } + case 24: { + bitField0_ |= 0x00000004; + partition_ = input.readInt32(); + break; + } + case 32: { + bitField0_ |= 0x00000008; + batchIndex_ = input.readInt32(); + break; + } + case 40: { + ensureAckSetIsMutable(); + ackSet_.add(input.readInt64()); + break; + } + case 42: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + while (input.getBytesUntilLimit() > 0) { + addAckSet(input.readInt64()); + } + input.popLimit(limit); + break; + } + } + } + } + + private int bitField0_; + + // required uint64 ledgerId = 1; + private long ledgerId_ ; + public boolean hasLedgerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getLedgerId() { + return ledgerId_; + } + public Builder setLedgerId(long value) { + bitField0_ |= 0x00000001; + ledgerId_ = value; + + return this; + } + public Builder clearLedgerId() { + bitField0_ = (bitField0_ & ~0x00000001); + ledgerId_ = 0L; + + return this; + } + + // required uint64 entryId = 2; + private long entryId_ ; + public boolean hasEntryId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getEntryId() { + return entryId_; + } + public Builder setEntryId(long value) { + bitField0_ |= 0x00000002; + entryId_ = value; + + return this; + } + public Builder clearEntryId() { + bitField0_ = (bitField0_ & ~0x00000002); + entryId_ = 0L; + + return this; + } + + // optional int32 partition = 3 [default = -1]; + private int partition_ = -1; + public boolean hasPartition() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getPartition() { + return partition_; + } + public Builder setPartition(int value) { + bitField0_ |= 0x00000004; + partition_ = value; + + return this; + } + public Builder clearPartition() { + bitField0_ = (bitField0_ & ~0x00000004); + partition_ = -1; + + return this; + } + + // optional int32 batch_index = 4 [default = -1]; + private int batchIndex_ = -1; + public boolean hasBatchIndex() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public int getBatchIndex() { + return batchIndex_; + } + public Builder setBatchIndex(int value) { + bitField0_ |= 0x00000008; + batchIndex_ = value; + + return this; + } + public Builder clearBatchIndex() { + bitField0_ = (bitField0_ & ~0x00000008); + batchIndex_ = -1; + + return this; + } + + // repeated int64 ack_set = 5 [packed = true]; + private java.util.List<java.lang.Long> ackSet_ = java.util.Collections.emptyList();; + private void ensureAckSetIsMutable() { + if (!((bitField0_ & 0x00000010) == 0x00000010)) { + ackSet_ = new java.util.ArrayList<java.lang.Long>(ackSet_); + bitField0_ |= 0x00000010; + } + } + public java.util.List<java.lang.Long> + getAckSetList() { + return java.util.Collections.unmodifiableList(ackSet_); + } + public int getAckSetCount() { + return ackSet_.size(); + } + public long getAckSet(int index) { + return ackSet_.get(index); + } + public Builder setAckSet( + int index, long value) { + ensureAckSetIsMutable(); + ackSet_.set(index, value); + + return this; + } + public Builder addAckSet(long value) { + ensureAckSetIsMutable(); + ackSet_.add(value); + + return this; + } + public Builder addAllAckSet( + java.lang.Iterable<? extends java.lang.Long> values) { + ensureAckSetIsMutable(); + super.addAll(values, ackSet_); + + return this; + } + public Builder clearAckSet() { + ackSet_ = java.util.Collections.emptyList();; + bitField0_ = (bitField0_ & ~0x00000010); + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageIdData) + } + + static { + defaultInstance = new MessageIdData(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.proto.MessageIdData) + } + + + static { + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java new file mode 100644 index 0000000..7c50d13 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongNonPackedTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.protocol; + +import static org.testng.Assert.assertEquals; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; +import org.testng.annotations.Test; + +public class RepeatedLongNonPackedTest { + + @Test + public void testRepeatedLongPacked() throws Exception { + MessageIdData messageIdData = MessageIdData.newBuilder() + .setLedgerId(0L) + .setEntryId(0L) + .setPartition(0) + .setBatchIndex(0) + .addAckSet(1000) + .addAckSet(1001) + .addAckSet(1003) + .build(); + + int cmdSize = messageIdData.getSerializedSize(); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize); + ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf); + messageIdData.writeTo(outputStream); + + messageIdData.recycle(); + outputStream.recycle(); + + ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf); + MessageIdData newMessageIdData = MessageIdData.newBuilder() + .mergeFrom(inputStream, null) + .build(); + inputStream.recycle(); + + assertEquals(3, newMessageIdData.getAckSetCount()); + assertEquals(1000, newMessageIdData.getAckSet(0)); + assertEquals(1001, newMessageIdData.getAckSet(1)); + assertEquals(1003, newMessageIdData.getAckSet(2)); + newMessageIdData.recycle(); + } + +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java new file mode 100644 index 0000000..e0569a8 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/RepeatedLongPackedTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.protocol; + +import static org.testng.Assert.assertEquals; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.TestApi.MessageIdData; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; +import org.testng.annotations.Test; + +public class RepeatedLongPackedTest { + + @Test + public void testRepeatedLongPacked() throws Exception { + MessageIdData messageIdData = MessageIdData.newBuilder() + .setLedgerId(0L) + .setEntryId(0L) + .setPartition(0) + .setBatchIndex(0) + .addAckSet(1000) + .addAckSet(1001) + .addAckSet(1003) + .build(); + + int cmdSize = messageIdData.getSerializedSize(); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize); + ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf); + messageIdData.writeTo(outputStream); + + messageIdData.recycle(); + outputStream.recycle(); + + ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf); + MessageIdData newMessageIdData = MessageIdData.newBuilder() + .mergeFrom(inputStream, null) + .build(); + inputStream.recycle(); + + assertEquals(3, newMessageIdData.getAckSetCount()); + assertEquals(1000, newMessageIdData.getAckSet(0)); + assertEquals(1001, newMessageIdData.getAckSet(1)); + assertEquals(1003, newMessageIdData.getAckSet(2)); + newMessageIdData.recycle(); + } + +} diff --git a/pulsar-common/src/test/proto/TestApi.proto b/pulsar-common/src/test/proto/TestApi.proto new file mode 100644 index 0000000..24c90e4 --- /dev/null +++ b/pulsar-common/src/test/proto/TestApi.proto @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; + +package pulsar.proto; +option java_package = "org.apache.pulsar.common.api.proto"; +option optimize_for = LITE_RUNTIME; + +message MessageIdData { + required uint64 ledgerId = 1; + required uint64 entryId = 2; + optional int32 partition = 3 [default = -1]; + optional int32 batch_index = 4 [default = -1]; + repeated int64 ack_set = 5 [packed = true]; +} +