This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee53ab6  improve datagenerator source (#3203)
ee53ab6 is described below

commit ee53ab6f21299dad0f19ab42caa94a7e9c336167
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Dec 17 09:01:06 2018 -0800

    improve datagenerator source (#3203)
    
    * improve datagenerator source
    
    * cleaning up
---
 pulsar-io/data-genenator/pom.xml                   |  5 ++
 ...atorSource.java => DataGeneratorPrintSink.java} | 30 ++-----
 .../io/datagenerator/DataGeneratorSource.java      |  4 +-
 .../org/apache/pulsar/io/datagenerator/Person.java | 96 ++++++++++++++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  1 +
 5 files changed, 111 insertions(+), 25 deletions(-)

diff --git a/pulsar-io/data-genenator/pom.xml b/pulsar-io/data-genenator/pom.xml
index 8e9f7ad..6a6b72a 100644
--- a/pulsar-io/data-genenator/pom.xml
+++ b/pulsar-io/data-genenator/pom.xml
@@ -43,6 +43,11 @@
             <artifactId>jfairy</artifactId>
             <version>0.5.9</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
 
b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
similarity index 57%
copy from 
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
copy to 
pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
index 6087747..6944247 100644
--- 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++ 
b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
@@ -18,38 +18,24 @@
  */
 package org.apache.pulsar.io.datagenerator;
 
-import io.codearte.jfairy.Fairy;
-import io.codearte.jfairy.producer.person.Person;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
 
 import java.util.Map;
-import java.util.Optional;
 
-
-public class DataGeneratorSource implements Source<Person> {
+@Slf4j
+public class DataGeneratorPrintSink implements Sink<Person> {
 
     @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
 
     }
 
     @Override
-    public Record<Person> read() throws Exception {
-        Thread.sleep(50);
-        Fairy fairy = Fairy.create();
-        return new Record<Person>() {
-            @Override
-            public Optional<String> getKey() {
-                return Optional.empty();
-            }
-
-            @Override
-            public Person getValue() {
-                return fairy.person();
-            }
-        };
+    public void write(Record<Person> record) throws Exception {
+        log.info("RECV: {}", record.getValue());
     }
 
     @Override
diff --git 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
 
b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
index 6087747..1a9f63f 100644
--- 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++ 
b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.io.datagenerator;
 
 import io.codearte.jfairy.Fairy;
-import io.codearte.jfairy.producer.person.Person;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
@@ -38,7 +37,6 @@ public class DataGeneratorSource implements Source<Person> {
     @Override
     public Record<Person> read() throws Exception {
         Thread.sleep(50);
-        Fairy fairy = Fairy.create();
         return new Record<Person>() {
             @Override
             public Optional<String> getKey() {
@@ -47,7 +45,7 @@ public class DataGeneratorSource implements Source<Person> {
 
             @Override
             public Person getValue() {
-                return fairy.person();
+                return new Person(Fairy.create().person());
             }
         };
     }
diff --git 
a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
 
b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
new file mode 100644
index 0000000..c822d31
--- /dev/null
+++ 
b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.datagenerator;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+/**
+ * This class serves as a copy of io.codearte.jfairy.producer.person.Person
+ * because io.codearte.jfairy.producer.person.Person does not
+ * have the default constructor needed to deserialize POJOs
+ */
+public class Person {
+    private Address address;
+    private String firstName;
+    private String middleName;
+    private String lastName;
+    private String email;
+    private String username;
+    private String password;
+    private Sex sex;
+    private String telephoneNumber;
+    @org.apache.avro.reflect.AvroSchema("{ \"type\": \"long\", 
\"logicalType\": \"timestamp-millis\" }")
+    private long dateOfBirth;
+    private Integer age;
+    private Company company;
+    private String companyEmail;
+    private String nationalIdentityCardNumber;
+    private String nationalIdentificationNumber;
+    private String passportNumber;
+
+    public enum Sex {
+        MALE,
+        FEMALE;
+
+        private Sex() {
+        }
+    }
+
+    public Person(io.codearte.jfairy.producer.person.Person person) {
+        this(new Address(person.getAddress()), person.getFirstName(), 
person.getMiddleName(), person.getLastName(),
+                person.getEmail(), person.getUsername(), person.getPassword(), 
Sex.valueOf(person.getSex().name()),
+                person.getTelephoneNumber(), 
person.getDateOfBirth().getMillis(),
+                person.getAge(), new Company(person.getCompany()), 
person.getCompanyEmail(),
+                person.getNationalIdentityCardNumber(), 
person.getNationalIdentificationNumber(),
+                person.getPassportNumber());
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class Company {
+        private String name;
+        private String domain;
+        private String email;
+        private String vatIdentificationNumber;
+        public Company(io.codearte.jfairy.producer.company.Company company) {
+            this(company.getName(), company.getDomain(), company.getEmail(), 
company.getVatIdentificationNumber());
+        }
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class Address {
+        protected String street;
+        protected String streetNumber;
+        protected String apartmentNumber;
+        protected String postalCode;
+        protected String city;
+
+        public Address(io.codearte.jfairy.producer.person.Address address) {
+            this(address.getStreet(), address.getStreetNumber(), 
address.getApartmentNumber(), address.getPostalCode(), address.getCity());
+        }
+    }
+}
diff --git 
a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
index d0fd7a6..0e5d723 100644
--- 
a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
+++ 
b/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
 name: data-generator
 description: Test data generator source
 sourceClass: org.apache.pulsar.io.datagenerator.DataGeneratorSource
+sinkClass: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink

Reply via email to