Hello Ottomata,
I'd like you to do a code review. Please visit
https://gerrit.wikimedia.org/r/90164
to review the following change.
Change subject: Add JsonStringMessageDecoder from camus' wikimedia branch
......................................................................
Add JsonStringMessageDecoder from camus' wikimedia branch
The file's source is
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java
in cea32b3500a4698ecaf3be3ea728ffc9f0f579b5 of the wikimedia branch of
https://github.com/wikimedia/camus.git
Tests, etc. are in follow-up commits.
Change-Id: I6999be423f2791c029ca889dd2880a2467424b97
---
M kraken-etl/pom.xml
A
kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java
2 files changed, 97 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/kraken
refs/changes/64/90164/1
diff --git a/kraken-etl/pom.xml b/kraken-etl/pom.xml
index f674c4b..3446124 100644
--- a/kraken-etl/pom.xml
+++ b/kraken-etl/pom.xml
@@ -13,6 +13,22 @@
<name>Kraken ETL</name>
<packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.2.4</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.linkedin.camus</groupId>
+ <artifactId>camus-etl-kafka</artifactId>
+ <version>0.1.0-SNAPSHOT-wmf-1</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
diff --git
a/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java
b/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java
new file mode 100644
index 0000000..a283895
--- /dev/null
+++
b/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java
@@ -0,0 +1,81 @@
+package com.linkedin.camus.etl.kafka.coders;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.text.SimpleDateFormat;
+
+import com.google.gson.JsonParser;
+import com.google.gson.JsonObject;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.coders.MessageDecoder;
+import com.linkedin.camus.coders.MessageDecoderException;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * MessageDecoder class that will convert the payload into a JSON object,
+ * look for a field named 'timestamp', and then set the CamusWrapper's
+ * timestamp property to the record's timestamp. If the JSON does not have
+ * a timestamp, then System.currentTimeMillis() will be used.
+ * This MessageDecoder returns a CamusWrapper that works with Strings.
+ */
+public class JsonStringMessageDecoder extends MessageDecoder<byte[], String> {
+ private static org.apache.log4j.Logger log =
Logger.getLogger(JsonStringMessageDecoder.class);
+
+ public static final String CAMUS_MESSAGE_TIMESTAMP_FORMAT =
"camus.message.timestamp.format";
+ public static final String DEFAULT_TIMESTAMP_FORMAT =
"[dd/MMM/yyyy:HH:mm:ss Z]";
+
+ public static final String CAMUS_MESSAGE_TIMESTAMP_FIELD =
"camus.message.timestamp.field";
+ public static final String DEFAULT_TIMESTAMP_FIELD =
"timestamp";
+
+ private String timestampFormat;
+ private String timestampField;
+
+ @Override
+ public void init(Properties props, String topicName) {
+ this.props = props;
+ this.topicName = topicName;
+
+ timestampFormat = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FORMAT,
DEFAULT_TIMESTAMP_FORMAT);
+ timestampField = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FIELD,
DEFAULT_TIMESTAMP_FIELD);
+ }
+
+ @Override
+ public CamusWrapper<String> decode(byte[] payload) {
+ long timestamp = 0;
+ String payloadString;
+ JsonObject jsonObject;
+
+ payloadString = new String(payload);
+
+ // Parse the payload into a JsonObject.
+ try {
+ jsonObject = new
JsonParser().parse(payloadString).getAsJsonObject();
+ } catch (RuntimeException e) {
+ log.error("Caught exception while parsing JSON string
'" + payloadString + "'.");
+ throw new RuntimeException(e);
+ }
+
+ // Attempt to read and parse the timestamp element into a long.
+ if (jsonObject.has(timestampField)) {
+ String timestampString =
jsonObject.get(timestampField).getAsString();
+ try {
+ timestamp = new
SimpleDateFormat(timestampFormat).parse(timestampString).getTime();
+ } catch (Exception e) {
+ log.error("Could not parse timestamp '" +
timestampString + "' while decoding JSON message.");
+ }
+ }
+
+ // If timestamp wasn't set in the above block,
+ // then set it to current time.
+ if (timestamp == 0) {
+ log.warn("Couldn't find or parse timestamp field '" +
timestampField + "' in JSON message, defaulting to current time.");
+ timestamp = System.currentTimeMillis();
+ }
+
+ return new CamusWrapper<String>(payloadString, timestamp);
+ }
+}
--
To view, visit https://gerrit.wikimedia.org/r/90164
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6999be423f2791c029ca889dd2880a2467424b97
Gerrit-PatchSet: 1
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: QChris <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits