Hey, I recently implemented a Hadoop InputFormat that returns the raw bytes of each record as a BytesWritable rather than as Text (as in TextInputFormat, which assumes that the input is UTF-8).
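The reader side looks roughly like this (a minimal sketch rather than my actual code; record framing, split handling, and error handling are omitted, and readNextRecord() is a made-up helper):

    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class RawBytesRecordReader extends RecordReader<LongWritable, BytesWritable> {
      private final LongWritable key = new LongWritable(-1);   // record number
      private final BytesWritable value = new BytesWritable(); // reused for every record

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        // open the underlying stream for this split
      }

      @Override
      public boolean nextKeyValue() throws IOException {
        byte[] record = readNextRecord(); // made-up helper; framing logic omitted
        if (record == null) {
          return false;
        }
        key.set(key.get() + 1);
        value.set(record, 0, record.length); // overwrites the same backing array
        return true;
      }

      @Override
      public LongWritable getCurrentKey() { return key; }

      @Override
      public BytesWritable getCurrentValue() { return value; }

      @Override
      public float getProgress() { return 0.0f; } // progress tracking omitted

      @Override
      public void close() throws IOException { /* close the stream */ }

      private byte[] readNextRecord() throws IOException {
        return null; // placeholder for the actual record-reading logic
      }
    }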
One thing I noticed is that Hadoop RecordReader <https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapreduce/RecordReader.html> implementations generally re-use the same Writable instance across records, mutating it in place between getCurrentKey()/getCurrentValue() calls for efficiency; see FixedLengthRecordReader <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java#L178>, KeyValueLineRecordReader <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java#L118>, and LineRecordReader <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L214>. This behavior isn't documented, though.

Crunch handles this correctly for Text because Writables.strings() uses this converter:

    private static final MapFn<Text, String> TEXT_TO_STRING = new MapFn<Text, String>() {
      @Override
      public String map(Text input) {
        return input.toString();
      }
    };

and toString() creates a copy of the Text's data. However, here is the corresponding map implementation used by Writables.bytes():

    private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB = new MapFn<BytesWritable, ByteBuffer>() {
      @Override
      public ByteBuffer map(BytesWritable input) {
        return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
      }
    };

ByteBuffer.wrap() still references the BytesWritable's internal byte array, and that BytesWritable instance is reused across records. So if the BytesWritable came from a RecordReader, every ByteBuffer that Crunch hands downstream aliases the same backing array, whose contents silently change as the reader advances.

One work-around is to construct a new WritableType whose MapFn copies the data (sketch below), and to use it only when reading from a Hadoop InputFormat that returns a BytesWritable. Is there a more general way to solve this?
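For reference, the copying MapFn I have in mind looks something like this (a sketch; BW_TO_COPIED_BB is a name I made up, not anything that exists in Crunch):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.crunch.MapFn;
    import org.apache.hadoop.io.BytesWritable;

    public class CopyingBytesConversion {
      // Unlike BW_TO_BB, this copies the bytes out of the (reused) BytesWritable,
      // so the returned ByteBuffer owns its own array and stays valid after the
      // reader advances to the next record.
      public static final MapFn<BytesWritable, ByteBuffer> BW_TO_COPIED_BB =
          new MapFn<BytesWritable, ByteBuffer>() {
            @Override
            public ByteBuffer map(BytesWritable input) {
              return ByteBuffer.wrap(
                  Arrays.copyOf(input.getBytes(), input.getLength()));
            }
          };
    }

The awkward part is that such a type has to be selected per-source, and it pays for an extra copy even where none is needed, which is why I'm asking whether there's a more general fix.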
