[
https://issues.apache.org/jira/browse/MAPREDUCE-2070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12910647#action_12910647
]
Paul Burkhardt commented on MAPREDUCE-2070:
-------------------------------------------
An example RecordReader that concatenates file pairs from the Cartesian product
is as follows:
{code}
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

// CartesianProductFileSplit, CartesianProductFileSplitReader and
// CartesianProductTupleReader are provided by the attached patch.
public class CPRecordReader
    implements RecordReader<Text, BytesWritable>, CartesianProductTupleReader {

  private final JobConf job;
  private final CartesianProductFileSplit split;
  private final CartesianProductFileSplitReader reader;
  private final long totalLength;
  private final String delimiter;

  private long pos = 0;
  private byte[] keyContent = null;
  private byte[] valueContent = null;
  private String keyName = null;
  private String valueName = null;

  public CPRecordReader(JobConf job, CartesianProductFileSplit split)
      throws IOException {
    this.job = job;
    this.split = split;
    this.totalLength = split.getLength();
    this.delimiter = job.get("mapred.input.format.delimiter");
    // The split reader calls back into readKey/readValue in tuple order.
    this.reader = new CartesianProductFileSplitReader(split, this);
  }

  @Override
  public boolean next(Text key, BytesWritable value) throws IOException {
    if (reader.next()) {
      setKey(key);
      setValue(value);
      pos += valueContent.length;
      return true;
    }
    return false;
  }

  // Reads the whole key file into memory.
  @Override
  public void readKey(Path p) throws IOException {
    long length = split.getLength(p);
    keyName = p.toString();
    keyContent = new byte[(int) length];
    FSDataInputStream stream = p.getFileSystem(job).open(p);
    try {
      stream.readFully(keyContent, 0, (int) length);
    } finally {
      stream.close();
    }
  }

  // Builds the record value: the cached key bytes followed by the value file.
  @Override
  public void readValue(Path p) throws IOException {
    long length = split.getLength(p);
    valueName = p.toString();
    valueContent = new byte[(int) (keyContent.length + length)];
    System.arraycopy(keyContent, 0, valueContent, 0, keyContent.length);
    FSDataInputStream stream = p.getFileSystem(job).open(p);
    try {
      // Positional read from file offset 0 into the tail of valueContent.
      stream.readFully(0, valueContent, keyContent.length, (int) length);
    } finally {
      stream.close();
    }
  }

  @Override
  public void close() throws IOException {
    // Nothing to release; each stream is closed right after it is read.
  }

  @Override
  public Text createKey() {
    return new Text();
  }

  @Override
  public BytesWritable createValue() {
    return new BytesWritable();
  }

  @Override
  public long getPos() throws IOException {
    return pos;
  }

  @Override
  public float getProgress() throws IOException {
    if (totalLength == 0) {
      return 0.0f;
    }
    // pos counts the key bytes once per pair, so clamp the ratio to 1.
    return Math.min(1.0f, pos / (float) totalLength);
  }

  // Key format: <keyName>:<keyLength><delimiter><valueName>:<valueLength>
  private void setKey(Text key) {
    long length = valueContent.length - keyContent.length;
    key.set(keyName + ":" + keyContent.length
        + delimiter
        + valueName + ":" + length);
  }

  private void setValue(BytesWritable value) {
    value.set(valueContent, 0, valueContent.length);
  }
}
{code}
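The callback pattern that the reader above relies on can be illustrated in isolation. The following is a minimal, self-contained sketch, not code from the patch: TupleReader, CountingReader and walk are hypothetical stand-ins, and it assumes that CartesianProductFileSplitReader invokes readKey once per left-hand file and readValue once per pair, as the issue description suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TupleOrderSketch {

    /** Hypothetical stand-in for the CartesianProductTupleReader callbacks. */
    interface TupleReader {
        void readKey(String path);
        void readValue(String path);
    }

    /** Records every callback so the number of reads can be inspected. */
    static class CountingReader implements TupleReader {
        int keyReads = 0;
        int valueReads = 0;
        final List<String> pairs = new ArrayList<String>();
        private String currentKey;

        @Override
        public void readKey(String path) {
            keyReads++;
            currentKey = path;
        }

        @Override
        public void readValue(String path) {
            valueReads++;
            pairs.add(currentKey + "," + path);
        }
    }

    /** Walks the product in tuple order: readKey fires only when the left element changes. */
    static void walk(List<String> left, List<String> right, TupleReader reader) {
        for (String x : left) {
            reader.readKey(x);
            for (String y : right) {
                reader.readValue(y);
            }
        }
    }

    public static void main(String[] args) {
        CountingReader r = new CountingReader();
        walk(Arrays.asList("a1", "a2"), Arrays.asList("b1", "b2", "b3"), r);
        // prints: 2 key reads, 6 value reads
        System.out.println(r.keyReads + " key reads, " + r.valueReads + " value reads");
    }
}
```

With two left-hand files and three right-hand files, a reader that reopened the left-hand file for every pair would perform six key reads; in tuple order it performs two.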
> Cartesian product file split
> ----------------------------
>
> Key: MAPREDUCE-2070
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-2070
> Project: Hadoop Map/Reduce
> Issue Type: New Feature
> Affects Versions: 0.22.0
> Reporter: Paul Burkhardt
> Priority: Minor
> Attachments: MAPREDUCE-2070
>
>
> Generates a Cartesian product of file pairs from two directory inputs and
> enables a RecordReader to optimally read the split in tuple order,
> eliminating extraneous read operations.
> The new InputFormat generates a split composed of file combinations as
> tuples. The size of the split is configurable. A RecordReader employs the
> convenience class, CartesianProductFileSplitReader, to generate file pairs in
> tuple ordering. The actual read operations are delegated to the RecordReader
> which must implement the CartesianProductTupleReader interface. An
> implementor of a RecordReader can perform file manipulations without
> restriction and also benefit from the optimization of tuple ordering.
> In the Cartesian product of two sets with cardinalities X and Y, each
> element x of the first set need only be referenced once rather than once
> per pair, saving X(Y-1) references. If the Cartesian product is split into
> subsets of size N, there are X(Y/N) references instead of XY, a difference
> of XY(N-1)/N. If every x has the same size, s, this saves reading
> sXY(N-1)/N bytes.
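
As a worked check of the reference counts quoted above, the sketch below evaluates the formulas for small numbers. The class and method names are hypothetical, the values X = 4, Y = 6, N = 3 and s = 1024 are arbitrary, and it assumes N divides Y so that every split holds pairs sharing a single x.

```java
class ReferenceSavings {

    /** References saved when the whole product is read in tuple order: XY - X = X(Y-1). */
    static long savedUnsplit(long x, long y) {
        return x * y - x;
    }

    /** References saved with splits of N pairs: XY - X(Y/N) = XY(N-1)/N. Assumes N divides Y. */
    static long savedSplit(long x, long y, long n) {
        return x * y * (n - 1) / n;
    }

    /** Bytes saved when every x has the same size s: sXY(N-1)/N. */
    static long savedBytes(long s, long x, long y, long n) {
        return s * savedSplit(x, y, n);
    }

    public static void main(String[] args) {
        long x = 4, y = 6, n = 3, s = 1024;
        System.out.println(savedUnsplit(x, y));     // prints: 20
        System.out.println(savedSplit(x, y, n));    // prints: 16
        System.out.println(savedBytes(s, x, y, n)); // prints: 16384
    }
}
```

Here the naive approach makes XY = 24 references to elements of the first set, while splits of three pairs make X(Y/N) = 8, which accounts for the 16 references saved.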
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.