Author: eelco
Date: Wed Dec 14 23:30:06 2011
New Revision: 30899
URL: https://nixos.org/websvn/nix/?rev=30899&sc=1
Log:
* Buffer writes in FdSink. This significantly reduces the number of
system calls / context switches when dumping a NAR and in the worker
protocol.
Modified:
nix/trunk/src/libstore/remote-store.cc
nix/trunk/src/libutil/serialise.cc
nix/trunk/src/libutil/serialise.hh
nix/trunk/src/nix-worker/nix-worker.cc
Modified: nix/trunk/src/libstore/remote-store.cc
==============================================================================
--- nix/trunk/src/libstore/remote-store.cc Wed Dec 14 22:41:10 2011 (r30898)
+++ nix/trunk/src/libstore/remote-store.cc Wed Dec 14 23:30:06 2011 (r30899)
@@ -65,6 +65,7 @@
/* Send the magic greeting, check for the reply. */
try {
writeInt(WORKER_MAGIC_1, to);
+ to.flush();
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
@@ -166,6 +167,7 @@
RemoteStore::~RemoteStore()
{
try {
+ to.flush();
fdSocket.close();
if (child != -1)
child.wait(true);
@@ -488,6 +490,7 @@
void RemoteStore::processStderr(Sink * sink, Source * source)
{
+ to.flush();
unsigned int msg;
while ((msg = readInt(from)) == STDERR_NEXT
|| msg == STDERR_READ || msg == STDERR_WRITE) {
@@ -503,6 +506,7 @@
AutoDeleteArray<unsigned char> d(buf);
(*source)(buf, len);
writeString(string((const char *) buf, len), to);
+ to.flush();
}
else {
string s = readString(from);
Modified: nix/trunk/src/libutil/serialise.cc
==============================================================================
--- nix/trunk/src/libutil/serialise.cc Wed Dec 14 22:41:10 2011 (r30898)
+++ nix/trunk/src/libutil/serialise.cc Wed Dec 14 23:30:06 2011 (r30899)
@@ -9,7 +9,30 @@
void FdSink::operator () (const unsigned char * data, unsigned int len)
{
- writeFull(fd, data, len);
+ if (!buffer) buffer = new unsigned char[bufSize];
+
+ while (len) {
+ /* Optimisation: bypass the buffer if the data exceeds the
+ buffer size and there is no unflushed data. */
+ if (bufPos == 0 && len >= bufSize) {
+ writeFull(fd, data, len);
+ break;
+ }
+ /* Otherwise, copy the bytes to the buffer. Flush the buffer
+ when it's full. */
+ size_t n = bufPos + len > bufSize ? bufSize - bufPos : len;
+ memcpy(buffer + bufPos, data, n);
+ data += n; bufPos += n; len -= n;
+ if (bufPos == bufSize) flush();
+ }
+}
+
+
+void FdSink::flush()
+{
+ if (fd == -1 || bufPos == 0) return;
+ writeFull(fd, buffer, bufPos);
+ bufPos = 0;
}
Modified: nix/trunk/src/libutil/serialise.hh
==============================================================================
--- nix/trunk/src/libutil/serialise.hh Wed Dec 14 22:41:10 2011 (r30898)
+++ nix/trunk/src/libutil/serialise.hh Wed Dec 14 23:30:06 2011 (r30899)
@@ -28,22 +28,29 @@
};
-/* A sink that writes data to a file descriptor. */
+/* A sink that writes data to a file descriptor (using a buffer). */
struct FdSink : Sink
{
int fd;
+ unsigned int bufSize, bufPos;
+ unsigned char * buffer;
- FdSink()
+ FdSink() : fd(-1), bufSize(32 * 1024), bufPos(0), buffer(0) { }
+
+ FdSink(int fd, unsigned int bufSize = 32 * 1024)
+ : fd(fd), bufSize(bufSize), bufPos(0), buffer(0)
{
- fd = -1;
}
-
- FdSink(int fd)
+
+ ~FdSink()
{
- this->fd = fd;
+ flush();
+ if (buffer) delete[] buffer;
}
void operator () (const unsigned char * data, unsigned int len);
+
+ void flush();
};
Modified: nix/trunk/src/nix-worker/nix-worker.cc
==============================================================================
--- nix/trunk/src/nix-worker/nix-worker.cc Wed Dec 14 22:41:10 2011 (r30898)
+++ nix/trunk/src/nix-worker/nix-worker.cc Wed Dec 14 23:30:06 2011 (r30899)
@@ -57,6 +57,7 @@
try {
writeInt(STDERR_NEXT, to);
writeString(string((char *) buf, count), to);
+ to.flush();
} catch (...) {
/* Write failed; that means that the other side is
gone. */
@@ -200,9 +201,7 @@
struct TunnelSink : Sink
{
Sink & to;
- TunnelSink(Sink & to) : to(to)
- {
- }
+ TunnelSink(Sink & to) : to(to) { }
virtual void operator ()
(const unsigned char * data, unsigned int len)
{
@@ -215,9 +214,7 @@
struct TunnelSource : Source
{
Source & from;
- TunnelSource(Source & from) : from(from)
- {
- }
+ TunnelSource(Source & from) : from(from) { }
virtual void operator ()
(unsigned char * data, unsigned int len)
{
@@ -228,6 +225,7 @@
writeInt(STDERR_READ, to);
writeInt(len, to);
+ to.flush();
string s = readString(from);
if (s.size() != len) throw Error("not enough data");
memcpy(data, (const unsigned char *) s.c_str(), len);
@@ -596,8 +594,8 @@
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
writeInt(WORKER_MAGIC_2, to);
-
writeInt(PROTOCOL_VERSION, to);
+ to.flush();
unsigned int clientVersion = readInt(from);
/* Send startup error messages to the client. */
@@ -619,9 +617,11 @@
store = boost::shared_ptr<StoreAPI>(new LocalStore());
stopWork();
+ to.flush();
} catch (Error & e) {
stopWork(false, e.msg());
+ to.flush();
return;
}
@@ -652,6 +652,8 @@
if (!errorAllowed) break;
}
+ to.flush();
+
assert(!canSendStderr);
};
_______________________________________________
nix-commits mailing list
[email protected]
http://lists.science.uu.nl/mailman/listinfo/nix-commits