Author: aching Date: Thu Dec 15 23:06:12 2011 New Revision: 1214983 URL: http://svn.apache.org/viewvc?rev=1214983&view=rev Log: GIRAPH-57: Add new RPC call (putVertexIdMessagesList) to batch putMsgList RPCs together. (aching)
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/CODE_CONVENTIONS incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1214983&r1=1214982&r2=1214983&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Thu Dec 15 23:06:12 2011 @@ -2,6 +2,9 @@ Giraph Change Log Release 0.70.0 - unreleased + GIRAPH-57: Add new RPC call (putVertexIdMessagesList) to batch + putMsgList RPCs together. (aching) + GIRAPH-104: Save half of maximum memory used from messaging. (aching) GIRAPH-10: Aggregators are not exported. (claudio) Modified: incubator/giraph/trunk/CODE_CONVENTIONS URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CODE_CONVENTIONS?rev=1214983&r1=1214982&r2=1214983&view=diff ============================================================================== --- incubator/giraph/trunk/CODE_CONVENTIONS (original) +++ incubator/giraph/trunk/CODE_CONVENTIONS Thu Dec 15 23:06:12 2011 @@ -49,9 +49,9 @@ if (LOG.isInfoEnabled()) { } - All classes, members, and member methods should have Javadoc in the following - style. C-style comments for javadoc and // comments for non-javadoc. Also, the comment - block should have a line break that separates the comment section and the @ section. - See below. + style. C-style comments for javadoc and // comments for non-javadoc. 
Also, + the comment block should have a line break that separates the comment + section and the @ section. See below. /** * This is an example class @@ -78,10 +78,18 @@ public class Giraffe { } } +- When using synchronized statements, there should not be a space between + 'synchronized' and '('. For example: + +public void foo() { + synchronized(bar) { + } +} + - Class members should not begin with 'm_' or '_' - No warnings allowed, but be as specific as possible with warning suppression - Prefer to avoid abbreviations when reasonable (i.e. 'msg' vs 'message') -- Static variable names should be entirely capitalized and seperated by '_' +- Static variable names should be entirely capitalized and separated by '_' (i.e. private static int FOO_BAR_BAR = 2) - Non-static variable and method names should not begin capitalized and should only use alphanumeric characters (i.e. int fooBarBar) Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java?rev=1214983&r1=1214982&r2=1214983&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java Thu Dec 15 23:06:12 2011 @@ -47,7 +47,7 @@ public abstract class ArrayListWritable< */ public ArrayListWritable() { } - + public ArrayListWritable(ArrayListWritable<M> arrayListWritable) { super(arrayListWritable); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1214983&r1=1214982&r2=1214983&view=diff ============================================================================== --- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Thu Dec 15 23:06:12 2011 @@ -44,6 +44,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -92,6 +93,8 @@ public abstract class BasicRPCCommunicat private final InetSocketAddress myAddress; /** Messages sent during the last superstep */ private long totalMsgsSentInSuperstep = 0; + /** Maximum messages sent per putVertexIdMessagesList RPC */ + private final int maxMessagesPerFlushPut; /** * Map of the peer connections, mapping from remote socket address to client * meta data @@ -215,48 +218,68 @@ public abstract class BasicRPCCommunicat = peerConnection.getRPCProxy(); long startMillis = System.currentTimeMillis(); long lastReportedMillis = startMillis; - try { int verticesDone = 0; - synchronized (peerConnection.outMessagesPerPeer) { + synchronized(peerConnection.outMessagesPerPeer) { final int vertices = peerConnection.outMessagesPerPeer.size(); + // 1. Check for null messages and combine if possible + // 2. Send vertex ids and messages in bulk to the + // destination servers. 
+ for (Entry<I, MsgList<M>> entry : + peerConnection.outMessagesPerPeer.entrySet()) { + for (M msg : entry.getValue()) { + if (msg == null) { + throw new IllegalArgumentException( + "run: Cannot put null message on " + + "vertex id " + entry.getKey()); + } + } + if (combiner != null && entry.getValue().size() > 1) { + M combinedMsg = combiner.combine(entry.getKey(), + entry.getValue()); + entry.getValue().clear(); + entry.getValue().add(combinedMsg); + } + if (entry.getValue().isEmpty()) { + throw new IllegalStateException( + "run: Impossible for no messages in " + + entry.getKey()); + } + } while (!peerConnection.outMessagesPerPeer.isEmpty()) { - Entry<I, MsgList<M>> e = - peerConnection.outMessagesPerPeer.entrySet().iterator().next(); - MsgList<M> msgList = e.getValue(); - - if (msgList.size() > 0) { - if (msgList.size() > 1) { - if (combiner != null) { - M combinedMsg = combiner.combine(e.getKey(), - msgList); - if (combinedMsg != null) { - proxy.putMsg(e.getKey(), combinedMsg); - } - } else { - proxy.putMsgList(e.getKey(), msgList); - } - } else { - for (M msg : msgList) { - if (msg == null) { - throw new IllegalArgumentException( - "putAllMessages: Cannot put " + - "null message on " + e.getKey()); - } - proxy.putMsg(e.getKey(), msg); - context.progress(); - } + int bulkedMessages = 0; + Iterator<Entry<I, MsgList<M>>> vertexIdMessagesListIt = + peerConnection.outMessagesPerPeer.entrySet(). 
+ iterator(); + VertexIdMessagesList<I, M> vertexIdMessagesList = + new VertexIdMessagesList<I, M>(); + while (vertexIdMessagesListIt.hasNext()) { + Entry<I, MsgList<M>> entry = + vertexIdMessagesListIt.next(); + // Add this entry if the list is empty or we + // haven't reached the maximum number of messages + if (vertexIdMessagesList.isEmpty() || + ((bulkedMessages + entry.getValue().size()) + < maxMessagesPerFlushPut)) { + vertexIdMessagesList.add( + new VertexIdMessages<I, M>( + entry.getKey(), entry.getValue())); + bulkedMessages += entry.getValue().size(); } - msgList.clear(); } - // Clean up the memory with the message list - msgList = null; - peerConnection.outMessagesPerPeer.remove(e.getKey()); - e = null; + // Clean up references to the vertex id and messages + for (VertexIdMessages<I, M>vertexIdMessages : + vertexIdMessagesList) { + peerConnection.outMessagesPerPeer.remove( + vertexIdMessages.getVertexId()); + } + + proxy.putVertexIdMessagesList(vertexIdMessagesList); + context.progress(); - ++verticesDone; + verticesDone += vertexIdMessagesList.size(); long curMillis = System.currentTimeMillis(); if ((lastReportedMillis + REPORTING_INTERVAL_MIN_MILLIS) < curMillis) { @@ -304,16 +327,18 @@ public abstract class BasicRPCCommunicat * exceeds <i>maxSize</i>. 
*/ private class LargeMessageFlushExecutor implements Runnable { - final I destVertex; - final MsgList<M> outMessage; - PeerConnection peerConnection; + private final I destVertex; + private final MsgList<M> outMessageList; + private PeerConnection peerConnection; LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) { this.peerConnection = peerConnection; - synchronized (peerConnection.outMessagesPerPeer) { + synchronized(peerConnection.outMessagesPerPeer) { this.destVertex = destVertex; - outMessage = peerConnection.outMessagesPerPeer.get(destVertex); - peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>()); + outMessageList = + peerConnection.outMessagesPerPeer.get(destVertex); + peerConnection.outMessagesPerPeer.put(destVertex, + new MsgList<M>()); } } @@ -325,21 +350,21 @@ public abstract class BasicRPCCommunicat if (combiner != null) { M combinedMsg = combiner.combine(destVertex, - outMessage); + outMessageList); if (combinedMsg != null) { proxy.putMsg(destVertex, combinedMsg); } } else { - proxy.putMsgList(destVertex, outMessage); + proxy.putMsgList(destVertex, outMessageList); } } catch (IOException e) { LOG.error(e); if (peerConnection.isProxy) { RPC.stopProxy(peerConnection.peer); } - throw new RuntimeException(e); + throw new RuntimeException("run: Got IOException", e); } finally { - outMessage.clear(); + outMessageList.clear(); } } } @@ -371,6 +396,9 @@ public abstract class BasicRPCCommunicat this.conf = context.getConfiguration(); this.maxSize = conf.getInt(GiraphJob.MSG_SIZE, GiraphJob.MSG_SIZE_DEFAULT); + this.maxMessagesPerFlushPut = + conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT, + GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT); if (BspUtils.getVertexCombinerClass(conf) == null) { this.combiner = null; } else { @@ -492,7 +520,7 @@ public abstract class BasicRPCCommunicat Map<I, MsgList<M>> outMsgMap = null; boolean isProxy = true; CommunicationsInterface<I, V, E, M> peer = this; - synchronized (outMessages) { + 
synchronized(outMessages) { outMsgMap = outMessages.get(addrUnresolved); if (LOG.isDebugEnabled()) { LOG.debug("startPeerConnectionThread: Connecting to " + @@ -570,7 +598,7 @@ end[HADOOP_FACEBOOK]*/ transientInMessages.put(vertex, msgs); } } - synchronized (msgs) { + synchronized(msgs) { msgs.add(msg); } } @@ -591,12 +619,39 @@ end[HADOOP_FACEBOOK]*/ transientInMessages.put(vertex, msgs); } } - synchronized (msgs) { + synchronized(msgs) { msgs.addAll(msgList); } } @Override + public final void putVertexIdMessagesList( + VertexIdMessagesList<I, M> vertexIdMessagesList) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("putVertexIdMessagesList: Adding msgList " + + vertexIdMessagesList); + } + + List<M> messageList = null; + for (VertexIdMessages<I, M> vertexIdMessages : vertexIdMessagesList) { + synchronized(transientInMessages) { + messageList = + transientInMessages.get(vertexIdMessages.getVertexId()); + if (messageList == null) { + messageList = new ArrayList<M>( + vertexIdMessages.getMessageList().size()); + transientInMessages.put( + vertexIdMessages.getVertexId(), messageList); + } + } + synchronized(messageList) { + messageList.addAll(vertexIdMessages.getMessageList()); + } + } + } + + @Override public final void putVertexList(int partitionId, VertexList<I, V, E, M> vertexList) throws IOException { @@ -604,7 +659,7 @@ end[HADOOP_FACEBOOK]*/ LOG.debug("putVertexList: On partition id " + partitionId + " adding vertex list of size " + vertexList.size()); } - synchronized (inPartitionVertexMap) { + synchronized(inPartitionVertexMap) { if (vertexList.size() == 0) { return; } @@ -624,7 +679,7 @@ end[HADOOP_FACEBOOK]*/ if (LOG.isDebugEnabled()) { LOG.debug("addEdge: Adding edge " + edge); } - synchronized (inVertexMutationsMap) { + synchronized(inVertexMutationsMap) { VertexMutations<I, V, E, M> vertexMutations = null; if (!inVertexMutationsMap.containsKey(vertexIndex)) { vertexMutations = new VertexMutations<I, V, E, M>(); @@ -642,7 +697,7 @@ 
end[HADOOP_FACEBOOK]*/ LOG.debug("removeEdge: Removing edge on destination " + destinationVertexIndex); } - synchronized (inVertexMutationsMap) { + synchronized(inVertexMutationsMap) { VertexMutations<I, V, E, M> vertexMutations = null; if (!inVertexMutationsMap.containsKey(vertexIndex)) { vertexMutations = new VertexMutations<I, V, E, M>(); @@ -659,7 +714,7 @@ end[HADOOP_FACEBOOK]*/ if (LOG.isDebugEnabled()) { LOG.debug("addVertex: Adding vertex " + vertex); } - synchronized (inVertexMutationsMap) { + synchronized(inVertexMutationsMap) { VertexMutations<I, V, E, M> vertexMutations = null; if (!inVertexMutationsMap.containsKey(vertex.getVertexId())) { vertexMutations = new VertexMutations<I, V, E, M>(); @@ -676,7 +731,7 @@ end[HADOOP_FACEBOOK]*/ if (LOG.isDebugEnabled()) { LOG.debug("removeVertex: Removing vertex " + vertexIndex); } - synchronized (inVertexMutationsMap) { + synchronized(inVertexMutationsMap) { VertexMutations<I, V, E, M> vertexMutations = null; if (!inVertexMutationsMap.containsKey(vertexIndex)) { vertexMutations = new VertexMutations<I, V, E, M>(); @@ -779,7 +834,7 @@ end[HADOOP_FACEBOOK]*/ } ++totalMsgsSentInSuperstep; Map<I, MsgList<M>> msgMap = null; - synchronized (outMessages) { + synchronized(outMessages) { msgMap = outMessages.get(addr); } if (msgMap == null) { // should never happen after constructor @@ -995,7 +1050,7 @@ end[HADOOP_FACEBOOK]*/ resolveVertexIndexSet.add(entry.getKey()); } } - synchronized (inVertexMutationsMap) { + synchronized(inVertexMutationsMap) { for (I vertexIndex : inVertexMutationsMap.keySet()) { resolveVertexIndexSet.add(vertexIndex); } @@ -1042,7 +1097,7 @@ end[HADOOP_FACEBOOK]*/ partition.removeVertex(originalVertex.getVertexId()); } } - synchronized (inVertexMutationsMap) { + synchronized(inVertexMutationsMap) { inVertexMutationsMap.clear(); } } @@ -1051,7 +1106,7 @@ end[HADOOP_FACEBOOK]*/ public void fixPartitionIdToSocketAddrMap() { // 1. Fix all the cached inet addresses (remove all changed entries) // 2. 
Connect to any new RPC servers - synchronized (partitionIndexAddressMap) { + synchronized(partitionIndexAddressMap) { for (PartitionOwner partitionOwner : service.getPartitionOwners()) { InetSocketAddress address = partitionIndexAddressMap.get( Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1214983&r1=1214982&r2=1214983&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Thu Dec 15 23:06:12 2011 @@ -77,6 +77,16 @@ public interface CommunicationsInterface void putMsgList(I vertexIndex, MsgList<M> msgList) throws IOException; /** + * Adds a list of vertex ids and their respective message lists. + * + * @param vertexIdMessagesList Vertex ids paired with the message lists + * to be added for each of them + * @throws IOException + */ + void putVertexIdMessagesList( + VertexIdMessagesList<I, M> vertexIdMessagesList) throws IOException; + + /** * Adds vertex list (index, value, edges, etc.) to the appropriate worker. * * @param partitionId Partition id of the vertices to be added. 
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java?rev=1214983&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java Thu Dec 15 23:06:12 2011 @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.graph.BspUtils; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * This object is only used for transporting list of vertices and their + * respective messages to a destination RPC server. 
+ * + * @param <I> Vertex id + * @param <M> Message data + */ +@SuppressWarnings("rawtypes") +public class VertexIdMessages<I extends WritableComparable, M extends Writable> + implements Writable, Configurable { + /** Vertex id */ + private I vertexId; + /** Message list corresponding to vertex id */ + private MsgList<M> msgList; + /** Configuration from Configurable */ + private Configuration conf; + + /** + * Reflective constructor. + */ + public VertexIdMessages() {} + + /** + * Constructor used with creating initial values. + * + * @param vertexId Vertex id to be sent + * @param msgList Message list for the vertex id to be sent + */ + public VertexIdMessages(I vertexId, MsgList<M> msgList) { + this.vertexId = vertexId; + this.msgList = msgList; + } + + @Override + public void readFields(DataInput input) throws IOException { + vertexId = BspUtils.createVertexIndex(getConf()); + vertexId.readFields(input); + msgList = new MsgList<M>(); + msgList.setConf(getConf()); + msgList.readFields(input); + } + + @Override + public void write(DataOutput output) throws IOException { + vertexId.write(output); + msgList.write(output); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + public I getVertexId() { + return vertexId; + } + + public MsgList<M> getMessageList() { + return msgList; + } + } Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java?rev=1214983&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java Thu Dec 15 23:06:12 2011 @@ -0,0 +1,51 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Wrapper around {@link ArrayListWritable} that provides the list for + * {@link VertexIdMessages}. + * + * @param <I> Vertex id + * @param <M> Message data + */ +@SuppressWarnings("rawtypes") +public class VertexIdMessagesList<I extends WritableComparable, + M extends Writable> extends ArrayListWritable<VertexIdMessages<I, M>> { + /** Defining a layout version for a serializable class. 
*/ + private static final long serialVersionUID = 100L; + + public VertexIdMessagesList() { + super(); + } + + public VertexIdMessagesList(VertexIdMessagesList<I, M> vertexIdMessagesList) { + super(vertexIdMessagesList); + } + + @SuppressWarnings("unchecked") + @Override + public void setClass() { + setClass((Class<VertexIdMessages<I, M>>) + (new VertexIdMessages<I, M>()).getClass()); + } +} Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1214983&r1=1214982&r2=1214983&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Dec 15 23:06:12 2011 @@ -61,10 +61,10 @@ public class GiraphJob extends Job { /** Message value class */ public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass"; /** Worker context class */ - public static final String WORKER_CONTEXT_CLASS = + public static final String WORKER_CONTEXT_CLASS = "giraph.workerContextClass"; /** AggregatorWriter class - optional */ - public static final String AGGREGATOR_WRITER_CLASS = + public static final String AGGREGATOR_WRITER_CLASS = "giraph.aggregatorWriterClass"; /** @@ -170,8 +170,15 @@ public class GiraphJob extends Job { /** Default maximum number of messages per peer before flush */ public static final int MSG_SIZE_DEFAULT = 1000; + /** Maximum number of messages that can be bulk sent during a flush */ + public static final String MAX_MESSAGES_PER_FLUSH_PUT = + "giraph.maxMessagesPerFlushPut"; + /** Default number of messages that can be bulk sent during a flush */ + public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000; + /** Number of flush threads per peer */ - public static final String MSG_NUM_FLUSH_THREADS = 
"giraph.msgNumFlushThreads"; + public static final String MSG_NUM_FLUSH_THREADS = + "giraph.msgNumFlushThreads"; /** Number of poll attempts prior to failing the job (int) */ public static final String POLL_ATTEMPTS = "giraph.pollAttempts"; @@ -451,7 +458,7 @@ public class GiraphJob extends Job { workerContextClass, WorkerContext.class); } - + /** * Set the aggregator writer class (optional) * @@ -464,7 +471,7 @@ public class GiraphJob extends Job { aggregatorWriterClass, AggregatorWriter.class); } - + /** * Set worker configuration for determining what is required for * a superstep.