Hello, does anyone out there know about Avro file formats and/or Hadoop support?

My Hadoop AvroJob code does not recognize the Avro files created by my other code. It seems that the magic number at the start of the file is wrong.

What is going on? How many different ways of encoding Avro files are there, and how do I make sure the writer and the reader use the same one?

I am creating the input files like this...

    public static void write(String file, GenericRecord record, Schema schema) throws IOException {
        OutputStream o = new FileOutputStream(file);
        GenericDatumWriter w = new GenericDatumWriter(schema);
        Encoder e = EncoderFactory.get().binaryEncoder(o, null);
        w.write(record, e);
        e.flush();
    }

Hadoop is reading them using org.apache.avro.file.DataFileReader.

Here is where it breaks. I checked, and it really is trying to read the right file...

      /** Open a reader for a file. */
      public static <D> FileReader<D> openReader(SeekableInput in,
                                                 DatumReader<D> reader)
        throws IOException {
        if (in.length() < MAGIC.length)
          throw new IOException("Not an Avro data file");

        // read magic header
        byte[] magic = new byte[MAGIC.length];
        in.seek(0);
        for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length-c)) {}
        in.seek(0);

        if (Arrays.equals(MAGIC, magic))              // current format
          return new DataFileReader<D>(in, reader);
        if (Arrays.equals(DataFileReader12.MAGIC, magic)) // 1.2 format
          return new DataFileReader12<D>(in, reader);

    >>>    throw new IOException("Not an Avro data file"); <<<
      }
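
To check an input file by hand, the header test in openReader can be reproduced with the standard library alone. The following is a minimal sketch, not Avro's own code: the class name AvroMagicCheck and the method startsWithMagic are hypothetical, and the MAGIC value is an assumption taken from the Avro specification's description of the object container file header (ASCII "Obj" followed by the byte 1). If a file fails this check, it was most likely not written in the container format that DataFileReader expects.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class AvroMagicCheck {
    // Assumption: container-file magic is ASCII "Obj" followed by version byte 1.
    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the stream begins with the assumed magic bytes. */
    static boolean startsWithMagic(InputStream in) throws IOException {
        byte[] header = new byte[MAGIC.length];
        int read = 0;
        // read() may return fewer bytes than requested, so accumulate.
        while (read < header.length) {
            int n = in.read(header, read, header.length - read);
            if (n < 0) {
                return false; // stream ended before a full header was read
            }
            read += n;
        }
        return Arrays.equals(MAGIC, header);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: AvroMagicCheck <file>");
            return;
        }
        try (InputStream in = new FileInputStream(args[0])) {
            System.out.println(startsWithMagic(in)
                    ? "starts with container magic"
                    : "no container magic header");
        }
    }
}
```

Running it against one of the generated input files shows whether the file carries the header at all, which separates a file-format mismatch from a wrong input path.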


Some background...

I am trying to write my first Avro Hadoop application. I am using Hadoop Cloudera 20.2-737 and Avro 1.5.1.

I followed the instructions here...

http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html#package_description


I also used the sample code here...

http://svn.apache.org/viewvc/avro/tags/release-1.5.1/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWordCount.java?view=markup

Here is my code, which breaks with a "Not an Avro data file" error.


    public static class MapImpl extends AvroMapper<Account, Pair<Utf8, Long>> {
        @Override
        public void map(Account account, AvroCollector<Pair<Utf8, Long>> collector,
                        Reporter reporter) throws IOException {
            StringTokenizer tokens = new StringTokenizer(account.timestamp.toString());
            while (tokens.hasMoreTokens())
                collector.collect(new Pair<Utf8, Long>(new Utf8(tokens.nextToken()), 1L));
        }
    }

    public static class ReduceImpl
            extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
        @Override
        public void reduce(Utf8 word, Iterable<Long> counts,
                           AvroCollector<Pair<Utf8, Long>> collector,
                           Reporter reporter) throws IOException {
            long sum = 0;
            for (long count : counts)
                sum += count;
            collector.collect(new Pair<Utf8, Long>(word, sum));
        }
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: " + getClass().getName() + " <input> <output>");
            System.exit(2);
        }

        JobConf job = new JobConf(this.getClass());
        Path outputPath = new Path(args[1]);

        outputPath.getFileSystem(job).delete(outputPath);
        //WordCountUtil.writeLinesFile();

        job.setJobName(this.getClass().getName());

        AvroJob.setInputSchema(job, Account.schema); //Schema.create(Schema.Type.STRING));
        AvroJob.setOutputSchema(job,
                new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());

        AvroJob.setMapperClass(job, MapImpl.class);
        AvroJob.setCombinerClass(job, ReduceImpl.class);
        AvroJob.setReducerClass(job, ReduceImpl.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        //FileOutputFormat.setCompressOutput(job, true);

        //WordCountUtil.setMeta(job);

        JobClient.runJob(job);

        //WordCountUtil.validateCountsFile();

        return 0;
    }


