That is correct, yes.

On Wed, Jul 6, 2016 at 10:07 AM, Everett Anderson <[email protected]> wrote:
>
> On Tue, Jul 5, 2016 at 10:06 PM, Josh Wills <[email protected]> wrote:
>
>> That _may_ cause problems in Crunch if the DoFns that are processing
>> those ByteBuffers don't convert them to an immutable data type, or if they
>> need to cache some of those values along the way during processing (if
>> there isn't any caching in the flow, it's not normally an issue to process
>> the records one-at-a-time w/no deep copy necessary.) The best practice in
>> Crunch for those situations (i.e., non-immutable data + some sort of
>> caching/maintenance of state) is to use the PType.getDetachedValue(obj)
>> function to do a deep copy of the value in a way that is independent of the
>> underlying data type (where you can be clever for PTypes of immutable
>> objects and just return the value itself.)
>>
>
> If I understand correctly, for non-immutable types, the recommendation is
> to be aware and add either a DoFn that copies the data right after read()
> or use a PType that does this. Is that right?
>
> For example (though if you were using Text, you should probably just use
> Writables.strings() to avoid this), this won't work:
>
> TableSource<LongWritable, Text> source =
>     From.formattedFile("/path/to/textfile",
>         TextInputFormat.class,
>         Writables.writables(LongWritable.class),
>         Writables.writables(Text.class));
>
> PTable<LongWritable, Text> data = getPipeline().read(source);
>
> for (Pair<LongWritable, Text> pair : data.materialize()) {
>   System.out.println("offset: " + pair.first() +
>       ", text: " + pair.second().toString());
> }
>
> but you could add something like this right after read() to force a copy
> (which will work because Crunch would do this before a serialization
> boundary) --
>
> final PType<Pair<LongWritable, Text>> pType = data.getPType();
> pType.initialize(getPipeline().getConfiguration());
>
> data = data.parallelDo(
>     new DoFn<Pair<LongWritable, Text>, Pair<LongWritable, Text>>() {
>       public void process(Pair<LongWritable, Text> input,
>           Emitter<Pair<LongWritable, Text>> emitter) {
>         emitter.emit(pType.getDetachedValue(input));
>       }
>     }, data.getPTableType());
>
> or you could create your own PTypes to give to From.formattedFile that
> make copies.
>
>
>> On Tue, Jul 5, 2016 at 3:39 PM, Everett Anderson <[email protected]>
>> wrote:
>>
>>> Hey,
>>>
>>> I recently implemented a Hadoop InputFormat that returns the raw bytes
>>> of each record as a BytesWritable rather than as Text (as in
>>> TextInputFormat, which assumes that the input is UTF-8).
>>>
>>> One thing I noticed is that Hadoop RecordReader
>>> <https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapreduce/RecordReader.html>
>>> implementations generally
>>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java#L178>
>>> re-use
>>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java#L118>
>>> the
>>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L214>
>>> Writable instance across multiple {getCurrentKey() + getCurrentValue()}
>>> calls for efficiency, though this isn't documented.
>>>
>>> Crunch handles this for Text because Writables.strings() uses this
>>> converter:
>>>
>>> private static final MapFn<Text, String> TEXT_TO_STRING = new
>>> MapFn<Text, String>() {
>>>   @Override
>>>   public String map(Text input) {
>>>     return input.toString();
>>>   }
>>> };
>>>
>>> and toString() will create a copy of Text's data.
>>>
>>> However, here is its corresponding map implementation for
>>> Writables.bytes():
>>>
>>> private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB = new
>>> MapFn<BytesWritable, ByteBuffer>() {
>>>   @Override
>>>   public ByteBuffer map(BytesWritable input) {
>>>     return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
>>>   }
>>> };
>>>
>>> since ByteBuffer.wrap() will still reference BytesWritable()'s internal
>>> state, and the BytesWritable instance is reused across multiple records,
>>> this causes problems in Crunch if the BytesWritable came from a
>>> RecordReader.
>>>
>>> One work-around is to construct a new WritableType that uses a MapFn
>>> that creates a copy of the data, and only use it when reading from a Hadoop
>>> InputFormat that returns a BytesWritable.
>>>
>>> Is there a more general way to solve this?
>>>
>>
>>
>
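
The aliasing described in the thread can be reproduced without Hadoop or Crunch. The sketch below is only an illustration under stated assumptions: a plain byte[] stands in for the BytesWritable that a RecordReader reuses, and the names ReusedBufferDemo, wrapView, detachedCopy, and asString are hypothetical, not part of either library. wrapView mirrors the ByteBuffer.wrap() call quoted above, while detachedCopy shows the kind of copy a custom MapFn might make instead.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; not part of Hadoop or Crunch.
class ReusedBufferDemo {

    // Same shape as the BW_TO_BB converter: the ByteBuffer shares the backing array.
    static ByteBuffer wrapView(byte[] backing, int length) {
        return ByteBuffer.wrap(backing, 0, length);
    }

    // Copies the valid range first, so later writes to 'backing' are not visible.
    static ByteBuffer detachedCopy(byte[] backing, int length) {
        return ByteBuffer.wrap(Arrays.copyOfRange(backing, 0, length));
    }

    // Reads the remaining bytes without disturbing the buffer's own position.
    static String asString(ByteBuffer buf) {
        ByteBuffer dup = buf.duplicate();
        byte[] out = new byte[dup.remaining()];
        dup.get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        byte[] reused = new byte[4]; // stands in for the reader's reused buffer
        String[] records = {"aaaa", "bbbb"};
        List<ByteBuffer> views = new ArrayList<>();
        List<ByteBuffer> copies = new ArrayList<>();

        for (String record : records) {
            byte[] src = record.getBytes(StandardCharsets.US_ASCII);
            // The "reader" overwrites the same array for every record.
            System.arraycopy(src, 0, reused, 0, src.length);
            views.add(wrapView(reused, src.length));
            copies.add(detachedCopy(reused, src.length));
        }

        // The cached view of the first record now shows the second record's bytes.
        System.out.println("view of record 0: " + asString(views.get(0)));
        // The detached copy still holds the first record's bytes.
        System.out.println("copy of record 0: " + asString(copies.get(0)));
    }
}
```

If nothing holds on to a value past the current record, the view is harmless, which matches the point above that one-at-a-time processing does not normally need a deep copy.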
