import java.io.Serializable;

import org.apache.avro.ipc.specific.Person;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.avro.AvroFileTarget;
import org.apache.crunch.test.CrunchTestSupport;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

/**
 * Runs a Crunch MapReduce pipeline whose map function emits a table entry with a
 * {@code null} Avro {@link Person} value, and writes that table to an Avro file target.
 *
 * <p>The {@link MapFn} below is an anonymous class declared inside an instance method, so it
 * holds a reference to the enclosing test instance.
 * NOTE(review): presumably that is why this class implements {@link Serializable} - confirm.
 */
public class EmitNullAvroTest extends CrunchTestSupport implements Serializable {

    /**
     * Reads {@code docs.txt}, maps every input element to the pair
     * {@code ("first name", null)}, appends the resulting table to an Avro file target under
     * the temporary {@code out} path, and then finishes the pipeline.
     *
     * <p>The method contains no assertions and ignores the value returned by
     * {@code done()}; it only fails if an exception propagates out of it.
     * NOTE(review): the method name does not mention the null-value scenario exercised
     * here - confirm whether it was carried over from another test.
     *
     * @throws Exception if any step of setting up or running the pipeline throws
     */
    @Test
    public void testOutputFilePerKey() throws Exception {
        final Pipeline pipeline = new MRPipeline(EmitNullAvroTest.class, tempDir.getDefaultConfiguration());
        final Path avroOutput = tempDir.getPath("out");
        final String docsFile = tempDir.copyResourceFileName("docs.txt");
        final PCollection<String> lines = pipeline.read(From.textFile(docsFile));

        // The input element is ignored: every record becomes the same key with a null value.
        final MapFn<String, Pair<String, Person>> nullValueFn = new MapFn<String, Pair<String, Person>>() {
            @Override
            public Pair<String, Person> map(final String unused) {
                return new Pair<String, Person>("first name", null);
            }
        };

        final PTable<String, Person> people =
                lines.parallelDo(nullValueFn, Avros.tableOf(Avros.strings(), Avros.records(Person.class)));
        people.write(new AvroFileTarget(avroOutput), Target.WriteMode.APPEND);

        pipeline.done();
    }
}