Joal has submitted this change and it was merged.

Change subject: Drop support for message without rev id in avro decoders and 
make latestRev mandatory
......................................................................


Drop support for message without rev id in avro decoders and make latestRev 
mandatory

Drop support for legacy mediawiki kafka messages:
Avro message decoders accept a property that allows using a default
schema when no rev id was provided in the kafka message.
This patch removes this backward compat code.

Make org.wikimedia.analytics.schemas.SchemaName.latestRev mandatory:
The previous patch allowed camus to run without a latest schema (using whatever
schema revision was found in the first kafka message). Unfortunately this does
not work and causes issues when the schema rev id changes within the same camus
run. Camus will instantiate a DatumWriter and generate a file on the first
message and insert schema metadata in the file header, but if another message
is consumed within the same run the DatumWriter and file headers won't be
updated. This causes the whole run to fail (see T121483).

Change-Id: Ia605d22e65f6ac5f8f0e7fc851180b055ec29518
---
M 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroBinaryMessageDecoder.java
M 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroJsonMessageDecoder.java
M 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
M 
refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageDecoder.java
M 
refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageTimestamps.java
M 
refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroSchemaEvolution.java
M 
refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestMessage.java
7 files changed, 50 insertions(+), 102 deletions(-)

Approvals:
  Joal: Looks good to me, approved
  EBernhardson: Looks good to me, but someone else must approve
  jenkins-bot: Verified



diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroBinaryMessageDecoder.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroBinaryMessageDecoder.java
index 8f11b57..899b435 100644
--- 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroBinaryMessageDecoder.java
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroBinaryMessageDecoder.java
@@ -17,8 +17,6 @@
 import com.linkedin.camus.coders.Message;
 import com.linkedin.camus.coders.MessageDecoder;
 import com.linkedin.camus.coders.MessageDecoderException;
-import com.linkedin.camus.schemaregistry.SchemaDetails;
-import com.linkedin.camus.schemaregistry.SchemaNotFoundException;
 import com.linkedin.camus.schemaregistry.SchemaRegistry;
 
 /**
@@ -32,19 +30,16 @@
  * The writerSchema is mandatory and will be obtained from a field in the 
first bytes of the message :
  * <tt>[MAGIC][LONG][AVRO BINARY]</tt>
  * <br/>
- * The targetSchema is optional and will be obtained from {@link 
KafkaTopicSchemaRegistry#getLatestSchemaByTopic(String)}.
+ * The targetSchema is mandatory and will be obtained from {@link 
KafkaTopicSchemaRegistry#getLatestSchemaByTopic(String)}.
  * </p>
  *
  * This decoder uses the following properties :
  * <ul>
- * <li><tt>camus.message.schema.default</tt>: (optional) the schema ID to use 
when no schema is found in the json body</li>
  * <li><tt>camus.message.timestamp.field</tt>: (default to "timestamp") the of 
the timestamp field</li>
  * <li><tt>camus.message.timestamp.format</tt>: (default to 
"unix_milliseconds") the format of the timestamp field (see {@link 
CamusAvroWrapper})</li>
  * </ul>
  */
 public class AvroBinaryMessageDecoder extends MessageDecoder<Message, 
