This is an automated email from the ASF dual-hosted git repository. johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new df4c8600740 Added dlq and error metrics to pubsublite read schema transform (#26286) df4c8600740 is described below commit df4c8600740cc285473192fa69e8493418ea23c1 Author: Dip Patel <37777672+dippate...@users.noreply.github.com> AuthorDate: Tue May 2 10:30:22 2023 -0400 Added dlq and error metrics to pubsublite read schema transform (#26286) --- .../PubsubLiteReadSchemaTransformProvider.java | 77 +++++++++-- .../gcp/pubsublite/internal/PubsubLiteDlqTest.java | 141 +++++++++++++++++++++ 2 files changed, 207 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java index d1cfd19c98f..5ea205393c5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java @@ -23,12 +23,16 @@ import com.google.cloud.pubsublite.CloudRegionOrZone; import com.google.cloud.pubsublite.ProjectId; import com.google.cloud.pubsublite.SubscriptionName; import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -37,34 +41,84 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.JsonUtils; -import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.FinishBundle; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class PubsubLiteReadSchemaTransformProvider extends TypedSchemaTransformProvider< PubsubLiteReadSchemaTransformProvider.PubsubLiteReadSchemaTransformConfiguration> { + private static final Logger LOG = + LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class); + public static final String VALID_FORMATS_STR = "AVRO,JSON"; public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet(VALID_FORMATS_STR.split(",")); + public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {}; + public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {}; + public static final Schema ERROR_SCHEMA = + Schema.builder().addStringField("error").addNullableByteArrayField("row").build(); + @Override protected @UnknownKeyFor @NonNull @Initialized Class<PubsubLiteReadSchemaTransformConfiguration> configurationClass() { return PubsubLiteReadSchemaTransformConfiguration.class; } + public static class ErrorFn extends DoFn<SequencedMessage, Row> { + private SerializableFunction<byte[], Row> valueMapper; + private Counter errorCounter; + private Long errorsInBundle = 0L; + + public ErrorFn(String name, SerializableFunction<byte[], Row> valueMapper) { + this.errorCounter = Metrics.counter(PubsubLiteReadSchemaTransformProvider.class, name); + this.valueMapper = valueMapper; + } + + @ProcessElement + public void process(@DoFn.Element SequencedMessage seqMessage, MultiOutputReceiver receiver) { + try { + receiver + .get(OUTPUT_TAG) + .output(valueMapper.apply(seqMessage.getMessage().getData().toByteArray())); + } catch (Exception e) { + errorsInBundle += 1; + LOG.warn("Error while parsing the element", e); + receiver + .get(ERROR_TAG) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(e.toString(), seqMessage.getMessage().getData().toByteArray()) + .build()); + } + } + + @FinishBundle + public void finish(FinishBundleContext c) { + errorCounter.inc(errorsInBundle); + errorsInBundle = 0L; + } + } + @Override public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( PubsubLiteReadSchemaTransformConfiguration configuration) { @@ -100,8 +154,7 @@ public class PubsubLiteReadSchemaTransformProvider throw new IllegalArgumentException( "Unable to infer the project to read from Pubsub Lite. Please provide a project."); } - return PCollectionRowTuple.of( - "output", + PCollectionTuple outputTuple = input .getPipeline() .apply( @@ -118,12 +171,14 @@ public class PubsubLiteReadSchemaTransformProvider .build()) .build())) .apply( - MapElements.into(TypeDescriptors.rows()) - .via( - seqMess -> - valueMapper.apply( - seqMess.getMessage().getData().toByteArray()))) - .setRowSchema(beamSchema)); + ParDo.of(new ErrorFn("PubsubLite-read-error-counter", valueMapper)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + return PCollectionRowTuple.of( + "output", + outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema), + "errors", + outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); } }; } @@ -144,7 +199,7 @@ public class PubsubLiteReadSchemaTransformProvider @Override public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() { - return Collections.singletonList("output"); + return Arrays.asList("output", "errors"); } @AutoValue diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java new file mode 100644 index 00000000000..04b98ed6008 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite.internal; + +import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import com.google.protobuf.ByteString; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider; +import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.ErrorFn; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class PubsubLiteDlqTest { + + private static final TupleTag<Row> OUTPUTTAG = PubsubLiteReadSchemaTransformProvider.OUTPUT_TAG; + private static final TupleTag<Row> ERRORTAG = PubsubLiteReadSchemaTransformProvider.ERROR_TAG; + + private static final Schema BEAMSCHEMA = + Schema.of(Schema.Field.of("name", Schema.FieldType.STRING)); + private static final Schema ERRORSCHEMA = PubsubLiteReadSchemaTransformProvider.ERROR_SCHEMA; + + private static final List<Row> ROWS = + Arrays.asList( + Row.withSchema(BEAMSCHEMA).withFieldValue("name", "a").build(), + Row.withSchema(BEAMSCHEMA).withFieldValue("name", "b").build(), + Row.withSchema(BEAMSCHEMA).withFieldValue("name", "c").build()); + + private static final List<SequencedMessage> MESSAGES = + Arrays.asList( + SequencedMessage.newBuilder() + .setMessage( + PubSubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("{\"name\":\"a\"}")) + .build()) + .build(), + SequencedMessage.newBuilder() + .setMessage( + PubSubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("{\"name\":\"b\"}")) + .build()) + .build(), + SequencedMessage.newBuilder() + .setMessage( + PubSubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("{\"name\":\"c\"}")) + .build()) + .build()); + + private static final List<SequencedMessage> MESSAGESWITHERROR = + Arrays.asList( + SequencedMessage.newBuilder() + .setMessage( + PubSubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("{\"error\":\"a\"}")) + .build()) + .build(), + SequencedMessage.newBuilder() + .setMessage( + PubSubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("{\"error\":\"b\"}")) + .build()) + .build(), + SequencedMessage.newBuilder() + .setMessage( + PubSubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("{\"error\":\"c\"}")) + .build()) + .build()); + + final SerializableFunction<byte[], Row> valueMapper = + JsonUtils.getJsonBytesToRowFunction(BEAMSCHEMA); + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Test + public void testPubsubLiteErrorFnSuccess() throws Exception { + PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES)); + PCollectionTuple output = + input.apply( + ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper)) + .withOutputTags(OUTPUTTAG, TupleTagList.of(ERRORTAG))); + + output.get(OUTPUTTAG).setRowSchema(BEAMSCHEMA); + output.get(ERRORTAG).setRowSchema(ERRORSCHEMA); + + PAssert.that(output.get(OUTPUTTAG)).containsInAnyOrder(ROWS); + p.run().waitUntilFinish(); + } + + @Test + public void testPubsubLiteErrorFnFailure() throws Exception { + PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGESWITHERROR)); + PCollectionTuple output = + input.apply( + ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper)) + .withOutputTags(OUTPUTTAG, TupleTagList.of(ERRORTAG))); + + output.get(OUTPUTTAG).setRowSchema(BEAMSCHEMA); + output.get(ERRORTAG).setRowSchema(ERRORSCHEMA); + + PCollection<Long> count = output.get(ERRORTAG).apply("error_count", Count.globally()); + + PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L)); + + p.run().waitUntilFinish(); + } +}