Hey Fengyun and all,
It seems like the AvroMultipleOutputs API is broken when trying to use a large
number of schemas (I am using ~55 schemas total size ~4MB). Why do I have to
specify the output schemas using the AvroJob API if they are already added
using the AvroMultipleOutputs API?
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
AvroMultipleOutputs API
for (FileStatus schemaStatus : schemasStatus) {
String schemaContent = readFile(fs, schemaStatus.getPath());
Schema schema = parser.parse(schemaContent);
AvroMultipleOutputs.addNamedOutput(job,
schema.getName().replace("_", ""),
AvroKeyOutputFormat.class, schema);
schemas.add(schema);
}
AvroJob API
AvroJob.setOutputKeySchema(job, Schema.createUnion(schemas));
The problem I am running into is a OOM exception in the Reducer when I’m trying
to write to the AvroMultipleOutputs. It seems like the AvroMultipleOutputs
class is putting my giant schema union into memory via a HashMap.
2014-05-01 10:27:45,337 FATAL org.apache.hadoop.mapred.Child: Error running
child : java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap.createEntry(HashMap.java:901)
at java.util.HashMap.addEntry(HashMap.java:888)
at java.util.HashMap.put(HashMap.java:509)
at org.apache.avro.Schema$UnionSchema.<init>(Schema.java:824)
at org.apache.avro.Schema.parse(Schema.java:1221)
at org.apache.avro.Schema.parse(Schema.java:1148)
at org.apache.avro.Schema.parse(Schema.java:1148)
at org.apache.avro.Schema.parse(Schema.java:1220)
at org.apache.avro.Schema$Parser.parse(Schema.java:981)
at org.apache.avro.Schema$Parser.parse(Schema.java:971)
at org.apache.avro.Schema.parse(Schema.java:1020)
at
org.apache.avro.mapreduce.AvroJob.getOutputKeySchema(AvroJob.java:184)
at
org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:85)
at
org.apache.avro.mapreduce.AvroMultipleOutputs.getRecordWriter(AvroMultipleOutputs.java:445)
at
org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:407)
at
org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:367)
at
com.ryantabora.mapreduce.SerializerReducer.reduce(SerializerReducer.java:52)
at
com.ryantabora.mapreduce.SerializerReducer.reduce(SerializerReducer.java:20)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
at
org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Why is AvroMultipleOutputs even looking at the AvroJob output key schema
definition? Can’t it just pull the schema from my named outputs?
Regards,
Ryan Tabora
http://ryantabora.com
On April 30, 2014 at 9:49:19 PM, Ryan Tabora ([email protected]) wrote:
Wow not sure how I missed this, thank you! :)
Regards,
Ryan Tabora
http://ryantabora.com
On Wed, Apr 30, 2014 at 9:41 PM, Fengyun RAO <[email protected]> wrote:
We also used AvroMultipleOutputs to deal with multiple schemas.
the problem stands the same, you have to set a single mapper output
type (or schema) before submitting the MR job. since there are
multiple schemas, we used Schema.createUnion(List<Schema> types) as
the mapper output schema.
you could write a method to generate the list of schemas from the
input data, before submitting the MR job.
2014-04-30 21:46 GMT+08:00, Ryan Tabora <[email protected]>:
> Thanks Rao, I understand how I could do it if I had a single schema across
> all input data. However, my question is if my input data will vary and one
> input could have a different schema from another.
>
> My idea would be to use something like MultipleOutputs or partitioning to
> split up the output data by unique schema.
>
> I guess the question still stands, does anyone have any recommendations for
> dynamically generating the schema using Avro output formats?
>
> Thanks,
> Ryan Tabora
> http://ryantabora.com
>
> On April 29, 2014 at 11:41:51 PM, Fengyun RAO ([email protected]) wrote:
>
> take MapReduce for example, which requires Runner, Mapper, Reducer
>
> the Mapper requires outputting a single Type (or a single Avro schema).
>
> If you have a set of CSV files with different schemas, what output type
> would you expect?
>
> If all the CSV files share the same schema, you could dynamically create the
> schema in the Runner before submitting a MR job.
> If you look into the Schema.java, you would find create(), createRecord(),
> etc. APIs.
> you could simply read one CSV file head, and create the schema using these
> APIs.
> e.g.
> AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
> creates a schema with only a String field.
>
>
>
> 2014-04-30 4:56 GMT+08:00 Ryan Tabora <[email protected]>:
> Hi all,
>
> Whether you’re using Hive or MapReduce, avro input/output formats require
> you to specify a schema at the beginning of the job or the table definition
> in order to work with them. Is there any way to configure the jobs in a way
> that the input/output formats can dynamically determine the schema from the
> data itself?
>
> Think about a job like this. I have a set of CSV files that I want to
> serialize into avro files. These CSV files are self describing and each CSV
> file has a unique schema. If I want to write a job that scans over all of
> this data and serialize it into avro I can’t do that with today’s tools (as
> far as I know). If I can’t specify the schema up front, what can I do? Am I
> forced to write my own avro input/output formats?
>
> The avro schema is stored within the avro data file itself, why can’t these
> input/output formats be smart enough to figure that out? Am I fundamentally
> doing something against the principles of the avro format? I would be
> surprised if no one has run into this issue before.
>
> Regards,
> Ryan Tabora
>
>
--
----------------------------------------------------------------
RAO Fengyun
Center for Astrophysics, Tsinghua University
Tel: +86 13810626496
Email: [email protected]
[email protected]
-----------------------------------------------------------------