GenericData.Record> {
-    public static final String CAMUS_SCHEMA_DEFAULT = 
"camus.message.schema.default";
-
     private static final Logger log = 
Logger.getLogger(AvroBinaryMessageDecoder.class);
 
     protected DecoderFactory decoderFactory;
@@ -52,7 +47,7 @@
 
     private String timestampField;
     private String timestampFormat;
-    private String defaultSchemaId = null;
+    private Schema targetSchema;
 
     @Override
     public void init(Properties props, String topicName) {
@@ -66,7 +61,7 @@
             log.info("Underlying schema registry for topic: " + topicName + " 
is: " + registry);
             registry.init(props);
             this.registry = registry;
-            defaultSchemaId = props.getProperty(CAMUS_SCHEMA_DEFAULT);
+            this.targetSchema = 
this.registry.getLatestSchemaByTopic(topicName).getSchema();
         } catch (Exception e) {
             throw new MessageDecoderException(e);
         }
@@ -82,23 +77,10 @@
         try {
             MessageDecoderHelper helper = new 
MessageDecoderHelper(message.getPayload());
             Schema writerSchema = helper.getWriterSchema();
-            if(writerSchema == null && defaultSchemaId != null) {
-                writerSchema = registry.getSchemaByID(topicName, 
defaultSchemaId);
-            }
             if(writerSchema == null) {
                 throw new RuntimeException("No schema found for topic "
-                        + topicName + ": none provided in the message body. 
Use "
-                        + CAMUS_SCHEMA_DEFAULT + " to force a default 
schema.");
+                        + topicName + ": none provided in the message body.");
             }
-            SchemaDetails<Schema> targetSchemaDetails = null;
-            try {
-                targetSchemaDetails = 
registry.getLatestSchemaByTopic(topicName);
-            } catch(SchemaNotFoundException e) {
-                // Ignore this error, targetSchema is optional.
-            }
-            // If a target schema has been specified use it. Use writerSchema 
otherwize.
-            Schema targetSchema = targetSchemaDetails != null ? 
targetSchemaDetails.getSchema() : writerSchema;
-
             DatumReader<GenericData.Record> reader = new 
GenericDatumReader<GenericData.Record>(writerSchema, targetSchema);
 
             return new CamusAvroWrapper(reader.read(null,
diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroJsonMessageDecoder.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroJsonMessageDecoder.java
index 944f79f..58cd0d9 100644
--- 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroJsonMessageDecoder.java
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/coders/AvroJsonMessageDecoder.java
@@ -35,7 +35,7 @@
  *
  * <p>
  * The writerSchema is mandatory and will be obtained from a field in the JSON 
body.
- * The targetSchema is optional and will be obtained from
+ * The targetSchema is mandatory and will be obtained from
  * {@link KafkaTopicSchemaRegistry#getLatestSchemaByTopic(String)}.
  * </p>
  *
@@ -43,8 +43,6 @@
  * <ul>
  * <li><tt>camus.message.schema.id.field</tt>: (required) the name of the
  * JSON field where the schema id can be found</li>
- * <li><tt>camus.message.schema.default</tt>: (optional) the schema ID
- * to use when no schema is found in the json body</li>
  * <li><tt>camus.message.timestamp.field</tt>: (default to "timestamp")
  * the of the timestamp field</li>
  * <li><tt>camus.message.timestamp.format</tt>: (default to 
"unix_milliseconds")
@@ -54,7 +52,6 @@
  */
 public class AvroJsonMessageDecoder extends MessageDecoder<Message, 
GenericData.Record> {
     public static final String CAMUS_SCHEMA_ID_FIELD = 
"camus.message.schema.id.field";
-    public static final String CAMUS_SCHEMA_DEFAULT = 
"camus.message.schema.default";
 
     public static final String DEFAULT_SCHEMA_ID_FIELD = "schemaID";
 
@@ -63,11 +60,11 @@
 
     protected DecoderFactory decoderFactory;
     protected SchemaRegistry<Schema> registry;
+    private Schema targetSchema;
 
     private String schemaIDField;
     private String timestampField;
     private String timestampFormat;
-    private String defaultSchemaId = null;
 
     public AvroJsonMessageDecoder() {
     }
@@ -87,8 +84,8 @@
             log.info("Underlying schema registry for topic: " + topicName + " 
is: " + registry);
             registry.init(props);
 
-            defaultSchemaId = props.getProperty(CAMUS_SCHEMA_DEFAULT);
             this.registry = registry;
+            this.targetSchema = 
registry.getLatestSchemaByTopic(topicName).getSchema();
         } catch (Exception e) {
             throw new MessageDecoderException(e);
         }
@@ -102,14 +99,6 @@
     @Override
     public CamusWrapper<GenericData.Record> decode(Message message) {
         Schema writerSchema = getProducerSchema(message.getPayload());
-        SchemaDetails<Schema> targetSchemaDetails = null;
-        try {
-            targetSchemaDetails = registry.getLatestSchemaByTopic(topicName);
-        } catch(SchemaNotFoundException e) {
-            // Ignore this error, targetSchema is optional.
-        }
-        // If a target schema has been specified use it. Use writerSchema 
otherwise.
-        Schema targetSchema = targetSchemaDetails != null ? 
targetSchemaDetails.getSchema() : writerSchema;
         try {
             DatumReader<GenericData.Record> reader = new 
GenericDatumReader<GenericData.Record>(writerSchema, targetSchema);
             InputStream inStream = new 
ByteArrayInputStream(message.getPayload(), 0, message.getPayload().length);
@@ -156,13 +145,9 @@
         } catch (IOException e) {
             log.warn("Parse error while extracting schema id from json body", 
e);
         }
-        if(schemaId == null && defaultSchemaId != null) {
-            schemaId = defaultSchemaId;
-        }
         if(schemaId == null) {
             throw new RuntimeException("No schema found for topic "
-                    + topicName + ": none provided in the json body. Use "
-                    + CAMUS_SCHEMA_DEFAULT + " to force a default schema.");
+                    + topicName + ": none provided in the json body.");
         }
         return registry.getSchemaByID(topicName, schemaId);
     }
diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
index d2b1a59..79b89d3 100644
--- 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
@@ -1,7 +1,9 @@
 package org.wikimedia.analytics.refinery.camus.schemaregistry;
 
 import com.linkedin.camus.schemaregistry.SchemaDetails;
+import com.linkedin.camus.schemaregistry.SchemaNotFoundException;
 import com.linkedin.camus.schemaregistry.SchemaRegistry;
+
 import org.apache.avro.Schema;
 import org.apache.commons.math3.util.Pair;
 
@@ -50,11 +52,10 @@
     @Override
     public SchemaDetails<Schema> getLatestSchemaByTopic(String topicName) {
         String schemaName = getSchemaNameFromTopic(topicName);
-
-        String latestRev = 
props.getProperty(SCHEMA_NAMESPACE+"."+schemaName+".latestRev");
+        String property = SCHEMA_NAMESPACE+"."+schemaName+".latestRev";
+        String latestRev = props.getProperty(property);
         if(latestRev == null) {
-            // No latest rev provided
-            return null;
+            throw new SchemaNotFoundException("Latest revision for " + 
schemaName + " is not set, please set " + property + " in camus.properties");
         }
         Schema schema = this.getSchemaByID(topicName, latestRev);
 
diff --git 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageDecoder.java
 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageDecoder.java
index a876e12..f732cee 100644
--- 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageDecoder.java
+++ 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageDecoder.java
@@ -92,9 +92,9 @@
         testProperties.setProperty("camus.message.timestamp.format", 
"unix_milliseconds");
         
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_REGISTRY_CLASS,
                 
"org.wikimedia.analytics.refinery.camus.schemaregistry.KafkaTopicSchemaRegistry");
+        
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_NAMESPACE+".TestSchema.latestRev",
 "0");
 
         decoder.init(testProperties, "testprefix_TestSchema");
-        decoder.decode(testMessage);
     }
 
     @Test(expected = RuntimeException.class)
@@ -105,8 +105,18 @@
                 
"org.wikimedia.analytics.refinery.camus.schemaregistry.KafkaTopicSchemaRegistry");
 
         decoder.init(testProperties, "testprefix_wrongschemaname");
-        decoder.decode(testMessage);
     }
+    
+    @Test(expected = RuntimeException.class)
+    public void testInitFailNoLatestRev() {
+        Properties testProperties = new Properties();
+        testProperties.setProperty("camus.message.timestamp.format", 
"unix_milliseconds");
+        
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_REGISTRY_CLASS,
+                
"org.wikimedia.analytics.refinery.camus.schemaregistry.KafkaTopicSchemaRegistry");
+
+        decoder.init(testProperties, "testprefix_TestSchema");
+    }
+
 
     @Test
     public void testDecode() {
@@ -114,6 +124,7 @@
         testProperties.setProperty("camus.message.timestamp.format", 
"unix_milliseconds");
         
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_REGISTRY_CLASS,
                 
"org.wikimedia.analytics.refinery.camus.schemaregistry.KafkaTopicSchemaRegistry");
+        
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_NAMESPACE+".TestSchema.latestRev",
 "0");
 
         decoder.init(testProperties, "testprefix_TestSchema");
 
