This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ab52a3  [BEAM-12464] Change ProtoSchemaTranslator beam schema creation to match the order for protobufs containing Oneof fields (#14974)
5ab52a3 is described below

commit 5ab52a3f4cfe2680098186763550b5f8ad30319c
Author: Reuben van Ammers <reubenvanamm...@gmail.com>
AuthorDate: Fri Jan 14 06:13:00 2022 +1100

    [BEAM-12464] Change ProtoSchemaTranslator beam schema creation to match the order for protobufs containing Oneof fields (#14974)
    
    * ProtoSchemaTranslator now orders oneof fields in the resultant beam schema in accordance with their location in the protobuf definition
    
    * add reverse order protobuf
    
    * add noncontiguous oneof and some renaming
    
    * Comments and variable renaming
    
    * add reversed row tests
    
    * add noncontiguous tests
    
    * remove redundant null check
    
    * minor test comment update
    
    * update
    
    * add reversed oneof test
    
    * add noncontiguous oneof test
    
    Co-authored-by: Reuben van Ammers <reuben.vanamm...@eliiza.com.au>
---
 .../extensions/protobuf/ProtoSchemaTranslator.java |  26 ++++-
 .../protobuf/ProtoDynamicMessageSchemaTest.java    |  86 ++++++++++++++
 .../protobuf/ProtoMessageSchemaTest.java           |  46 ++++++++
 .../protobuf/ProtoSchemaTranslatorTest.java        |  14 +++
 .../sdk/extensions/protobuf/TestProtoSchemas.java  | 125 ++++++++++++++++++++-
 .../src/test/proto/proto3_schema_messages.proto    |  28 +++++
 6 files changed, 314 insertions(+), 11 deletions(-)

diff --git 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java
 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java
index 91eb1bd7..ef46b59 100644
--- 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java
+++ 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java
@@ -156,13 +156,18 @@ class ProtoSchemaTranslator {
   }
 
   static Schema getSchema(Descriptors.Descriptor descriptor) {
-    Set<Integer> oneOfFields = Sets.newHashSet();
+    /* OneOfComponentFields holds the protobuf field numbers of the component subfields of each OneOf.
+     * This is needed to prevent double inclusion of the component fields.*/
+    Set<Integer> oneOfComponentFields = Sets.newHashSet();
+    /* OneOfFieldLocation maps the field number of the first field in each OneOf to that OneOf's Field. Using this,
+    we can use the location of the first field in the OneOf as the location of the entire OneOf.*/
+    Map<Integer, Field> oneOfFieldLocation = Maps.newHashMap();
     List<Field> fields = 
Lists.newArrayListWithCapacity(descriptor.getFields().size());
     for (OneofDescriptor oneofDescriptor : descriptor.getOneofs()) {
       List<Field> subFields = 
Lists.newArrayListWithCapacity(oneofDescriptor.getFieldCount());
       Map<String, Integer> enumIds = Maps.newHashMap();
       for (FieldDescriptor fieldDescriptor : oneofDescriptor.getFields()) {
-        oneOfFields.add(fieldDescriptor.getNumber());
+        oneOfComponentFields.add(fieldDescriptor.getNumber());
         // Store proto field number in a field option.
         FieldType fieldType = beamFieldTypeFromProtoField(fieldDescriptor);
         subFields.add(
@@ -172,17 +177,26 @@ class ProtoSchemaTranslator {
             enumIds.putIfAbsent(fieldDescriptor.getName(), 
fieldDescriptor.getNumber()) == null);
       }
       FieldType oneOfType = FieldType.logicalType(OneOfType.create(subFields, 
enumIds));
-      fields.add(Field.of(oneofDescriptor.getName(), oneOfType));
+      oneOfFieldLocation.put(
+          oneofDescriptor.getFields().get(0).getNumber(),
+          Field.of(oneofDescriptor.getName(), oneOfType));
     }
 
     for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) 
{
-      if (!oneOfFields.contains(fieldDescriptor.getNumber())) {
+      int fieldDescriptorNumber = fieldDescriptor.getNumber();
+      if (!oneOfComponentFields.contains(fieldDescriptorNumber)) {
         // Store proto field number in metadata.
         FieldType fieldType = beamFieldTypeFromProtoField(fieldDescriptor);
         fields.add(
-            withFieldNumber(
-                    Field.of(fieldDescriptor.getName(), fieldType), 
fieldDescriptor.getNumber())
+            withFieldNumber(Field.of(fieldDescriptor.getName(), fieldType), 
fieldDescriptorNumber)
                 .withOptions(getFieldOptions(fieldDescriptor)));
+        /* Note that descriptor.getFields() returns a list in the order of the fields in the .proto file, not
+         * in field number order. Therefore we can safely insert the OneOfField at the field of its first component.*/
+      } else {
+        Field oneOfField = oneOfFieldLocation.get(fieldDescriptorNumber);
+        if (oneOfField != null) {
+          fields.add(oneOfField);
+        }
       }
     }
     return Schema.builder()
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
index 9b8f6e3..fa44ed8 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
@@ -23,6 +23,9 @@ import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMI
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_PROTO;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_ROW;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_SCHEMA;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO;
@@ -45,6 +48,15 @@ import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_SCHEMA;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_BOOL;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_INT32;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_PRIMITIVE;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_STRING;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_BOOL;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_INT32;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_PRIMITIVE;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_STRING;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_SCHEMA;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA;
@@ -61,10 +73,12 @@ import com.google.protobuf.TextFormat.ParseException;
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage;
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
+import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive;
+import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
@@ -257,6 +271,78 @@ public class ProtoDynamicMessageSchemaTest {
   }
 
   @Test
+  public void testReversedOneOfSchema() {
+    ProtoDynamicMessageSchema schemaProvider = 
schemaFromDescriptor(ReversedOneOf.getDescriptor());
+    Schema schema = schemaProvider.getSchema();
+    assertEquals(REVERSED_ONEOF_SCHEMA, schema);
+  }
+
+  @Test
+  public void testReversedOneOfProtoToRow() throws 
InvalidProtocolBufferException {
+    ProtoDynamicMessageSchema schemaProvider = 
schemaFromDescriptor(ReversedOneOf.getDescriptor());
+    SerializableFunction<DynamicMessage, Row> toRow = 
schemaProvider.getToRowFunction();
+    // equality doesn't work between dynamic messages and other,
+    // so we compare string representation
+    assertEquals(
+        REVERSED_ONEOF_ROW_INT32.toString(),
+        toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_INT32)).toString());
+    assertEquals(
+        REVERSED_ONEOF_ROW_BOOL.toString(),
+        toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_BOOL)).toString());
+    assertEquals(
+        REVERSED_ONEOF_ROW_STRING.toString(),
+        toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_STRING)).toString());
+    assertEquals(
+        REVERSED_ONEOF_ROW_PRIMITIVE.toString(),
+        toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_PRIMITIVE)).toString());
+  }
+
+  @Test
+  public void testReversedOneOfRowToProto() {
+    ProtoDynamicMessageSchema schemaProvider = 
schemaFromDescriptor(ReversedOneOf.getDescriptor());
+    SerializableFunction<Row, DynamicMessage> fromRow = 
schemaProvider.getFromRowFunction();
+    assertEquals(
+        REVERSED_ONEOF_PROTO_INT32.toString(), 
fromRow.apply(REVERSED_ONEOF_ROW_INT32).toString());
+    assertEquals(
+        REVERSED_ONEOF_PROTO_BOOL.toString(), 
fromRow.apply(REVERSED_ONEOF_ROW_BOOL).toString());
+    assertEquals(
+        REVERSED_ONEOF_PROTO_STRING.toString(),
+        fromRow.apply(REVERSED_ONEOF_ROW_STRING).toString());
+    assertEquals(
+        REVERSED_ONEOF_PROTO_PRIMITIVE.toString(),
+        fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE).toString());
+  }
+
+  @Test
+  public void testNonContiguousOneOfSchema() {
+    ProtoDynamicMessageSchema schemaProvider =
+        schemaFromDescriptor(NonContiguousOneOf.getDescriptor());
+    Schema schema = schemaProvider.getSchema();
+    assertEquals(NONCONTIGUOUS_ONEOF_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNonContiguousOneOfProtoToRow() throws 
InvalidProtocolBufferException {
+    ProtoDynamicMessageSchema schemaProvider =
+        schemaFromDescriptor(NonContiguousOneOf.getDescriptor());
+    SerializableFunction<DynamicMessage, Row> toRow = 
schemaProvider.getToRowFunction();
+    // equality doesn't work between dynamic messages and other,
+    // so we compare string representation
+    assertEquals(
+        NONCONTIGUOUS_ONEOF_ROW.toString(),
+        toRow.apply(toDynamic(NONCONTIGUOUS_ONEOF_PROTO)).toString());
+  }
+
+  @Test
+  public void testNonContiguousOneOfRowToProto() {
+    ProtoDynamicMessageSchema schemaProvider =
+        schemaFromDescriptor(NonContiguousOneOf.getDescriptor());
+    SerializableFunction<Row, DynamicMessage> fromRow = 
schemaProvider.getFromRowFunction();
+    assertEquals(
+        NONCONTIGUOUS_ONEOF_PROTO.toString(), 
fromRow.apply(NONCONTIGUOUS_ONEOF_ROW).toString());
+  }
+
+  @Test
   public void testOuterOneOfSchema() {
     ProtoDynamicMessageSchema schemaProvider = 
schemaFromDescriptor(OuterOneOf.getDescriptor());
     Schema schema = schemaProvider.getSchema();
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
index 480ea1d..f5ce632 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
@@ -23,6 +23,8 @@ import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMI
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_PROTO;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO;
@@ -51,6 +53,14 @@ import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_SCHEMA;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_BOOL;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_INT32;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_PRIMITIVE;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_STRING;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_BOOL;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_INT32;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_PRIMITIVE;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_STRING;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA;
@@ -64,10 +74,12 @@ import 
org.apache.beam.sdk.extensions.protobuf.Proto2SchemaMessages.RequiredPrim
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage;
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
+import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive;
+import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -279,6 +291,40 @@ public class ProtoMessageSchemaTest {
     assertEquals(OUTER_ONEOF_PROTO, fromRow.apply(OUTER_ONEOF_ROW));
   }
 
+  @Test
+  public void testReversedOneOfProtoToRow() {
+    SerializableFunction<ReversedOneOf, Row> toRow =
+        new 
ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ReversedOneOf.class));
+    assertEquals(REVERSED_ONEOF_ROW_INT32, 
toRow.apply(REVERSED_ONEOF_PROTO_INT32));
+    assertEquals(REVERSED_ONEOF_ROW_BOOL, 
toRow.apply(REVERSED_ONEOF_PROTO_BOOL));
+    assertEquals(REVERSED_ONEOF_ROW_STRING, 
toRow.apply(REVERSED_ONEOF_PROTO_STRING));
+    assertEquals(REVERSED_ONEOF_ROW_PRIMITIVE, 
toRow.apply(REVERSED_ONEOF_PROTO_PRIMITIVE));
+  }
+
+  @Test
+  public void testReversedOneOfRowToProto() {
+    SerializableFunction<Row, ReversedOneOf> fromRow =
+        new 
ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(ReversedOneOf.class));
+    assertEquals(REVERSED_ONEOF_PROTO_INT32, 
fromRow.apply(REVERSED_ONEOF_ROW_INT32));
+    assertEquals(REVERSED_ONEOF_PROTO_BOOL, 
fromRow.apply(REVERSED_ONEOF_ROW_BOOL));
+    assertEquals(REVERSED_ONEOF_PROTO_STRING, 
fromRow.apply(REVERSED_ONEOF_ROW_STRING));
+    assertEquals(REVERSED_ONEOF_PROTO_PRIMITIVE, 
fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE));
+  }
+
+  @Test
+  public void testNonContiguousOneOfProtoToRow() {
+    SerializableFunction<NonContiguousOneOf, Row> toRow =
+        new 
ProtoMessageSchema().toRowFunction(TypeDescriptor.of(NonContiguousOneOf.class));
+    assertEquals(NONCONTIGUOUS_ONEOF_ROW, 
toRow.apply(NONCONTIGUOUS_ONEOF_PROTO));
+  }
+
+  @Test
+  public void testNonContiguousOneOfRowToProto() {
+    SerializableFunction<Row, NonContiguousOneOf> fromRow =
+        new 
ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(NonContiguousOneOf.class));
+    assertEquals(NONCONTIGUOUS_ONEOF_PROTO, 
fromRow.apply(NONCONTIGUOUS_ONEOF_ROW));
+  }
+
   private static final EnumerationType ENUM_TYPE =
       EnumerationType.create(ImmutableMap.of("ZERO", 0, "TWO", 2, "THREE", 3));
   private static final Schema ENUM_SCHEMA =
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java
index 9d473bf..f478a94 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java
@@ -84,6 +84,20 @@ public class ProtoSchemaTranslatorTest {
   }
 
   @Test
