This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_tmp in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ff5924492d784fb69d795d694e3e58183dd989c9 Author: Enrico Olivelli <eolive...@gmail.com> AuthorDate: Mon Apr 19 14:18:29 2021 +0200 Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload (#10248) (cherry picked from commit 54523bbff9b2b2b861409a93cfb3712bfdb73f27) --- .../org/apache/pulsar/client/impl/MessageImpl.java | 11 +- .../client/impl/schema/AutoConsumeSchema.java | 72 ++++++++---- .../integration/io/TestGenericObjectSink.java | 4 +- .../io/PulsarGenericObjectSinkTest.java | 123 ++++++++++----------- .../suites/PulsarStandaloneTestSuite.java | 1 + .../topologies/PulsarStandaloneTestBase.java | 15 +++ 6 files changed, 137 insertions(+), 89 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 3cb2235..a148677 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -49,6 +49,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; public class MessageImpl<T> implements Message<T> { @@ -279,10 +280,18 @@ public class MessageImpl<T> implements Message<T> { } } + private SchemaInfo getSchemaInfo() { + if (schema instanceof AutoConsumeSchema) { + ((AutoConsumeSchema) schema).fetchSchemaIfNeeded(); + } + return schema.getSchemaInfo(); + } + @Override public T getValue() { checkNotNull(msgMetadataBuilder); - if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) { + SchemaInfo schemaInfo = getSchemaInfo(); + if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) { if (schema.supportSchemaVersioning()) { return getKeyValueBySchemaVersion(); } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 41b1260..07c807c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl.schema; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; -import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema; @@ -30,7 +29,11 @@ import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static com.google.common.base.Preconditions.checkState; @@ -77,27 +80,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { @Override public GenericRecord decode(byte[] bytes, byte[] schemaVersion) { - if (schema == null) { - SchemaInfo schemaInfo = null; - try { - schemaInfo = schemaInfoProvider.getLatestSchema().get(); - if (schemaInfo == null) { - // schemaless topic - schemaInfo = BytesSchema.of().getSchemaInfo(); - } - } catch (InterruptedException | ExecutionException e ) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - log.error("Can't get last schema for topic {} use AutoConsumeSchema", topicName); - throw new SchemaSerializationException(e.getCause()); - } - // schemaInfo null means that there is no schema attached to the topic. - schema = generateSchema(schemaInfo); - schema.setSchemaInfoProvider(schemaInfoProvider); - log.info("Configure {} schema for topic {} : {}", - componentName, topicName, schemaInfo.getSchemaDefinition()); - } + fetchSchemaIfNeeded(); ensureSchemaInitialized(); return adapt(schema.decode(bytes, schemaVersion), schemaVersion); } @@ -157,7 +140,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { } } - private Schema<?> generateSchema(SchemaInfo schemaInfo) { + private static Schema<?> generateSchema(SchemaInfo schemaInfo) { // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader // to decode the messages. final boolean useProvidedSchemaAsReaderSchema = false; @@ -260,4 +243,47 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { public Schema<?> getInternalSchema() { return schema; } + + /** + * It may happen that the schema is not loaded but we need it, for instance in order to call getSchemaInfo() + * We cannot call this method in getSchemaInfo, because getSchemaInfo is called in many + * places and we will introduce lots of deadlocks. + */ + public void fetchSchemaIfNeeded() throws SchemaSerializationException { + if (schema == null) { + if (schemaInfoProvider == null) { + throw new SchemaSerializationException("Can't get accurate schema information for topic " + topicName + + "using AutoConsumeSchema because SchemaInfoProvider is not set yet"); + } else { + SchemaInfo schemaInfo = null; + try { + schemaInfo = schemaInfoProvider.getLatestSchema().get(); + if (schemaInfo == null) { + // schemaless topic + schemaInfo = BytesSchema.of().getSchemaInfo(); + } + } catch (InterruptedException | ExecutionException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + log.error("Can't get last schema for topic {} using AutoConsumeSchema", topicName); + throw new SchemaSerializationException(e.getCause()); + } + // schemaInfo null means that there is no schema attached to the topic. + schema = generateSchema(schemaInfo); + schema.setSchemaInfoProvider(schemaInfoProvider); + log.info("Configure {} schema for topic {} : {}", + componentName, topicName, schemaInfo.getSchemaDefinition()); + } + } + } + + @Override + public String toString() { + if (schema != null && schema.getSchemaInfo() != null) { + return "AUTO_CONSUME(schematype=" + schema.getSchemaInfo().getType() + ")"; + } else { + return "AUTO_CONSUME(uninitialized)"; + } + } } diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java index c0c4ac2..b7645ba 100644 --- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java @@ -36,7 +36,7 @@ public class TestGenericObjectSink implements Sink<GenericObject> { } public void write(Record<GenericObject> record) { - + log.info("topic {}", record.getTopicName().orElse(null)); log.info("properties {}", record.getProperties()); log.info("received record {} {}", record, record.getClass()); log.info("schema {}", record.getSchema()); @@ -65,6 +65,8 @@ public class TestGenericObjectSink implements Sink<GenericObject> { log.info("value {}", record.getValue()); log.info("value schema type {}", record.getValue().getSchemaType()); log.info("value native object {}", record.getValue().getNativeObject()); + + record.ack(); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java index 1e5fb74..c810b1b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.io; import lombok.Builder; import lombok.Cleanup; import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; @@ -43,6 +45,8 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; import static org.testng.Assert.assertEquals; @@ -55,15 +59,14 @@ import static org.testng.Assert.fail; @Slf4j public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite { + @Getter private static final class SinkSpec<T> { final String outputTopicName; - final String sinkName; final Schema<T> schema; final T testValue; - public SinkSpec(String outputTopicName, String sinkName, Schema<T> schema, T testValue) { + public SinkSpec(String outputTopicName, Schema<T> schema, T testValue) { this.outputTopicName = outputTopicName; - this.sinkName = sinkName; this.schema = schema; this.testValue = testValue; } @@ -89,90 +92,82 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite { .serviceUrl(container.getPlainTextServiceUrl()) .build(); + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build(); + // we are not using a parametrized test in order to save resources - // we create N sinks, send the records and verify each sink - // sinks execution happens in parallel + // we create one sink that listens on multiple topics, send the records and verify the sink List<SinkSpec> specs = Arrays.asList( - new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"), - new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()), - new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8), + new SinkSpec("test-kv-sink-input-string-" + randomName(8), Schema.STRING, "foo"), + new SinkSpec("test-kv-sink-input-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()), + new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)), - new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8), - Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())) + new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), + Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())), + new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), + Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())) ); - // submit all sinks - for (SinkSpec spec : specs) { - submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR); - } - // check all sinks - for (SinkSpec spec : specs) { - // get sink info - getSinkInfoSuccess(spec.sinkName); - getSinkStatus(spec.sinkName); - } - final int numRecords = 10; + final int numRecordsPerTopic = 2; + + String sinkName = "genericobject-sink"; + String topicNames = specs + .stream() + .map(SinkSpec::getOutputTopicName) + .collect(Collectors.joining(",")); + submitSinkConnector(sinkName, topicNames, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR); + // get sink info + getSinkInfoSuccess(sinkName); + getSinkStatus(sinkName); + for (SinkSpec spec : specs) { + @Cleanup Producer<Object> producer = client.newProducer(spec.schema) .topic(spec.outputTopicName) .create(); - for (int i = 0; i < numRecords; i++) { + for (int i = 0; i < numRecordsPerTopic; i++) { MessageId messageId = producer.newMessage() .value(spec.testValue) .property("expectedType", spec.schema.getSchemaInfo().getType().toString()) + .property("recordNumber", i + "") .send(); log.info("sent message {} {} with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId); } } - // wait that all sinks processed all records without errors - try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { - for (SinkSpec spec : specs) { - try { - log.info("waiting for sink {}", spec.sinkName); - for (int i = 0; i < 120; i++) { - SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName); - log.info("sink {} status {}", spec.sinkName, status); - assertEquals(status.getInstances().size(), 1); - SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0); - if (instance.getStatus().numWrittenToSink >= numRecords) { - break; - } - assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred"); - Thread.sleep(1000); - } - - SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName); - log.info("sink {} status {}", spec.sinkName, status); - assertEquals(status.getInstances().size(), 1); - assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords); - assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0); - assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0); - log.info("sink {} is okay", spec.sinkName); - } finally { - dumpSinkLogs(spec); + // wait that sink processed all records without errors + + try { + log.info("waiting for sink {}", sinkName); + + for (int i = 0; i < 120; i++) { + SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); + log.info("sink {} status {}", sinkName, status); + assertEquals(status.getInstances().size(), 1); + SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0); + if (instance.getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size() + || instance.getStatus().numSinkExceptions > 0 + || instance.getStatus().numSystemExceptions > 0 + || instance.getStatus().numRestarts > 0) { + break; } + Thread.sleep(1000); } - } - - for (SinkSpec spec : specs) { - deleteSink(spec.sinkName); - getSinkInfoNotFound(spec.sinkName); + SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); + log.info("sink {} status {}", sinkName, status); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size()); + assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0); + assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0); + log.info("sink {} is okay", sinkName); + } finally { + dumpFunctionLogs(sinkName); } - } - private void dumpSinkLogs(SinkSpec spec) { - try { - String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log"; - String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> { - return IOUtils.toString(inputStream, "utf-8"); - }); - log.info("Sink {} logs {}", spec.sinkName, logs); - } catch (Throwable err) { - log.info("Cannot download sink {} logs", spec.sinkName, err); - } + deleteSink(sinkName); + getSinkInfoNotFound(sinkName); } private void submitSinkConnector(String sinkName, diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java index 4e2e601..93e2e07 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java @@ -40,4 +40,5 @@ public class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase implemen public String getTestName() { return "pulsar-standalone-suite"; } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java index 03132f5..459837b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java @@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.topologies; import static org.testng.Assert.assertEquals; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; import org.apache.pulsar.tests.integration.containers.StandaloneContainer; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.testcontainers.containers.Network; @@ -90,4 +91,18 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase { network.close(); } + + + protected void dumpFunctionLogs(String name) { + try { + String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log"; + String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> { + return IOUtils.toString(inputStream, "utf-8"); + }); + log.info("Function {} logs {}", name, logs); + } catch (Throwable err) { + log.info("Cannot download {} logs", name, err); + } + } + }