This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a16b64a  Add EventTime interface to Record (#2446)
a16b64a is described below

commit a16b64a0615b145f695987464cf281f68175985b
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Sat Aug 25 22:33:08 2018 -0700

    Add EventTime interface to Record (#2446)
    
    * Saved copy of work
    
    * revert conf changes
    
    * Prepare for the pr
    
    * Addressed reviewer comments
---
 .../org/apache/pulsar/functions/api/Record.java    |  9 ++++++++
 .../apache/pulsar/functions/sink/PulsarSink.java   |  7 ++++++
 .../pulsar/functions/source/PulsarRecord.java      |  9 ++++++++
 .../org/apache/pulsar/io/twitter/TweetData.java    |  1 +
 .../apache/pulsar/io/twitter/TwitterFireHose.java  | 27 ++++++++++++++++++++--
 .../pulsar/io/twitter/TwitterFireHoseConfig.java   |  3 +++
 6 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 2704909..59cc104 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -47,6 +47,15 @@ public interface Record<T> {
     T getValue();
 
     /**
+     * Retrieves the event time of the record from the source.
+     *
+     * @return millis since epoch
+     */
+    default Optional<Long> getEventTime() {
+        return Optional.empty();
+    }
+
+    /**
      * Retrieves the partition information if any of the record.
      *
      * @return The partition id where the
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index c1da686..c3df393 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.util.Base64;
 import java.util.Map;
+import java.util.Optional;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -239,6 +240,12 @@ public class PulsarSink<T> implements Sink<T> {
             msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
                .property("__pfn_input_msg_id__",
                          new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+        } else {
+            // It is coming from some source
+            Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
+            if (eventTime.isPresent()) {
+                msg.eventTime(eventTime.get());
+            }
         }
 
         pulsarSinkProcessor.sendOutputMessage(msg, record);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 46c9213..359f48e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -75,6 +75,15 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
     }
 
     @Override
+    public Optional<Long> getEventTime() {
+        if (message.getEventTime() != 0) {
+            return Optional.of(message.getEventTime());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return message.getEncryptionCtx();
     }
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
index 3e5503d..36d4dc8 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -44,6 +44,7 @@ public class TweetData {
     private String timestampMs;
     private Delete delete;
 
+
     @Data
     public static class User {
         private Long id;
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 2631db1..fc61945 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -34,9 +34,12 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
@@ -46,6 +49,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Simple Push based Twitter FireHose Source
  */
+@Slf4j
 public class TwitterFireHose extends PushSource<TweetData> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
@@ -126,7 +130,7 @@ public class TwitterFireHose extends PushSource<TweetData> {
                             // We don't really care if the record succeeds or not.
                             // However might be in the future to count failures
                             // TODO:- Figure out the metrics story for connectors
-                            consume(new TwitterRecord(tweet));
+                            consume(new TwitterRecord(tweet, config.getGuestimateTweetTime()));
                         } catch (Exception e) {
                             LOG.error("Exception thrown: {}", e);
                         }
@@ -166,9 +170,12 @@ public class TwitterFireHose extends PushSource<TweetData> {
 
     static private class TwitterRecord implements Record<TweetData> {
         private final TweetData tweet;
+        private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
+        private final boolean guestimateTweetTime;
 
-        public TwitterRecord(TweetData tweet) {
+        public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) {
             this.tweet = tweet;
+            this.guestimateTweetTime = guestimateTweetTime;
         }
 
         @Override
@@ -178,6 +185,22 @@ public class TwitterFireHose extends PushSource<TweetData> {
         }
 
         @Override
+        public Optional<Long> getEventTime() {
+            try {
+                if (tweet.getCreatedAt() != null) {
+                    Date d = dateFormat.parse(tweet.getCreatedAt());
+                    return Optional.of(d.toInstant().toEpochMilli());
+                } else if (guestimateTweetTime) {
+                    return Optional.of(System.currentTimeMillis());
+                } else {
+                    return Optional.empty();
+                }
+            } catch (Exception e) {
+                return Optional.empty();
+            }
+        }
+
+        @Override
         public TweetData getValue() {
             return tweet;
         }
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
index 83f1baf..88acb33 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
@@ -44,6 +44,9 @@ public class TwitterFireHoseConfig implements Serializable {
     private String consumerSecret;
     private String token;
     private String tokenSecret;
+    // Most firehose events have null createdAt time. If this parameter is set to true
+    // then we estimate the createdTime of each firehose event to be current time.
+    private Boolean guestimateTweetTime = false;
 
     // ------ Optional property keys
 

Reply via email to