This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 32e3bb87cc6863274e46a6897bbbb968d0dc1fde Author: Enrico Olivelli <eolive...@apache.org> AuthorDate: Wed May 12 18:16:24 2021 +0200 Fix some conflicts --- .../pulsar/client/api/SimpleProducerConsumerTest.java | 1 + .../java/org/apache/pulsar/client/api/SimpleSchemaTest.java | 1 + .../src/test/java/org/apache/pulsar/schema/SchemaTest.java | 5 +---- .../org/apache/pulsar/client/api/schema/GenericRecord.java | 1 + .../java/org/apache/pulsar/client/impl/MessageImpl.java | 11 ++--------- .../apache/pulsar/client/impl/schema/AutoConsumeSchema.java | 13 ------------- .../client/impl/schema/generic/GenericAvroRecord.java | 1 + .../client/impl/schema/generic/GenericJsonRecord.java | 2 ++ .../impl/schema/generic/GenericProtobufNativeRecord.java | 1 + .../apache/pulsar/client/impl/schema/JSONSchemaTest.java | 7 ++++--- .../client/impl/schema/generic/GenericJsonRecordTest.java | 13 ++++++++++--- .../schema/generic/GenericProtobufNativeReaderTest.java | 3 ++- .../apache/pulsar/functions/source/PulsarSourceTest.java | 2 +- 13 files changed, 27 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index e522908..4613cb3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -100,6 +100,7 @@ import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.apache.pulsar.common.schema.SchemaType; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 0e1d756..d594688 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.api; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.Cleanup; import lombok.NoArgsConstructor; import static java.nio.charset.StandardCharsets.UTF_8; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 9542a36..0897e67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -23,6 +23,7 @@ import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTes import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; @@ -155,10 +156,6 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { producer.send(personTwo); - Schemas.PersonTwo personConsume = consumer.receive().getValue(); - assertEquals("Tom", personConsume.getName()); - assertEquals(1, personConsume.getId()); - Message<Schemas.PersonTwo> message = consumer.receive(); Schemas.PersonTwo personConsume = message.getValue(); assertEquals(personConsume.getName(), "Tom"); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java index 1dda99f..2f1edb1 100644 --- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.api.schema; import java.util.List; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.common.schema.SchemaType; /** * An interface represents a message with schema. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 1eb7d3f..d7e262b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -268,14 +268,6 @@ public class MessageImpl<T> implements Message<T> { } } - @Override - public int size() { - if (msgMetadata.isNullValue()) { - return 0; - } - return payload.readableBytes(); - } - public Schema<T> getSchemaInternal() { return this.schema; } @@ -470,7 +462,8 @@ public class MessageImpl<T> implements Message<T> { @Override public byte[] getKeyBytes() { - if (!msgMetadata.hasPartitionKey() || msgMetadata.isNullPartitionKey()) { + checkNotNull(msgMetadataBuilder); + if (!msgMetadataBuilder.hasPartitionKey() || msgMetadataBuilder.getPartitionKey() == null) { return null; } else if (hasBase64EncodedKey()) { return Base64.getDecoder().decode(getKey()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 160a3a7..9f66875 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -217,19 +217,6 @@ public class AutoConsumeSchema 
implements Schema<GenericRecord> { } } - public Schema<GenericRecord> clone() { - Schema<GenericRecord> schema = new AutoConsumeSchema(); - if (this.schema != null) { - schema.configureSchemaInfo(topicName, componentName, this.schema.getSchemaInfo()); - } else { - schema.configureSchemaInfo(topicName, componentName, null); - } - if (schemaInfoProvider != null) { - schema.setSchemaInfoProvider(schemaInfoProvider); - } - return schema; - } - @Override public boolean requireFetchingSchemaInfo() { return true; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java index 767dcdb..c48c585 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.avro.util.Utf8; import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.common.schema.SchemaType; /** * A generic avro record. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java index eaefaa5..4557022 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java @@ -28,6 +28,8 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + /** * Generic json record. 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java index 1f94bb1..b061bba 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema.generic; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.common.schema.SchemaType; import java.util.List; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java index d6bd455..ad3da62 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java @@ -26,10 +26,11 @@ import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_N import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertSame; - -import com.fasterxml.jackson.core.JsonProcessingException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.avro.SchemaValidationException; @@ -372,7 +373,7 @@ public class JSONSchemaTest { } @Test - public void testEncodeAndDecodeObject() throws JsonProcessingException { + public void testEncodeAndDecodeObject() throws Exception { JSONSchema<PC> jsonSchema = 
JSONSchema.of(SchemaDefinition.<PC>builder().withPojo(PC.class).build()); PC pc = new PC("dell", "alienware", 2021, GPU.AMD, new Seller("WA", "street", 98004)); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java index 4a9bdae..453ecaf 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java @@ -21,15 +21,22 @@ package org.apache.pulsar.client.impl.schema.generic; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.testng.annotations.Test; - +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; import java.util.Collections; import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.JSONSchema; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - +import static org.testng.AssertJUnit.assertSame; public class GenericJsonRecordTest { @@ -93,7 +100,7 @@ public class GenericJsonRecordTest { } @Test - public void testEncodeAndDecodeObject() throws JsonProcessingException { + public void testEncodeAndDecodeObject() throws Exception { // test case from issue https://github.com/apache/pulsar/issues/9605 JSONSchema<PC> jsonSchema = JSONSchema.of(SchemaDefinition.<PC>builder().withPojo(PC.class).build()); GenericSchema genericJsonSchema = 
GenericJsonSchema.of(jsonSchema.getSchemaInfo()); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java index 3a57688..84dc7fd 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.schema.generic; +import com.google.protobuf.DynamicMessage; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; @@ -25,7 +26,7 @@ import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; import org.apache.pulsar.client.schema.proto.Test.TestMessage; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - +import org.apache.pulsar.common.schema.SchemaType; import static org.testng.Assert.assertEquals; @Slf4j diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 65474ff..8e4ad26 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -286,7 +286,7 @@ public class PulsarSourceTest { MessageImpl messageImpl = mock(MessageImpl.class); Schema schema = mock(Schema.class); when(messageImpl.getSchemaInternal()).thenReturn(schema); - pulsarSource.received(consumer, (Message) messageImpl); + pulsarSource.received(consumer, messageImpl); verify(messageImpl.getSchemaInternal(), times(1)); Record<GenericRecord> pushed = 
pulsarSource.read(); assertSame(pushed.getSchema(), schema);