This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 757222d  [Schema] Provide a generic record interface for representing 
a typed message (#2452)
757222d is described below

commit 757222d9d232dbe03293df890609aa85ff63556c
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Mon Aug 27 22:21:27 2018 -0700

    [Schema] Provide a generic record interface for representing a typed 
message (#2452)
    
    * [Schema] Provide a generic record interface for representing a typed 
message
    
     ### Motivation
    
    In some use cases, publishers and consumers don't know the type or schema
    of the messages ahead of time. For example, when a Pulsar IO connector
    connects a topic to a JDBC table, the connector doesn't know the type of
    the messages ahead of time; it can only fetch the schema info from the
    schema registry, and that is the only information the connector has.
    Without a generic representation, it is impossible to map the messages
    to a relational database table.
    
    So we need a way to represent a generic `Struct` record with fields.
    
     ### Changes
    
    Introduce `Field` and `GenericRecord` to represent `Struct` records 
deserialized with a schema.
    
     ### Not Covered
    
    This change only introduces the interfaces. It doesn't integrate with the 
producer and consumer workflow.
    That would be done in subsequent changes if we agree on the interfaces.
---
 .../org/apache/pulsar/client/api/schema/Field.java |  43 +++++++++
 .../pulsar/client/api/schema/GenericRecord.java    |  51 +++++++++++
 .../pulsar/client/impl/schema/AvroSchema.java      |  17 ++--
 .../client/impl/schema/GenericAvroRecord.java      |  79 ++++++++++++++++
 .../client/impl/schema/GenericAvroSchema.java      | 102 +++++++++++++++++++++
 .../pulsar/client/schema/AvroSchemaTest.java       |  47 +++++++++-
 6 files changed, 329 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
new file mode 100644
index 0000000..653b5d6
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * A field in a record, identified by its name and its position (index)
+ * within the record.
+ *
+ * <p>Both members are {@code final}; Lombok's {@code @Data} generates a
+ * constructor taking {@code (name, index)} plus getters, and together with
+ * {@code @EqualsAndHashCode} and {@code @ToString} gives the class
+ * value-based {@code equals}/{@code hashCode}/{@code toString}.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+public class Field {
+
+    /**
+     * The field name.
+     */
+    private final String name;
+    /**
+     * The zero-based position of the field within the record.
+     */
+    private final int index;
+
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
new file mode 100644
index 0000000..0a4fce4
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * A message payload deserialized with a schema whose structure is not known
+ * at compile time.
+ *
+ * <p>Values are accessed per field, either through a {@link Field} descriptor
+ * obtained from {@link #getFields()} or directly by field name.
+ */
+public interface GenericRecord {
+
+    /**
+     * Returns the list of fields associated with the record.
+     *
+     * @return the list of fields associated with the record
+     */
+    List<Field> getFields();
+
+    /**
+     * Retrieves the value of the provided {@code field}.
+     *
+     * @param field the field whose value should be retrieved
+     * @return the value object
+     */
+    Object getField(Field field);
+
+    /**
+     * Retrieves the value of the field named {@code fieldName}.
+     *
+     * @param fieldName the field name
+     * @return the value object
+     */
+    Object getField(String fieldName);
+
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 7d90d2b..6867fdc 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -45,8 +45,9 @@ public class AvroSchema<T> implements Schema<T> {
     private BinaryEncoder encoder;
     private ByteArrayOutputStream byteArrayOutputStream;
 
-    private AvroSchema(Class<T> pojo, Map<String, String> properties) {
-        this.schema = ReflectData.AllowNull.get().getSchema(pojo);
+    private AvroSchema(org.apache.avro.Schema schema,
+                       Map<String, String> properties) {
+        this.schema = schema;
 
         this.schemaInfo = new SchemaInfo();
         this.schemaInfo.setName("");
@@ -61,8 +62,7 @@ public class AvroSchema<T> implements Schema<T> {
     }
 
     @Override
-    public byte[] encode(T message) {
-
+    public synchronized byte[] encode(T message) {
         try {
             datumWriter.write(message, this.encoder);
             this.encoder.flush();
@@ -88,11 +88,16 @@ public class AvroSchema<T> implements Schema<T> {
         return this.schemaInfo;
     }
 
+    /**
+     * Derives the Avro schema of {@code pojo} through reflection, using Avro's
+     * {@code ReflectData.AllowNull} variant.
+     *
+     * @param pojo the class to describe
+     * @param <T> the POJO type
+     * @return the reflected Avro schema
+     */
+    private static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
+        ReflectData reflectData = ReflectData.AllowNull.get();
+        return reflectData.getSchema(pojo);
+    }
+
+    /**
+     * Creates an {@code AvroSchema} for {@code pojo} with an empty property map.
+     *
+     * @param pojo the class whose Avro schema is derived via reflection
+     * @param <T> the POJO type
+     * @return a new {@code AvroSchema}
+     */
     public static <T> AvroSchema<T> of(Class<T> pojo) {
-        return new AvroSchema<>(pojo, Collections.emptyMap());
+        return new AvroSchema<>(createAvroSchema(pojo), 
Collections.emptyMap());
     }
 
+    /**
+     * Creates an {@code AvroSchema} for {@code pojo}, forwarding {@code properties}
+     * to the constructor.
+     *
+     * @param pojo the class whose Avro schema is derived via reflection
+     * @param properties schema properties handed to the constructor
+     * @param <T> the POJO type
+     * @return a new {@code AvroSchema}
+     */
     public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> 
properties) {
-        return new AvroSchema<>(pojo, properties);
+        return new AvroSchema<>(createAvroSchema(pojo), properties);
     }
+
 }
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
new file mode 100644
index 0000000..fb65c7a
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+
+/**
+ * A {@link GenericRecord} backed by an Avro {@link org.apache.avro.generic.GenericRecord}.
+ *
+ * <p>{@link #getField(String)} converts Avro {@link Utf8} values to {@link String} and
+ * wraps nested Avro records in their own {@code GenericAvroRecord}; all other values are
+ * returned exactly as the underlying Avro record holds them.
+ */
+@Slf4j
+class GenericAvroRecord implements GenericRecord {
+
+    // Avro schema describing `record` (for a nested record, the nested record's own schema).
+    private final org.apache.avro.Schema schema;
+    // Field descriptors exposed through getFields().
+    private final List<Field> fields;
+    // The underlying Avro record that holds the values.
+    private final org.apache.avro.generic.GenericRecord record;
+
+    GenericAvroRecord(org.apache.avro.Schema schema,
+                      List<Field> fields,
+                      org.apache.avro.generic.GenericRecord record) {
+        this.schema = schema;
+        this.fields = fields;
+        this.record = record;
+    }
+
+    @Override
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    /**
+     * Looks the value up by {@link Field#getName()}; the field's index is not consulted.
+     */
+    @Override
+    public Object getField(Field field) {
+        return getField(field.getName());
+    }
+
+    /**
+     * Returns the value of {@code fieldName} from the underlying Avro record.
+     *
+     * <p>{@link Utf8} values are returned as {@link String}; nested Avro records are
+     * returned as {@code GenericAvroRecord}s described by the nested record's own schema.
+     * The result for a name that is not in the schema is whatever the Avro record's
+     * {@code get(String)} implementation produces.
+     */
+    @Override
+    public Object getField(String fieldName) {
+        Object value = record.get(fieldName);
+        if (value instanceof Utf8) {
+            return value.toString();
+        } else if (value instanceof org.apache.avro.generic.GenericRecord) {
+            return wrapNested((org.apache.avro.generic.GenericRecord) value);
+        } else {
+            return value;
+        }
+    }
+
+    // Wraps a nested Avro record, deriving its (unmodifiable) field list from its own schema.
+    private static GenericAvroRecord wrapNested(org.apache.avro.generic.GenericRecord nested) {
+        org.apache.avro.Schema nestedSchema = nested.getSchema();
+        List<Field> nestedFields = nestedSchema.getFields()
+            .stream()
+            .map(f -> new Field(f.name(), f.pos()))
+            .collect(Collectors.collectingAndThen(
+                Collectors.toList(), java.util.Collections::unmodifiableList));
+        // Must use the nested schema: the enclosing record's schema does not describe `nested`.
+        return new GenericAvroRecord(nestedSchema, nestedFields, nested);
+    }
+
+    org.apache.avro.generic.GenericRecord getAvroRecord() {
+        return record;
+    }
+
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
new file mode 100644
index 0000000..4ccfe55
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * A {@link Schema} for {@link GenericRecord}s whose Avro schema is only known at runtime.
+ *
+ * <p>The Avro schema is parsed from the UTF-8 bytes of the supplied {@link SchemaInfo}.
+ * {@link #encode} is {@code synchronized} because it reuses a single encoder and output
+ * buffer; {@link #decode} creates a fresh binary decoder for every call.
+ */
+public class GenericAvroSchema implements Schema<GenericRecord> {
+
+    private final org.apache.avro.Schema schema;
+    // Shared by every record produced by decode(), hence unmodifiable.
+    private final List<Field> fields;
+    private final SchemaInfo schemaInfo;
+    private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
+    private final BinaryEncoder encoder;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
+
+    /**
+     * Creates a generic schema from {@code schemaInfo}.
+     *
+     * @param schemaInfo schema info whose schema bytes hold an Avro schema definition
+     */
+    public GenericAvroSchema(SchemaInfo schemaInfo) {
+        this.schemaInfo = schemaInfo;
+        this.schema = new org.apache.avro.Schema.Parser().parse(
+            new String(schemaInfo.getSchema(), UTF_8)
+        );
+        this.fields = schema.getFields()
+            .stream()
+            .map(f -> new Field(f.name(), f.pos()))
+            .collect(Collectors.collectingAndThen(
+                Collectors.toList(), java.util.Collections::unmodifiableList));
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        // There is no encoder to reuse yet, so pass null and let the factory create one.
+        this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
+        this.datumWriter = new GenericDatumWriter<>(schema);
+        this.datumReader = new GenericDatumReader<>(schema);
+    }
+
+    /**
+     * Returns the parsed Avro schema.
+     *
+     * @return the Avro schema parsed from the schema info
+     */
+    public org.apache.avro.Schema getAvroSchema() {
+        return schema;
+    }
+
+    /**
+     * Serializes {@code message} into Avro binary form.
+     *
+     * @param message the record to encode; must be a {@code GenericAvroRecord}
+     * @return the encoded bytes
+     * @throws IllegalArgumentException if {@code message} is not a {@code GenericAvroRecord}
+     * @throws SchemaSerializationException if Avro serialization fails
+     */
+    @Override
+    public synchronized byte[] encode(GenericRecord message) {
+        checkArgument(message instanceof GenericAvroRecord,
+            "GenericAvroSchema can only encode GenericAvroRecord instances, but got %s",
+            message == null ? null : message.getClass().getName());
+        GenericAvroRecord gar = (GenericAvroRecord) message;
+        try {
+            datumWriter.write(gar.getAvroRecord(), this.encoder);
+            this.encoder.flush();
+            return this.byteArrayOutputStream.toByteArray();
+        } catch (Exception e) {
+            throw new SchemaSerializationException(e);
+        } finally {
+            // The buffer is shared between calls; always clear it for the next encode.
+            this.byteArrayOutputStream.reset();
+        }
+    }
+
+    /**
+     * Deserializes Avro binary {@code bytes} into a {@link GenericRecord}.
+     *
+     * @param bytes the Avro-encoded payload
+     * @return a record that shares this schema's field list
+     * @throws SchemaSerializationException if the bytes cannot be read
+     */
+    @Override
+    public GenericRecord decode(byte[] bytes) {
+        try {
+            org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
+                null,
+                DecoderFactory.get().binaryDecoder(bytes, null));
+            return new GenericAvroRecord(schema, fields, avroRecord);
+        } catch (IOException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return schemaInfo;
+    }
+}
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
index 84c060e..cfe9cb7 100644
--- 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
@@ -18,12 +18,17 @@
  */
 package org.apache.pulsar.client.schema;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -65,10 +70,10 @@ public class AvroSchemaTest {
     @Test
     public void testSchema() {
         AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
-        Assert.assertEquals(avroSchema.getSchemaInfo().getType(), 
SchemaType.AVRO);
+        assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
         Schema.Parser parser = new Schema.Parser();
         String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
-        Assert.assertEquals(schemaJson, SCHEMA_JSON);
+        assertEquals(schemaJson, SCHEMA_JSON);
         Schema schema = parser.parse(schemaJson);
 
         for (String fieldName : FOO_FIELDS) {
@@ -103,7 +108,41 @@ public class AvroSchemaTest {
         Foo object1 = avroSchema.decode(bytes1);
         Foo object2 = avroSchema.decode(bytes2);
 
-        Assert.assertEquals(object1, foo1);
-        Assert.assertEquals(object2, foo2);
+        assertEquals(object1, foo1);
+        assertEquals(object2, foo2);
+    }
+
+    /**
+     * Round-trips {@code Foo} instances through {@code AvroSchema} and decodes them with
+     * {@code GenericAvroSchema}, checking top-level and nested field values.
+     */
+    @Test
+    public void testEncodeAndDecodeGenericRecord() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
+
+        GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo());
+
+        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
+
+        int numRecords = 10;
+        for (int i = 0; i < numRecords; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field-1-" + i);
+            foo.setField2("field-2-" + i);
+            foo.setField3(i);
+            Bar bar = new Bar();
+            bar.setField1(i % 2 == 0);
+            foo.setField4(bar);
+
+            byte[] data = avroSchema.encode(foo);
+
+            GenericRecord record = genericAvroSchema.decode(data);
+
+            // TestNG's assertEquals signature is (actual, expected, message).
+            Object field1 = record.getField("field1");
+            Assert.assertNotNull(field1, "Field 1 is missing");
+            assertEquals(field1, "field-1-" + i, "Field 1 is " + field1.getClass());
+            Object field2 = record.getField("field2");
+            Assert.assertNotNull(field2, "Field 2 is missing");
+            assertEquals(field2, "field-2-" + i, "Field 2 is " + field2.getClass());
+            Object field3 = record.getField("field3");
+            Assert.assertNotNull(field3, "Field 3 is missing");
+            assertEquals(field3, i, "Field 3 is " + field3.getClass());
+            Object field4 = record.getField("field4");
+            assertTrue(field4 instanceof GenericRecord, "Field 4 is not a GenericRecord: " + field4);
+            GenericRecord field4Record = (GenericRecord) field4;
+            assertEquals(field4Record.getField("field1"), i % 2 == 0);
+        }
     }
 }

Reply via email to