Author: eelco
Date: Fri Dec 16 19:44:13 2011
New Revision: 30940
URL: https://nixos.org/websvn/nix/?rev=30940&sc=1
Log:
* Make the import operation through the daemon much more efficient
(way fewer roundtrips) by allowing the client to send data in bigger
chunks.
* Some refactoring.
Modified:
nix/trunk/src/libstore/local-store.cc
nix/trunk/src/libstore/remote-store.cc
nix/trunk/src/libstore/worker-protocol.hh
nix/trunk/src/libutil/serialise.cc
nix/trunk/src/libutil/serialise.hh
nix/trunk/src/nix-worker/nix-worker.cc
Modified: nix/trunk/src/libstore/local-store.cc
==============================================================================
--- nix/trunk/src/libstore/local-store.cc Fri Dec 16 19:38:43 2011
(r30939)
+++ nix/trunk/src/libstore/local-store.cc Fri Dec 16 19:44:13 2011
(r30940)
@@ -1199,10 +1199,11 @@
{
hashing = true;
}
- virtual void operator () (unsigned char * data, size_t len)
+ size_t read(unsigned char * data, size_t len)
{
- readSource(data, len);
- if (hashing) hashSink(data, len);
+ size_t n = readSource.read(data, len);
+ if (hashing) hashSink(data, n);
+ return n;
}
};
Modified: nix/trunk/src/libstore/remote-store.cc
==============================================================================
--- nix/trunk/src/libstore/remote-store.cc Fri Dec 16 19:38:43 2011
(r30939)
+++ nix/trunk/src/libstore/remote-store.cc Fri Dec 16 19:44:13 2011
(r30940)
@@ -501,11 +501,11 @@
}
else if (msg == STDERR_READ) {
if (!source) throw Error("no source");
- unsigned int len = readInt(from);
+ size_t len = readInt(from);
unsigned char * buf = new unsigned char[len];
AutoDeleteArray<unsigned char> d(buf);
- (*source)(buf, len);
- writeString(string((const char *) buf, len), to);
+ size_t n = source->read(buf, len);
+ writeString(string((const char *) buf, n), to); // !!! inefficient
to.flush();
}
else {
Modified: nix/trunk/src/libstore/worker-protocol.hh
==============================================================================
--- nix/trunk/src/libstore/worker-protocol.hh Fri Dec 16 19:38:43 2011
(r30939)
+++ nix/trunk/src/libstore/worker-protocol.hh Fri Dec 16 19:44:13 2011
(r30940)
@@ -8,7 +8,7 @@
#define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f
-#define PROTOCOL_VERSION 0x108
+#define PROTOCOL_VERSION 0x109
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
Modified: nix/trunk/src/libutil/serialise.cc
==============================================================================
--- nix/trunk/src/libutil/serialise.cc Fri Dec 16 19:38:43 2011 (r30939)
+++ nix/trunk/src/libutil/serialise.cc Fri Dec 16 19:44:13 2011 (r30940)
@@ -23,8 +23,9 @@
while (len) {
/* Optimisation: bypass the buffer if the data exceeds the
- buffer size and there is no unflushed data. */
- if (bufPos == 0 && len >= bufSize) {
+ buffer size. */
+ if (bufPos + len >= bufSize) {
+ flush();
write(data, len);
break;
}
@@ -59,29 +60,37 @@
}
+void Source::operator () (unsigned char * data, size_t len)
+{
+ while (len) {
+ size_t n = read(data, len);
+ data += n; len -= n;
+ }
+}
+
+
BufferedSource::~BufferedSource()
{
if (buffer) delete[] buffer;
}
-void BufferedSource::operator () (unsigned char * data, size_t len)
+size_t BufferedSource::read(unsigned char * data, size_t len)
{
if (!buffer) buffer = new unsigned char[bufSize];
- while (len) {
- if (!bufPosIn) bufPosIn = read(buffer, bufSize);
+ if (!bufPosIn) bufPosIn = readUnbuffered(buffer, bufSize);
- /* Copy out the data in the buffer. */
- size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len;
- memcpy(data, buffer + bufPosOut, n);
- data += n; bufPosOut += n; len -= n;
- if (bufPosIn == bufPosOut) bufPosIn = bufPosOut = 0;
- }
+ /* Copy out the data in the buffer. */
+ size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len;
+ memcpy(data, buffer + bufPosOut, n);
+ bufPosOut += n;
+ if (bufPosIn == bufPosOut) bufPosIn = bufPosOut = 0;
+ return n;
}
-size_t FdSource::read(unsigned char * data, size_t len)
+size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
{
ssize_t n;
do {
@@ -93,6 +102,15 @@
return n;
}
+
+size_t StringSource::read(unsigned char * data, size_t len)
+{
+ if (pos == s.size()) throw EndOfFile("end of string reached");
+ size_t n = s.copy((char *) data, len, pos);
+ pos += n;
+ return n;
+}
+
void writePadding(size_t len, Sink & sink)
{
Modified: nix/trunk/src/libutil/serialise.hh
==============================================================================
--- nix/trunk/src/libutil/serialise.hh Fri Dec 16 19:38:43 2011 (r30939)
+++ nix/trunk/src/libutil/serialise.hh Fri Dec 16 19:44:13 2011 (r30940)
@@ -24,7 +24,7 @@
BufferedSink(size_t bufSize = 32 * 1024)
: bufSize(bufSize), bufPos(0), buffer(0) { }
~BufferedSink();
-
+
void operator () (const unsigned char * data, size_t len);
void flush();
@@ -39,9 +39,14 @@
virtual ~Source() { }
/* Store exactly ‘len’ bytes in the buffer pointed to by ‘data’.
- It blocks if that much data is not yet available, or throws an
- error if it is not going to be available. */
- virtual void operator () (unsigned char * data, size_t len) = 0;
+ It blocks until all the requested data is available, or throws
+ an error if it is not going to be available. */
+ void operator () (unsigned char * data, size_t len);
+
+ /* Store up to ‘len’ bytes in the buffer pointed to by ‘data’, and
+ return the number of bytes stored. It blocks until at least
+ one byte is available. */
+ virtual size_t read(unsigned char * data, size_t len) = 0;
};
@@ -55,12 +60,10 @@
: bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
~BufferedSource();
- void operator () (unsigned char * data, size_t len);
+ size_t read(unsigned char * data, size_t len);
- /* Store up to ‘len’ in the buffer pointed to by ‘data’, and
- return the number of bytes stored. If should block until at
- least one byte is available. */
- virtual size_t read(unsigned char * data, size_t len) = 0;
+ /* Underlying read call, to be overridden. */
+ virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
};
@@ -83,7 +86,7 @@
int fd;
FdSource() : fd(-1) { }
FdSource(int fd) : fd(fd) { }
- size_t read(unsigned char * data, size_t len);
+ size_t readUnbuffered(unsigned char * data, size_t len);
};
@@ -104,13 +107,7 @@
const string & s;
size_t pos;
StringSource(const string & _s) : s(_s), pos(0) { }
- virtual void operator () (unsigned char * data, size_t len)
- {
- s.copy((char *) data, len, pos);
- pos += len;
- if (pos > s.size())
- throw Error("end of string reached");
- }
+ size_t read(unsigned char * data, size_t len);
};
Modified: nix/trunk/src/nix-worker/nix-worker.cc
==============================================================================
--- nix/trunk/src/nix-worker/nix-worker.cc Fri Dec 16 19:38:43 2011
(r30939)
+++ nix/trunk/src/nix-worker/nix-worker.cc Fri Dec 16 19:44:13 2011
(r30940)
@@ -210,11 +210,11 @@
};
-struct TunnelSource : Source
+struct TunnelSource : BufferedSource
{
Source & from;
TunnelSource(Source & from) : from(from) { }
- virtual void operator () (unsigned char * data, size_t len)
+ size_t readUnbuffered(unsigned char * data, size_t len)
{
/* Careful: we're going to receive data from the client now,
so we have to disable the SIGPOLL handler. */
@@ -224,11 +224,16 @@
writeInt(STDERR_READ, to);
writeInt(len, to);
to.flush();
- string s = readString(from);
- if (s.size() != len) throw Error("not enough data");
- memcpy(data, (const unsigned char *) s.c_str(), len);
+ string s = readString(from); // !!! inefficient
startWork();
+
+ if (s.empty()) throw EndOfFile("unexpected end-of-file");
+ if (s.size() > len) throw Error("client sent too much data");
+
+ memcpy(data, (const unsigned char *) s.c_str(), s.size());
+
+ return s.size();
}
};
@@ -265,10 +270,11 @@
Source & orig;
string s;
SavingSourceAdapter(Source & orig) : orig(orig) { }
- void operator () (unsigned char * data, size_t len)
+ size_t read(unsigned char * data, size_t len)
{
- orig(data, len);
- s.append((const char *) data, len);
+ size_t n = orig.read(data, len);
+ s.append((const char *) data, n);
+ return n;
}
};
@@ -397,6 +403,8 @@
case wopImportPath: {
startWork();
+ if (GET_PROTOCOL_MINOR(clientVersion) < 9)
+ throw Error("import not supported; upgrade your client");
TunnelSource source(from);
Path path = store->importPath(true, source);
stopWork();
_______________________________________________
nix-commits mailing list
[email protected]
http://lists.science.uu.nl/mailman/listinfo/nix-commits