@@ -132,6 +143,7 @@
         testProperties.setProperty("camus.message.timestamp.format", 
"unix_milliseconds");
         
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_REGISTRY_CLASS,
                 
"org.wikimedia.analytics.refinery.camus.schemaregistry.KafkaTopicSchemaRegistry");
+        
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_NAMESPACE+".TestSchema.latestRev",
 "0");
 
         decoder.init(testProperties, "testprefix_TestSchema");
 
diff --git 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageTimestamps.java
 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageTimestamps.java
index ce5ef6f..63421ee 100644
--- 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageTimestamps.java
+++ 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroMessageTimestamps.java
@@ -172,6 +172,8 @@
         }
         properties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_REGISTRY_CLASS,
                 KafkaTopicSchemaRegistry.class.getName());
+        
properties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_NAMESPACE+".TestTimestampSchema.latestRev",
 "0");
+
         this.assertion = assertion;
         this.testName = "Test (" + assertion.getClass().getSimpleName() + ") "
                 + "decoder: " + decoderClass.getSimpleName()
diff --git 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroSchemaEvolution.java
 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroSchemaEvolution.java
index 76a9701..0772a61 100644
--- 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroSchemaEvolution.java
+++ 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestAvroSchemaEvolution.java
@@ -44,25 +44,22 @@
     private final static int BINARY = 2;
 
     private final int type;
