[ https://issues.apache.org/jira/browse/AVRO-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345988#comment-16345988 ]
Stan Rosenberg edited comment on AVRO-2138 at 2/2/18 9:03 PM:
--------------------------------------------------------------

{code:java}
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

/**
 * Extending avro's implementation in order to fix un-necessary copying which happens on _every_ invocation of
 * write(key, value, keySchema, valSchema, baseOutputPath)
 *
 * see https://issues.apache.org/jira/browse/AVRO-2138
 */
public class PatchedAvroMultipleOutputs extends org.apache.avro.mapreduce.AvroMultipleOutputs {
    // Writers created by getRecordWriter, keyed by base output path; closed in close().
    private Map<String, RecordWriter<?, ?>> recordWriters = new HashMap<>();
    // One TaskAttemptContext built in the constructor and reused by every write() call.
    private TaskAttemptContext taskContext;

    public PatchedAvroMultipleOutputs(TaskInputOutputContext<?, ?, ?, ?> context) {
        super(context);
        try {
            // NOTE(review): new Job(Configuration) is deprecated in Hadoop 2.x; Job.getInstance(conf)
            // is the replacement -- confirm against the Hadoop version in use.
            taskContext = createTaskAttemptContext(new Job(context.getConfiguration()).getConfiguration(),
                    context.getTaskAttemptID());
        } catch (IOException ex) {
            throw new IllegalStateException("Unable to instantiate PatchedAvroMultipleOutputs", ex);
        }
    }

    @Override
    public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath)
            throws IOException, InterruptedException {
        // NOTE(review): keySchema and valSchema are never read here, so every writer is created with
        // whatever output schema is already in taskContext's configuration. The superclass method this
        // overrides presumably applies these schemas per call -- confirm callers never pass schemas
        // that differ from the job's configured output schemas.
        Preconditions.checkArgument(!baseOutputPath.equals("part"), "output name cannot be 'part'");
        getRecordWriter(taskContext, baseOutputPath).write(key, value);
    }

    // Returns the cached writer for baseFileName, creating it through the context's OutputFormat on
    // first use. The taskContext parameter shadows the field of the same name (write() passes the field).
    // NOTE(review): the cache is keyed by baseFileName only, so the first writer created for a path is
    // reused for every later write to that path.
    private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, String baseFileName)
            throws IOException, InterruptedException {
        // look for record-writer in the cache
        RecordWriter writer = recordWriters.get(baseFileName);

        // If not in cache, create a new one
        if (writer == null) {
            // get the record writer from context output format
            //FileOutputFormat.setOutputName(taskContext, baseFileName);
            // Mutates the shared context's Configuration before the writer is created; presumably the
            // output format reads this key to name its output -- confirm against the OutputFormat in use.
            taskContext.getConfiguration().set("avro.mo.config.namedOutput", baseFileName);
            try {
                writer = ((OutputFormat) ReflectionUtils.newInstance(
                        taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
                        .getRecordWriter(taskContext);
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }

            // add the record-writer to the cache
            recordWriters.put(baseFileName, writer);
        }
        return writer;
    }

    // Instantiates a TaskAttemptContext reflectively through its (Configuration, TaskAttemptID)
    // constructor; any failure is rethrown as IllegalStateException.
    private TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
        // Use reflection since the context types changed incompatibly between 1.0
        // and 2.0.
        try {
            Class<?> c = getTaskAttemptContextClass();
            Constructor<?> cons = c.getConstructor(Configuration.class, TaskAttemptID.class);
            return (TaskAttemptContext) cons.newInstance(conf, taskId);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Tries to load TaskAttemptContextImpl first and falls back to TaskAttemptContext if that fails;
    // throws IllegalStateException if neither class can be loaded.
    private Class<?> getTaskAttemptContextClass() {
        try {
            return Class.forName(
                    "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
        } catch (Exception e) {
            try {
                return Class.forName(
                        "org.apache.hadoop.mapreduce.TaskAttemptContext");
            } catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    // Closes every cached writer with the shared taskContext, then delegates to super.close().
    // NOTE(review): recordWriters is not cleared afterwards, so a second close() would close the same
    // writers again.
    @Override
    public void close() throws IOException, InterruptedException {
        for (RecordWriter writer : recordWriters.values()) {
            writer.close(taskContext);
        }
        super.close();
    }
}
{code}

was (Author: srosenberg):
{code:java}
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

/**
 * Extending avro's implementation in order to fix un-necessary copying which happens on _every_ invocation of
 * write(key, value, keySchema, valSchema, baseOutputPath)
 *
 * see https://issues.apache.org/jira/browse/AVRO-2138
 */
public class PatchedAvroMultipleOutputs extends org.apache.avro.mapreduce.AvroMultipleOutputs {
    // NOTE(review): this revision has no close() override, so nothing in this class ever closes the
    // writers cached in recordWriters.
    private Map<String, RecordWriter<?, ?>> recordWriters = new HashMap<>();
    private TaskAttemptContext taskContext;

    public PatchedAvroMultipleOutputs(TaskInputOutputContext<?, ?, ?, ?> context) {
        super(context);
        try {
            taskContext = createTaskAttemptContext(new Job(context.getConfiguration()).getConfiguration(),
                    context.getTaskAttemptID());
        } catch (IOException ex) {
            throw new IllegalStateException("Unable to instantiate PatchedAvroMultipleOutputs", ex);
        }
    }

    @Override
    public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath)
            throws IOException, InterruptedException {
        Preconditions.checkArgument(!baseOutputPath.equals("part"), "output name cannot be 'part'");
        getRecordWriter(taskContext, baseOutputPath).write(key, value);
    }

    private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, String baseFileName)
            throws IOException, InterruptedException {
        // look for record-writer in the cache
        RecordWriter writer = recordWriters.get(baseFileName);

        // If not in cache, create a new one
        if (writer == null) {
            // get the record writer from context output format
            //FileOutputFormat.setOutputName(taskContext, baseFileName);
            taskContext.getConfiguration().set("avro.mo.config.namedOutput", baseFileName);
            try {
                writer = ((OutputFormat) ReflectionUtils.newInstance(
                        taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
                        .getRecordWriter(taskContext);
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }

            // add the record-writer to the cache
            recordWriters.put(baseFileName, writer);
        }
        return writer;
    }

    private TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
        // Use reflection since the context types changed incompatibly between 1.0
        // and 2.0.
        try {
            Class<?> c = getTaskAttemptContextClass();
            Constructor<?> cons = c.getConstructor(Configuration.class, TaskAttemptID.class);
            return (TaskAttemptContext) cons.newInstance(conf, taskId);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    private Class<?> getTaskAttemptContextClass() {
        try {
            return Class.forName(
                    "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
        } catch (Exception e) {
            try {
                return Class.forName(
                        "org.apache.hadoop.mapreduce.TaskAttemptContext");
            } catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
    }
}
{code}

> org.apache.avro.mapreduce.AvroMultipleOutputs.write copies Configuration on
> every invocation of write
> -----------------------------------------------------------------------------------------------------
>
> Key: AVRO-2138
> URL: https://issues.apache.org/jira/browse/AVRO-2138
> Project: Avro
> Issue Type: Bug
> Affects Versions: 1.8.2
> Reporter: Stan Rosenberg
> Priority: Major
>
> While profiling a spark job using AvroMultipleOutputs, I noticed that a great
> deal of time is wasted by copying (hadoop) Configuration. Indeed this
> happens on _every_ invocation of {{write}}:
> [https://github.com/apache/avro/blob/master/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java#L437]
> After patching, I am seeing a speed-up of 2x and above in running time of the
> same job.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)