+  public void testReversedOneOfSchema() {
+    assertEquals(
+        TestProtoSchemas.REVERSED_ONEOF_SCHEMA,
+        
ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.ReversedOneOf.class));
+  }
+
+  @Test
+  public void testNonContiguousOneOfSchema() {
+    assertEquals(
+        TestProtoSchemas.NONCONTIGUOUS_ONEOF_SCHEMA,
+        
ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.NonContiguousOneOf.class));
+  }
+
+  @Test
   public void testNestedOneOfSchema() {
     assertEquals(
         TestProtoSchemas.OUTER_ONEOF_SCHEMA,
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
index 6b9bdf8..40055d0 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
@@ -43,10 +43,12 @@ import 
org.apache.beam.sdk.extensions.protobuf.Proto2SchemaMessages.RequiredNest
 import 
org.apache.beam.sdk.extensions.protobuf.Proto2SchemaMessages.RequiredPrimitive;
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
+import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
 import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive;
+import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf;
 import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage;
 import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.Fixed32;
 import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.Fixed64;
@@ -384,8 +386,8 @@ class TestProtoSchemas {
   static final OneOfType ONE_OF_TYPE = OneOfType.create(ONEOF_FIELDS, 
ONE_OF_ENUM_MAP);
   static final Schema ONEOF_SCHEMA =
       Schema.builder()
-          .addField("special_oneof", FieldType.logicalType(ONE_OF_TYPE))
           .addField(withFieldNumber("place1", FieldType.STRING, 1))
+          .addField("special_oneof", FieldType.logicalType(ONE_OF_TYPE))
           .addField(withFieldNumber("place2", FieldType.INT32, 6))
           .setOptions(withTypeName("proto3_schema_messages.OneOf"))
           .build();
@@ -393,19 +395,19 @@ class TestProtoSchemas {
   // Sample row instances for each OneOf case.
   static final Row ONEOF_ROW_INT32 =
       Row.withSchema(ONEOF_SCHEMA)
-          .addValues(ONE_OF_TYPE.createValue("oneof_int32", 1), "foo", 0)
+          .addValues("foo", ONE_OF_TYPE.createValue("oneof_int32", 1), 0)
           .build();
   static final Row ONEOF_ROW_BOOL =
       Row.withSchema(ONEOF_SCHEMA)
-          .addValues(ONE_OF_TYPE.createValue("oneof_bool", true), "foo", 0)
+          .addValues("foo", ONE_OF_TYPE.createValue("oneof_bool", true), 0)
           .build();
   static final Row ONEOF_ROW_STRING =
       Row.withSchema(ONEOF_SCHEMA)
-          .addValues(ONE_OF_TYPE.createValue("oneof_string", "foo"), "foo", 0)
+          .addValues("foo", ONE_OF_TYPE.createValue("oneof_string", "foo"), 0)
           .build();
   static final Row ONEOF_ROW_PRIMITIVE =
       Row.withSchema(ONEOF_SCHEMA)
-          .addValues(ONE_OF_TYPE.createValue("oneof_primitive", 
PRIMITIVE_ROW), "foo", 0)
+          .addValues("foo", ONE_OF_TYPE.createValue("oneof_primitive", 
PRIMITIVE_ROW), 0)
           .build();
 
   // Sample proto instances for each oneof case.
@@ -443,6 +445,119 @@ class TestProtoSchemas {
   static final OuterOneOf OUTER_ONEOF_PROTO =
       OuterOneOf.newBuilder().setOneofOneof(ONEOF_PROTO_PRIMITIVE).build();
 
+  // The schema for the ReversedOneOf proto.
+  private static final List<Field> REVERSED_ONEOF_FIELDS =
+      ImmutableList.of(
+          withFieldNumber("oneof_int32", FieldType.INT32, 5),
+          withFieldNumber("oneof_bool", FieldType.BOOLEAN, 4),
+          withFieldNumber("oneof_string", FieldType.STRING, 3),
+          withFieldNumber("oneof_primitive", FieldType.row(PRIMITIVE_SCHEMA), 
2));
+
+  private static final Map<String, Integer> REVERSED_ONE_OF_ENUM_MAP =
+      REVERSED_ONEOF_FIELDS.stream()
+          .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f)));
+  static final OneOfType REVERSED_ONE_OF_TYPE =
+      OneOfType.create(REVERSED_ONEOF_FIELDS, REVERSED_ONE_OF_ENUM_MAP);
+
+  static final Schema REVERSED_ONEOF_SCHEMA =
+      Schema.builder()
+          .addField(withFieldNumber("place1", FieldType.STRING, 6))
+          .addField("oneof_reversed", 
FieldType.logicalType(REVERSED_ONE_OF_TYPE))
+          .addField(withFieldNumber("place2", FieldType.INT32, 1))
+          .setOptions(withTypeName("proto3_schema_messages.ReversedOneOf"))
+          .build();
+
+  // Sample row instances for each ReversedOneOf case.
+  static final Row REVERSED_ONEOF_ROW_INT32 =
+      Row.withSchema(REVERSED_ONEOF_SCHEMA)
+          .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_int32", 
1), 0)
+          .build();
+  static final Row REVERSED_ONEOF_ROW_BOOL =
+      Row.withSchema(REVERSED_ONEOF_SCHEMA)
+          .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_bool", 
true), 0)
+          .build();
+  static final Row REVERSED_ONEOF_ROW_STRING =
+      Row.withSchema(REVERSED_ONEOF_SCHEMA)
+          .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_string", 
"foo"), 0)
+          .build();
+  static final Row REVERSED_ONEOF_ROW_PRIMITIVE =
+      Row.withSchema(REVERSED_ONEOF_SCHEMA)
+          .addValues("foo", 
REVERSED_ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), 0)
+          .build();
+
+  // Sample proto instances for each ReversedOneOf case.
+  static final ReversedOneOf REVERSED_ONEOF_PROTO_INT32 =
+      
ReversedOneOf.newBuilder().setOneofInt32(1).setPlace1("foo").setPlace2(0).build();
+  static final ReversedOneOf REVERSED_ONEOF_PROTO_BOOL =
+      
ReversedOneOf.newBuilder().setOneofBool(true).setPlace1("foo").setPlace2(0).build();
+  static final ReversedOneOf REVERSED_ONEOF_PROTO_STRING =
+      
ReversedOneOf.newBuilder().setOneofString("foo").setPlace1("foo").setPlace2(0).build();
+  static final ReversedOneOf REVERSED_ONEOF_PROTO_PRIMITIVE =
+      ReversedOneOf.newBuilder()
+          .setOneofPrimitive(PRIMITIVE_PROTO)
+          .setPlace1("foo")
+          .setPlace2(0)
+          .build();
+
+  // The schema for the NonContiguousOneOf proto.
+  private static final List<Field> NONCONTIGUOUS_ONE_ONEOF_FIELDS =
+      ImmutableList.of(
+          withFieldNumber("oneof_one_int32", FieldType.INT32, 55),
+          withFieldNumber("oneof_one_bool", FieldType.BOOLEAN, 1),
+          withFieldNumber("oneof_one_string", FieldType.STRING, 189),
+          withFieldNumber("oneof_one_primitive", 
FieldType.row(PRIMITIVE_SCHEMA), 22));
+
+  private static final List<Field> NONCONTIGUOUS_TWO_ONEOF_FIELDS =
+      ImmutableList.of(
+          withFieldNumber("oneof_two_first_string", FieldType.STRING, 981),
+          withFieldNumber("oneof_two_int32", FieldType.INT32, 2),
+          withFieldNumber("oneof_two_second_string", FieldType.STRING, 44));
+
+  private static final Map<String, Integer> NONCONTIGUOUS_ONE_ONE_OF_ENUM_MAP =
+      NONCONTIGUOUS_ONE_ONEOF_FIELDS.stream()
+          .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f)));
+
+  private static final Map<String, Integer> NONCONTIGUOUS_TWO_ONE_OF_ENUM_MAP =
+      NONCONTIGUOUS_TWO_ONEOF_FIELDS.stream()
+          .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f)));
+
+  static final OneOfType NONCONTIGUOUS_ONE_ONE_OF_TYPE =
+      OneOfType.create(NONCONTIGUOUS_ONE_ONEOF_FIELDS, 
NONCONTIGUOUS_ONE_ONE_OF_ENUM_MAP);
+
+  static final OneOfType NONCONTIGUOUS_TWO_ONE_OF_TYPE =
+      OneOfType.create(NONCONTIGUOUS_TWO_ONEOF_FIELDS, 
NONCONTIGUOUS_TWO_ONE_OF_ENUM_MAP);
+
+  static final Schema NONCONTIGUOUS_ONEOF_SCHEMA =
+      Schema.builder()
+          .addField(withFieldNumber("place1", FieldType.STRING, 76))
+          .addField(
+              "oneof_non_contiguous_one", 
FieldType.logicalType(NONCONTIGUOUS_ONE_ONE_OF_TYPE))
+          .addField(withFieldNumber("place2", FieldType.INT32, 33))
+          .addField(
+              "oneof_non_contiguous_two", 
FieldType.logicalType(NONCONTIGUOUS_TWO_ONE_OF_TYPE))
+          .addField(withFieldNumber("place3", FieldType.INT32, 63))
+          
.setOptions(withTypeName("proto3_schema_messages.NonContiguousOneOf"))
+          .build();
+
+  static final Row NONCONTIGUOUS_ONEOF_ROW =
+      Row.withSchema(NONCONTIGUOUS_ONEOF_SCHEMA)
+          .addValues(
+              "foo",
+              NONCONTIGUOUS_ONE_ONE_OF_TYPE.createValue("oneof_one_int32", 1),
+              0,
+              
NONCONTIGUOUS_TWO_ONE_OF_TYPE.createValue("oneof_two_second_string", "bar"),
+              343)
+          .build();
+
+  static final NonContiguousOneOf NONCONTIGUOUS_ONEOF_PROTO =
+      NonContiguousOneOf.newBuilder()
+          .setOneofOneInt32(1)
+          .setPlace1("foo")
+          .setPlace2(0)
+          .setOneofTwoSecondString("bar")
+          .setPlace3(343)
+          .build();
+
   static final Schema WKT_MESSAGE_SCHEMA =
       Schema.builder()
           .addField(withFieldNumber("double", FieldType.DOUBLE, 
1).withNullable(true))
diff --git 
a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto 
b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto
index a19ea04..0274864 100644
--- a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto
+++ b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto
@@ -100,6 +100,34 @@ message OuterOneOf {
   }
 }
 
+message ReversedOneOf {
+  string place1 = 6;
+    oneof oneof_reversed {
+        int32 oneof_int32 = 5;
+        bool oneof_bool = 4;
+        string oneof_string = 3;
+        Primitive oneof_primitive = 2;
+    }
+    int32 place2 = 1;
+}
+
+message NonContiguousOneOf {
+  string place1 = 76;
+    oneof oneof_non_contiguous_one {
+        int32 oneof_one_int32 = 55;
+        bool oneof_one_bool = 1;
+        string oneof_one_string = 189;
+        Primitive oneof_one_primitive = 22;
+    }
+    int32 place2 = 33;
+    oneof oneof_non_contiguous_two {
+        string oneof_two_first_string = 981;
+        int32 oneof_two_int32 = 2;
+        string oneof_two_second_string = 44;
+    }
+    int32 place3 = 63;
+}
+
 message EnumMessage {
   enum Enum {
     ZERO  = 0;

Reply via email to