Hello Ottomata,

I'd like you to do a code review.  Please visit

    https://gerrit.wikimedia.org/r/90164

to review the following change.

Change subject: Add JsonStringMessageDecoder from camus' wikimedia branch
......................................................................

Add JsonStringMessageDecoder from camus' wikimedia branch

The file's source is
  camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java
in commit cea32b3500a4698ecaf3be3ea728ffc9f0f579b5 of the wikimedia branch of
  https://github.com/wikimedia/camus.git

Tests, etc. are in follow-up commits.

Change-Id: I6999be423f2791c029ca889dd2880a2467424b97
---
M kraken-etl/pom.xml
A kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java
2 files changed, 97 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/kraken refs/changes/64/90164/1

diff --git a/kraken-etl/pom.xml b/kraken-etl/pom.xml
index f674c4b..3446124 100644
--- a/kraken-etl/pom.xml
+++ b/kraken-etl/pom.xml
@@ -13,6 +13,22 @@
   <name>Kraken ETL</name>
   <packaging>jar</packaging>
 
+  <dependencies>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.2.4</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.linkedin.camus</groupId>
+      <artifactId>camus-etl-kafka</artifactId>
+      <version>0.1.0-SNAPSHOT-wmf-1</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+
   <build>
     <plugins>
       <plugin>
diff --git a/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java b/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java
new file mode 100644
index 0000000..a283895
--- /dev/null
+++ b/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java
@@ -0,0 +1,81 @@
+package com.linkedin.camus.etl.kafka.coders;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.text.SimpleDateFormat;
+
+import com.google.gson.JsonParser;
+import com.google.gson.JsonObject;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.coders.MessageDecoder;
+import com.linkedin.camus.coders.MessageDecoderException;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * MessageDecoder class that will convert the payload into a JSON object,
+ * look for a field named 'timestamp', and then set the CamusWrapper's
+ * timestamp property to the record's timestamp.  If the JSON does not have
+ * a timestamp, then System.currentTimeMillis() will be used.
+ * This MessageDecoder returns a CamusWrapper that works with Strings.
+ */
+public class JsonStringMessageDecoder extends MessageDecoder<byte[], String> {
+       private static org.apache.log4j.Logger log = 
Logger.getLogger(JsonStringMessageDecoder.class);
+
+       public  static final String CAMUS_MESSAGE_TIMESTAMP_FORMAT = 
"camus.message.timestamp.format";
+       public  static final String DEFAULT_TIMESTAMP_FORMAT       = 
"[dd/MMM/yyyy:HH:mm:ss Z]";
+
+       public  static final String CAMUS_MESSAGE_TIMESTAMP_FIELD  = 
"camus.message.timestamp.field";
+       public  static final String DEFAULT_TIMESTAMP_FIELD        = 
"timestamp";
+
+       private String timestampFormat;
+       private String timestampField;
+
+       @Override
+       public void init(Properties props, String topicName) {
+           this.props     = props;
+        this.topicName = topicName;
+
+        timestampFormat = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FORMAT, 
DEFAULT_TIMESTAMP_FORMAT);
+        timestampField  = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FIELD,  
DEFAULT_TIMESTAMP_FIELD);
+       }
+
+       @Override
+       public CamusWrapper<String> decode(byte[] payload) {
+               long       timestamp = 0;
+               String     payloadString;
+               JsonObject jsonObject;
+
+               payloadString =  new String(payload);
+
+               // Parse the payload into a JsonObject.
+               try {
+                       jsonObject = new 
JsonParser().parse(payloadString).getAsJsonObject();
+               } catch (RuntimeException e) {
+                       log.error("Caught exception while parsing JSON string 
'" + payloadString + "'.");
+                       throw new RuntimeException(e);
+               }
+
+               // Attempt to read and parse the timestamp element into a long.
+               if (jsonObject.has(timestampField)) {
+                       String timestampString = 
jsonObject.get(timestampField).getAsString();
+                       try {
+                               timestamp = new 
SimpleDateFormat(timestampFormat).parse(timestampString).getTime();
+                       } catch (Exception e) {
+                               log.error("Could not parse timestamp '" + 
timestampString + "' while decoding JSON message.");
+                       }
+           }
+
+        // If timestamp wasn't set in the above block,
+        // then set it to current time.
+               if (timestamp == 0) {
+                       log.warn("Couldn't find or parse timestamp field '" + 
timestampField + "' in JSON message, defaulting to current time.");
+               timestamp = System.currentTimeMillis();
+       }
+
+        return new CamusWrapper<String>(payloadString, timestamp);
+       }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/90164
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6999be423f2791c029ca889dd2880a2467424b97
Gerrit-PatchSet: 1
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: QChris <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to