Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2717
Change subject: Improve performance of NotifyBrokerRuntime code
......................................................................
Improve performance of NotifyBrokerRuntime code
Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
---
M asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
1 file changed, 57 insertions(+), 52 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/17/2717/1
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 6ffb244..97615b1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -19,34 +19,33 @@
package org.apache.asterix.bad.runtime;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.INcApplicationContext;
+import
org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
+import
org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
import
org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import
org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AUUID;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,15 +57,19 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
public class NotifyBrokerRuntime extends
AbstractOneInputOneOutputOneFramePushRuntime {
private static final Logger LOGGER =
Logger.getLogger(NotifyBrokerRuntime.class.getName());
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
- private final AOrderedListSerializerDeserializer subSerDes =
- new AOrderedListSerializerDeserializer(new
AOrderedListType(BuiltinType.AUUID, null));
- private final ARecordSerializerDeserializer recordSerDes;
+ private static final AStringSerializerDeserializer stringSerDes =
+ new AStringSerializerDeserializer(new UTF8StringWriter(), new
UTF8StringReader());
+
+ private final IPrinter recordPrinterFactory;
+ private final IPrinter listPrinterFactory;
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
@@ -74,14 +77,14 @@
private IScalarEvaluator eval0;
private IScalarEvaluator eval1;
private IScalarEvaluator eval2;
- private final ActiveManager activeManager;
private final EntityId entityId;
private final boolean push;
- private AOrderedList pushList;
- private ARecord pushRecord;
- private final IAType recordType;
- private final Map<String, HashSet<String>> sendData = new HashMap<>();
+ private final Map<String, String> sendData = new HashMap<>();
+ private final Map<String, ByteArrayOutputStream> sendbaos = new
HashMap<>();
+ private final Map<String, PrintStream> sendStreams = new HashMap<>();
private String executionTimeString;
+ private boolean firstResult = true;
+ String endpoint;
public NotifyBrokerRuntime(IHyracksTaskContext ctx,
IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory pushListEvalFactory,
IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
- this.activeManager = (ActiveManager) ((INcApplicationContext)
ctx.getJobletContext().getServiceContext()
- .getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
this.push = push;
- this.pushList = null;
- this.pushRecord = null;
- this.recordType = recordType;
- recordSerDes = new ARecordSerializerDeserializer((ARecordType)
recordType);
+ recordPrinterFactory = new ARecordPrinterFactory((ARecordType)
recordType).createPrinter();
+ listPrinterFactory =
+ new AOrderedlistPrinterFactory(new
AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
executionTimeString = null;
}
@@ -106,26 +106,16 @@
return;
}
- private void addSubscriptions(String endpoint, AOrderedList
subscriptionIds) {
- for (int i = 0; i < subscriptionIds.size(); i++) {
- AUUID subId = (AUUID) subscriptionIds.getItem(i);
- String subscriptionString = subId.toString();
- //Broker code currently cannot handle the "uuid {}" part of the
string, so we parse just the value
- subscriptionString = subscriptionString.substring(8,
subscriptionString.length() - 2);
- subscriptionString = "\"" + subscriptionString + "\"";
- sendData.get(endpoint).add(subscriptionString);
- }
- }
-
public String createData(String endpoint) {
+ String resultTitle = "\"subscriptionIds";
+ if (push) {
+ resultTitle = "\"results\"";
+ }
String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() +
"\", \"channelName\":\""
+ entityId.getEntityName() + "\", \"" +
BADConstants.ChannelExecutionTime + "\":\""
- + executionTimeString + "\", \"subscriptionIds\":[";
- for (String value : sendData.get(endpoint)) {
- JSON += value;
- JSON += ",";
- }
- JSON = JSON.substring(0, JSON.length() - 1);
+ + executionTimeString + "\", " + resultTitle + ":[";
+ JSON += sendData.get(endpoint);
+ JSON = JSON.substring(0, JSON.length());
JSON += "]}";
return JSON;
@@ -185,34 +175,49 @@
int serBrokerOffset = inputArg0.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(),
serBrokerOffset + 1);
- String endpoint =
AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
- sendData.putIfAbsent(endpoint, new HashSet<>());
+ endpoint = stringSerDes.deserialize(di).getStringValue();
+ sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
+ try {
+ sendStreams.putIfAbsent(endpoint, new
PrintStream(sendbaos.get(endpoint), true, "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new HyracksDataException(e.getMessage());
+ }
if (push) {
int pushOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(),
pushOffset + 1);
- //TODO: Right now this creates an object per channel result.
Need to find a better way to deserialize
- pushRecord = recordSerDes.deserialize(di);
- sendData.get(endpoint).add(pushRecord.toString());
+ if (!firstResult) {
+ sendStreams.get(endpoint).append(',');
+ }
+ recordPrinterFactory.print(inputArg1.getByteArray(),
inputArg1.getStartOffset(), inputArg1.getLength(),
+ sendStreams.get(endpoint));
} else {
- int serSubOffset = inputArg1.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(),
serSubOffset + 1);
- pushList = subSerDes.deserialize(di);
- addSubscriptions(endpoint, pushList);
+ if (!firstResult) {
+ sendStreams.get(endpoint).append(',');
+ }
+ listPrinterFactory.print(inputArg1.getByteArray(),
inputArg1.getStartOffset(), inputArg1.getLength(),
+ sendStreams.get(endpoint));
}
+ firstResult = false;
}
}
@Override
public void close() throws HyracksDataException {
- for (String endpoint : sendData.keySet()) {
- if (sendData.get(endpoint).size() > 0) {
- sendGroupOfResults(endpoint);
- sendData.get(endpoint).clear();
+ for (String endpoint : sendStreams.keySet()) {
+ sendData.put(endpoint, new
String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
+ sendGroupOfResults(endpoint);
+ sendStreams.get(endpoint).close();
+ try {
+ sendbaos.get(endpoint).close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e.getMessage());
}
+
}
+
return;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2717
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>