Ottomata has submitted this change and it was merged. Change subject: Add JsonStringMessageDecoder from camus' wikimedia branch ......................................................................
Add JsonStringMessageDecoder from camus' wikimedia branch The file's source is camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java in cea32b3500a4698ecaf3be3ea728ffc9f0f579b5 of the wikimedia branch of https://github.com/wikimedia/camus.git Tests, etc are in follow-up commits Change-Id: I6999be423f2791c029ca889dd2880a2467424b97 --- M kraken-etl/pom.xml A kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java 2 files changed, 97 insertions(+), 0 deletions(-) Approvals: Ottomata: Verified; Looks good to me, approved diff --git a/kraken-etl/pom.xml b/kraken-etl/pom.xml index f674c4b..3446124 100644 --- a/kraken-etl/pom.xml +++ b/kraken-etl/pom.xml @@ -13,6 +13,22 @@ <name>Kraken ETL</name> <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.2.4</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>com.linkedin.camus</groupId> + <artifactId>camus-etl-kafka</artifactId> + <version>0.1.0-SNAPSHOT-wmf-1</version> + <scope>compile</scope> + </dependency> + </dependencies> + <build> <plugins> <plugin> diff --git a/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java b/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java new file mode 100644 index 0000000..a283895 --- /dev/null +++ b/kraken-etl/src/main/java/org/wikimedia/analytics/kraken/etl/camus/kafka/coders/JsonStringMessageDecoder.java @@ -0,0 +1,81 @@ +package com.linkedin.camus.etl.kafka.coders; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Properties; +import java.text.SimpleDateFormat; + +import com.google.gson.JsonParser; +import com.google.gson.JsonObject; + +import com.linkedin.camus.coders.CamusWrapper; +import com.linkedin.camus.coders.MessageDecoder; +import com.linkedin.camus.coders.MessageDecoderException; + +import org.apache.log4j.Logger; + + +/** + * MessageDecoder class that will convert the payload into a JSON object, + * look for a field named 'timestamp', and then set the CamusWrapper's + * timestamp property to the record's timestamp. If the JSON does not have + * a timestamp, then System.currentTimeMillis() will be used. + * This MessageDecoder returns a CamusWrapper that works with Strings. + */ +public class JsonStringMessageDecoder extends MessageDecoder<byte[], String> { + private static org.apache.log4j.Logger log = Logger.getLogger(JsonStringMessageDecoder.class); + + public static final String CAMUS_MESSAGE_TIMESTAMP_FORMAT = "camus.message.timestamp.format"; + public static final String DEFAULT_TIMESTAMP_FORMAT = "[dd/MMM/yyyy:HH:mm:ss Z]"; + + public static final String CAMUS_MESSAGE_TIMESTAMP_FIELD = "camus.message.timestamp.field"; + public static final String DEFAULT_TIMESTAMP_FIELD = "timestamp"; + + private String timestampFormat; + private String timestampField; + + @Override + public void init(Properties props, String topicName) { + this.props = props; + this.topicName = topicName; + + timestampFormat = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FORMAT, DEFAULT_TIMESTAMP_FORMAT); + timestampField = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FIELD, DEFAULT_TIMESTAMP_FIELD); + } + + @Override + public CamusWrapper<String> decode(byte[] payload) { + long timestamp = 0; + String payloadString; + JsonObject jsonObject; + + payloadString = new String(payload); + + // Parse the payload into a JsonObject. + try { + jsonObject = new JsonParser().parse(payloadString).getAsJsonObject(); + } catch (RuntimeException e) { + log.error("Caught exception while parsing JSON string '" + payloadString + "'."); + throw new RuntimeException(e); + } + + // Attempt to read and parse the timestamp element into a long. + if (jsonObject.has(timestampField)) { + String timestampString = jsonObject.get(timestampField).getAsString(); + try { + timestamp = new SimpleDateFormat(timestampFormat).parse(timestampString).getTime(); + } catch (Exception e) { + log.error("Could not parse timestamp '" + timestampString + "' while decoding JSON message."); + } + } + + // If timestamp wasn't set in the above block, + // then set it to current time. + if (timestamp == 0) { + log.warn("Couldn't find or parse timestamp field '" + timestampField + "' in JSON message, defaulting to current time."); + timestamp = System.currentTimeMillis(); + } + + return new CamusWrapper<String>(payloadString, timestamp); + } +} -- To view, visit https://gerrit.wikimedia.org/r/90164 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I6999be423f2791c029ca889dd2880a2467424b97 Gerrit-PatchSet: 1 Gerrit-Project: analytics/kraken Gerrit-Branch: master Gerrit-Owner: QChris <[email protected]> Gerrit-Reviewer: Ottomata <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
