Super
2014-07-04 17:30 GMT+02:00 Gabriel Reid <[email protected]>:
> I've added a Jira ticket
> (https://issues.apache.org/jira/browse/CRUNCH-433) to get better
> compatibility for this kind of thing directly in Crunch.
>
> On Fri, Jul 4, 2014 at 5:14 PM, Gabriel Reid <[email protected]> wrote:
>> Hi Barrie,
>>
>> It's due to a dumb error on my part, sorry about that. The schema that
>> you pass in to the AvroType constructor needs to be the schema for the
>> AvroKeyValue class.
>>
>> So instead of:
>>
>> Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>>
>> it should be:
>>
>> Schema schema = AvroKeyValue.getSchema(
>>     Schema.create(Schema.Type.STRING), LiveTrackingLine.SCHEMA$);
>>
>> Supplying that schema to the AvroType constructor should get you
>> around the error you're running into now.
>>
>> - Gabriel
>>
>> On Fri, Jul 4, 2014 at 4:30 PM, B Kersbergen <[email protected]> wrote:
>>> Hi Gabriel,
>>>
>>> Yes, all our avro data is wrapped in the Avro KeyValuePair because
>>> map-reduce jobs like to reason on keys and values.
>>>
>>> This is now my code:
>>>
>>> String inputPath = args[0];
>>> String outputPath = args[1];
>>>
>>> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>>     ExtractViewsJob.class.getSimpleName(), getConf());
>>>
>>> Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>>> AvroType<GenericData.Record> customAvroType =
>>>     new AvroType<GenericData.Record>(
>>>         GenericData.Record.class, specificRecordSchema,
>>>         NoOpDeepCopier.<GenericData.Record>create(),
>>>         Avros.specifics(LiveTrackingLine.class));
>>> PCollection<GenericData.Record> pcollection =
>>>     pipeline.read(From.avroFile(inputPath, customAvroType));
>>>
>>> pipeline.write(pcollection, new TextFileTarget(outputPath));
>>> return pipeline.done().succeeded() ? 0 : 1;
>>>
>>> When running this I get the following job error:
>>>
>>> org.apache.avro.AvroTypeException: Found
>>> org.apache.avro.mapreduce.KeyValuePair, expecting
>>> com.bol.hadoop.enrich.record.LiveTrackingLine
>>>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>     at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
>>>
>>> (I'm using crunch-core 0.10.0-hadoop2)
>>>
>>> Do you have any idea what's going wrong here?
>>>
>>> 2014-07-04 15:31 GMT+02:00 Gabriel Reid <[email protected]>:
>>>> Hi Barrie,
>>>>
>>>> Inlined below.
>>>>
>>>> On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen <[email protected]> wrote:
>>>>> Hi,
>>>>>
>>>>> I’m stuck trying to read an avro KeyValuePair datafile with crunch.
>>>>>
>>>>> This is a header dump of my avro data file:
>>>>>
>>>>> {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A
>>>>> key/value pair","fields":[{"name":"key","type":"string","doc":"The
>>>>> key"},{"name":"value","type":{"type":"record","name":"LiveTrackingLine","namespace":"com.bol.hadoop.enrich.record",
>>>>> etc etc
>>>>>
>>>>> I’m only interested in the LiveTrackingLine object, but I probably need
>>>>> to read the whole KeyValuePair object and extract the LiveTrackingLine
>>>>> in the crunch pipeline.
>>>>>
>>>>> This is the code I have so far:
>>>>>
>>>>> String inputPath = args[0];
>>>>> String outputPath = args[1];
>>>>> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>>>>     ExtractViewsJob.class.getSimpleName(), getConf());
>>>>> PCollection<AvroWrapper<Pair<String, LiveTrackingLine>>> lines =
>>>>>     pipeline.read(new AvroFileSource<AvroWrapper<Pair<String,
>>>>>         LiveTrackingLine>>>(inputPath));
>>>>>
>>>>> I’m a bit lost in the last part, where I configure the 'pipeline'
>>>>> object with the right avro schema(s) and input dir.
>>>>
>>>> The easiest basic case here, if you just want to read the data, is to
>>>> do it as follows:
>>>>
>>>> PCollection<GenericData.Record> lines =
>>>>     pipeline.read(From.avroFile(inputPath));
>>>>
>>>> However, as you can see, this will give you a PCollection of
>>>> GenericData.Record objects.
>>>>
>>>>> Because my schema is very complex I want to parse this as a
>>>>> ‘specific’ and not as a ‘generic’ or ‘reflective’ avro
>>>>> representation; this is also a learning experience in using avro
>>>>> with crunch.
>>>>
>>>> This is not currently officially supported in Crunch (although there
>>>> is a way to work around it). Because your top-level record type is a
>>>> generic record, you need to skip around some of the basic convenience
>>>> methods of Crunch to get your specific records read as actual
>>>> specific records.
>>>>
>>>> The workaround you can use is to do something like this to read in
>>>> your PCollection:
>>>>
>>>> Schema specificRecordSchema = ...;
>>>> AvroType<GenericData.Record> customAvroType =
>>>>     new AvroType<GenericData.Record>(
>>>>         GenericData.Record.class, specificRecordSchema,
>>>>         NoOpDeepCopier.<GenericData.Record>create(),
>>>>         Avros.specifics(MySpecificRecord.class));
>>>>
>>>> PCollection<GenericData.Record> pcollection =
>>>>     pipeline.read(From.avroFile(inputPath, customAvroType));
>>>>
>>>> This will give you a PCollection of the GenericRecords that contain
>>>> instances of your specific class. However, be warned that this is
>>>> making use of what could be considered non-public APIs that could
>>>> change in the future.
>>>>
>>>> Is this (reading Avro files that have been serialized using
>>>> AvroKeyValue) a case that you will be using regularly? It might be
>>>> interesting to add this as built-in functionality in Crunch so you
>>>> don't need to mess around with creating your own AvroType.
>>>>
>>>> - Gabriel
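
Putting Gabriel's fix together with the pipeline from the thread, a minimal
end-to-end sketch looks like the following. The class, schema, and path names
are taken from the messages above; the Tool boilerplate around them and the
exact import locations are assumptions based on crunch-core 0.10.0-hadoop2
and avro-mapred.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.hadoop.io.AvroKeyValue;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.io.text.TextFileTarget;
    import org.apache.crunch.types.NoOpDeepCopier;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;
    import com.bol.hadoop.enrich.record.LiveTrackingLine;

    // Assumed to live in ExtractViewsJob extends Configured implements Tool,
    // matching the getConf() call and return code in the thread.
    public int run(String[] args) throws Exception {
      String inputPath = args[0];
      String outputPath = args[1];

      Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
          ExtractViewsJob.class.getSimpleName(), getConf());

      // The file stores AvroKeyValue pairs, so the reader schema must
      // describe the wrapper record (string key + LiveTrackingLine value),
      // not the bare LiveTrackingLine schema.
      Schema schema = AvroKeyValue.getSchema(
          Schema.create(Schema.Type.STRING), LiveTrackingLine.SCHEMA$);

      AvroType<GenericData.Record> customAvroType =
          new AvroType<GenericData.Record>(
              GenericData.Record.class, schema,
              NoOpDeepCopier.<GenericData.Record>create(),
              Avros.specifics(LiveTrackingLine.class));

      PCollection<GenericData.Record> pairs =
          pipeline.read(From.avroFile(inputPath, customAvroType));

      pipeline.write(pairs, new TextFileTarget(outputPath));
      return pipeline.done().succeeded() ? 0 : 1;
    }

The only change from the failing version is the schema handed to the
AvroType: it now describes the KeyValuePair wrapper record, so the
ResolvingDecoder no longer expects a bare LiveTrackingLine where the file
actually stores key/value pairs.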

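Since the goal in the thread is only the LiveTrackingLine objects, a short
MapFn can unwrap them from the key/value records. This is a sketch rather
than code from the thread: it assumes the "value" field name shown in the
KeyValuePair schema of the header dump, and the cast relies on Gabriel's note
that the generic records contain instances of the specific class.

    import org.apache.avro.generic.GenericData;
    import org.apache.crunch.MapFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.types.avro.Avros;
    import com.bol.hadoop.enrich.record.LiveTrackingLine;

    // "pairs" is the PCollection<GenericData.Record> read above.
    PCollection<LiveTrackingLine> lines = pairs.parallelDo(
        new MapFn<GenericData.Record, LiveTrackingLine>() {
          @Override
          public LiveTrackingLine map(GenericData.Record pair) {
            // "value" is the field name from the KeyValuePair schema;
            // per Gabriel, it holds an instance of the specific class.
            return (LiveTrackingLine) pair.get("value");
          }
        },
        Avros.specifics(LiveTrackingLine.class));

The resulting collection can then be processed or written as an ordinary
specific-record PCollection, at least until CRUNCH-433 adds this kind of
AvroKeyValue support as built-in functionality.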