Mike Percy has posted comments on this change.

Change subject: Add AvroKuduEventProducer to Kudu-Flume integration
......................................................................


Patch Set 4:

(7 comments)

http://gerrit.cloudera.org:8080/#/c/4034/4/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
File java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java:

Line 60:  * <tr><td>producer.schema.path</td>
should be schemaPath


Line 63:  *     <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
Add: If not specified, the schema must be specified on a per-event basis.


Line 73:   private static final String SCHEMA_HEADER = "schemaPath";
Let's use the same property as the Kite sink for the event headers so that both 
Flume sinks can operate on the same events and provide the same behavior.

That means we should use "flume.avro.schema.url" for the header property.


Line 110:           String.format("No schema for event! Specify either property %s or event header %s",
s/property/configuration property/
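
To make the three comments above concrete, here is a minimal sketch of the lookup, under stated assumptions: the header wins over the configured default (the precedence is my assumption, it is not settled in this review), IllegalStateException stands in for whatever exception the producer actually throws, and the class and method names are hypothetical.

```java
import java.util.Map;

/** Hypothetical helper: picks the schema location for a single event. */
final class SchemaSourceResolver {
  // Header name shared with the Kite DatasetSink, as proposed in this review.
  static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
  // Configuration property name, as proposed in this review.
  static final String SCHEMA_PATH_PROP = "schemaPath";

  /**
   * @param configuredPath value of the schemaPath configuration property, or null if unset
   * @param headers the headers of the event being processed
   * @return the schema location to use for this event
   */
  static String resolve(String configuredPath, Map<String, String> headers) {
    // Assumption: a per-event header overrides the configured default.
    String fromHeader = headers.get(SCHEMA_URL_HEADER);
    if (fromHeader != null) {
      return fromHeader;
    }
    if (configuredPath != null) {
      return configuredPath;
    }
    // Neither source is set, so the event cannot be deserialized.
    throw new IllegalStateException(String.format(
        "No schema for event! Specify either configuration property %s or event header %s",
        SCHEMA_PATH_PROP, SCHEMA_URL_HEADER));
  }
}
```

With this shape, an event carrying the header is handled the same way by both sinks, and events without it fall back to the configured path.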


Line 121:       payloadReader = new DataFileReader<>(new SeekableByteArrayInput(payload), reader);
We should not treat each Flume Event as an Avro DataFile. We should be treating 
each Flume Event as an Avro record.

I recommend looking at the Flume DatasetSink implementation and doing the same 
thing. Here is how they parse the events:

https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java#L152


Line 150:         if (!col.isNullable()) {
I think this should be:

      if (value == null) {
        if (col.isNullable()) {
          row.setNull(name);
        } else {
          // leave unset for possible Kudu default
        }
      }

...actually, it seems like we are missing something from the client API like 
ColumnSchema.hasDefaultValue() because if we could check that then we could 
throw if the field is null and a default does not exist. We should probably fix 
that in a follow-up.
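
For the follow-up, a rough sketch of the behavior I have in mind, assuming the client API eventually exposes something like hasDefaultValue(). Everything here is a stand-in so the snippet compiles on its own: Column is a hypothetical substitute for ColumnSchema, and a Map substitutes for the row, where a key mapped to null plays the role of row.setNull(name).

```java
import java.util.Map;

/** Hypothetical sketch of null handling once default values can be detected. */
final class NullHandlingSketch {
  /** Stand-in for ColumnSchema; hasDefault models the missing hasDefaultValue(). */
  static final class Column {
    final String name;
    final boolean nullable;
    final boolean hasDefault;

    Column(String name, boolean nullable, boolean hasDefault) {
      this.name = name;
      this.nullable = nullable;
      this.hasDefault = hasDefault;
    }
  }

  /** Writes one field into the stand-in row, which must allow null values (e.g. a HashMap). */
  static void setColumn(Map<String, Object> row, Column col, Object value) {
    if (value == null) {
      if (col.nullable) {
        // Analogous to row.setNull(name) on the real row.
        row.put(col.name, null);
      } else if (col.hasDefault) {
        // Leave the column unset so the Kudu default applies.
      } else {
        // No value, not nullable, no default: fail early with a clear message.
        throw new IllegalArgumentException(
            "Null value for non-nullable column without a default: " + col.name);
      }
    } else {
      row.put(col.name, value);
    }
  }
}
```

Until such a method exists, the middle branch cannot be distinguished from the last one, which is why the suggestion above simply leaves the column unset.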


Line 202:   private DatumReader<GenericRecord> openSchema(String schemaPath) {
It would be nice to support both URL and literal. See how the DatasetSink 
implements this here: 
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java#L175
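
A minimal sketch of the idea, using only the JDK so it stands alone: it returns the schema JSON text, which the producer would then hand to the Avro schema parser. The "flume.avro.schema.literal" header name is the one I recall Flume using next to "flume.avro.schema.url"; preferring the literal when both are present is my assumption, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical helper: obtains schema JSON from a literal header or a URL header. */
final class SchemaTextLoader {
  static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
  static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";

  /** Returns the schema JSON for the event, or null if neither header is set. */
  static String schemaJson(Map<String, String> headers) {
    // Assumption: an inline literal takes precedence over a URL.
    String literal = headers.get(SCHEMA_LITERAL_HEADER);
    if (literal != null) {
      return literal;
    }
    String url = headers.get(SCHEMA_URL_HEADER);
    if (url == null) {
      return null;
    }
    // Only schemes the JDK can open (file:, http:, ...) work here; an hdfs:
    // URL would need the Hadoop FileSystem API instead.
    try (InputStream in = URI.create(url).toURL().openStream()) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException("Cannot read schema from " + url, e);
    }
  }
}
```

Since many events will share a schema, the parsed result is worth caching by URL or literal rather than re-reading it per event.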


-- 
To view, visit http://gerrit.cloudera.org:8080/4034
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I6715df72e447e72f4801a2e026f6840d09b401e1
Gerrit-PatchSet: 4
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Will Berkeley <wdberke...@gmail.com>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-Reviewer: Mike Percy <mpe...@apache.org>
Gerrit-Reviewer: Todd Lipcon <t...@apache.org>
Gerrit-Reviewer: Will Berkeley <wdberke...@gmail.com>
Gerrit-HasComments: Yes