-    private final boolean includeSchema;
     private MessageDecoder<Message, GenericData.Record> decoder;
+    private KafkaTopicSchemaRegistry registry;
 
     @Parameters
     public static Collection<Object[]> params() throws IOException {
         return Arrays.asList(
-                new Object[]{JSON, true}, // Json with writer schema in message
-                new Object[]{JSON, false}, // Json with writer schema as a 
property
-                new Object[]{BINARY, true}, // Binary with writer schema in 
message
-                new Object[]{BINARY, false} // Binary with writer schema as a 
property
+                new Object[]{JSON},
+                new Object[]{BINARY}
         );
     }
 
-    public TestAvroSchemaEvolution(int type, boolean includeSchema) {
+    public TestAvroSchemaEvolution(int type) {
         this.type = type;
-        this.includeSchema = includeSchema;
     }
 
-    public Record receiveMessage(int fromVersion, int toVersion, boolean 
useWriterSchemaOnly) {
+    public Record receiveMessage(int fromVersion, int toVersion) {
         switch (type) {
         case JSON:
             decoder = new AvroJsonMessageDecoder();
@@ -77,71 +74,52 @@
         
testProperties.setProperty(KafkaTopicSchemaRegistry.SCHEMA_REGISTRY_CLASS,
                 
"org.wikimedia.analytics.refinery.camus.schemaregistry.KafkaTopicSchemaRegistry");
 
-        if(fromVersion != toVersion || !useWriterSchemaOnly) {
-            
testProperties.setProperty("org.wikimedia.analytics.schemas.TestSchema.latestRev",
 String.valueOf(toVersion));
-        }
-        if(!includeSchema) {
-            testProperties.setProperty("camus.message.schema.default", 
String.valueOf(fromVersion));
-        }
+        
testProperties.setProperty("org.wikimedia.analytics.schemas.TestSchema.latestRev",
 String.valueOf(toVersion));
+        registry = new KafkaTopicSchemaRegistry();
+        registry.init(testProperties);
         decoder.init(testProperties, "testprefix_TestSchema");
         return decoder.decode(getMessage(fromVersion)).getRecord();
-    }
-
-    public Record receiveMessage(int fromVersion, int toVersion) {
-        return receiveMessage(fromVersion, toVersion, false);
     }
 
     @Test
     public void testV0_V0() throws IOException {
         Record record = receiveMessage(0,0);
+        assertEquals(registry.getSchemaByID("testprefix_TestSchema", "0"), 
record.getSchema());
         assertV0(record);
     }
 
     @Test
     public void testV0_V1() throws IOException {
         Record record = receiveMessage(0,1);
+        assertEquals(registry.getSchemaByID("testprefix_TestSchema", "1"), 
record.getSchema());
         assertV0ToV1(record);
     }
 
     @Test
     public void testV0_V2() throws IOException {
         Record record = receiveMessage(0,2);
+        assertEquals(registry.getSchemaByID("testprefix_TestSchema", "2"), 
record.getSchema());
         assertV0ToV2(record);
     }
 
     @Test
     public void testV1_V1() throws IOException {
         Record record = receiveMessage(1,1);
+        assertEquals(registry.getSchemaByID("testprefix_TestSchema", "1"), 
record.getSchema());
         assertV1(record);
     }
 
     @Test
     public void testV1_V2() throws IOException {
         Record record = receiveMessage(1,2);
+        assertEquals(registry.getSchemaByID("testprefix_TestSchema", "2"), 
record.getSchema());
         assertV1ToV2(record);
     }
 
     @Test
     public void testV2_V2() throws IOException {
         Record record = receiveMessage(2,2);
-        assertV2(record);
-    }
-
-    @Test
-    public void testWriterSchemaOnlyV0() throws IOException {
-        Record record = receiveMessage(0,0,true);
-        assertV0(record);
-    }
-
-    @Test
-    public void testWriterSchemaOnlyV1() throws IOException {
-        Record record = receiveMessage(1,1,true);
-        assertV1(record);
-    }
-
-    @Test
-    public void testWriterSchemaOnlyV2() throws IOException {
-        Record record = receiveMessage(2,2,true);
+        assertEquals(registry.getSchemaByID("testprefix_TestSchema", "2"), 
record.getSchema());
         assertV2(record);
     }
 
@@ -253,7 +231,7 @@
         writer.write(record, enc);
         enc.flush();
         baos.close();
-        return new TestMessage(baos.toByteArray(), 0, includeSchema);
+        return new TestMessage(baos.toByteArray(), 0);
     }
 
     private TestMessage getAvroBinaryPayloadV1() throws IOException {
@@ -275,7 +253,7 @@
         writer.write(record, enc);
         enc.flush();
         baos.close();
-        return new TestMessage(baos.toByteArray(), 1, includeSchema);
+        return new TestMessage(baos.toByteArray(), 1);
     }
 
     private TestMessage getAvroBinaryPayloadV2() throws IOException {
@@ -296,7 +274,7 @@
         writer.write(record, enc);
         enc.flush();
         baos.close();
-        return new TestMessage(baos.toByteArray(), 2, includeSchema);
+        return new TestMessage(baos.toByteArray(), 2);
     }
 
 
@@ -308,7 +286,7 @@
         someInfo.addProperty("sub1", "val 1");
         someInfo.addProperty("sub2", "val 2");
         avroMessage.add("someInfo", someInfo);
-        return new TestMessage(new GsonBuilder().create().toJson(avroMessage), 
0, includeSchema);
+        return new TestMessage(new GsonBuilder().create().toJson(avroMessage), 
0);
     }
 
     private TestMessage getAvroJsonPayloadV1() {
@@ -324,7 +302,7 @@
         JsonObject union = new JsonObject();
         union.addProperty("string", "my new union");
         avroMessage.add("newUnion", union);
-        return new TestMessage(new GsonBuilder().create().toJson(avroMessage), 
1, includeSchema);
+        return new TestMessage(new GsonBuilder().create().toJson(avroMessage), 
1);
     }
 
     private TestMessage getAvroJsonPayloadV2() {
@@ -339,6 +317,6 @@
         JsonObject union = new JsonObject();
         union.addProperty("string", "my new union");
         avroMessage.add("newUnion", union);
-        return new TestMessage(new GsonBuilder().create().toJson(avroMessage), 
2, includeSchema);
+        return new TestMessage(new GsonBuilder().create().toJson(avroMessage), 
2);
     }
 }
diff --git 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestMessage.java
 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestMessage.java
index 568dfd1..ee33092 100644
--- 
a/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestMessage.java
+++ 
b/refinery-camus/src/test/java/org/wikimedia/analytics/refinery/camus/coders/TestMessage.java
@@ -22,7 +22,6 @@
     private byte[] payload;
     private final int type;
     private long revId;
-    private final boolean includeSchemaInfo;
 
     public TestMessage(byte[] payload) {
         this(payload, 0);
@@ -32,30 +31,19 @@
         this(payloadString, 0);
     }
     public TestMessage(byte[] payload, long revId) {
-        this(payload, revId, true);
-    }
-    public TestMessage(String payloadString, long revId) {
-        this(payloadString, revId, true);
-    }
-    public TestMessage(byte[] payload, long revId, boolean includeSchemaInfo) {
         this.payload = payload;
         this.revId = revId;
         type = AVRO_BIN;
-        this.includeSchemaInfo = includeSchemaInfo;
     }
 
-    public TestMessage(String payloadString, long revId, boolean 
includeSchemaInfo) {
+    public TestMessage(String payloadString, long revId) {
         this.payload = payloadString.getBytes(Charsets.UTF_8);
         this.revId = revId;
         type = AVRO_JSON;
-        this.includeSchemaInfo = includeSchemaInfo;
     }
 
     @Override
     public byte[] getPayload() {
-        if(!includeSchemaInfo) {
-            return payload;
-        }
         if(type == AVRO_BIN) {
             return getAvroBin();
         } else {

-- 
To view, visit https://gerrit.wikimedia.org/r/255105
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ia605d22e65f6ac5f8f0e7fc851180b055ec29518
Gerrit-PatchSet: 6
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to