Changeset: fa30331b7aca for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=fa30331b7aca
Modified Files:
        clients/mapilib/mapi.c
        common/stream/stream.c
        common/stream/stream.h
        common/utils/conversion.c
        sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:

When a single row does not fit into our buffer, enlarge the buffer.


diffs (146 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -5569,7 +5569,7 @@ mapi_fetch_row(MapiHdl hdl)
                // if not, check if our cache is empty
                if (result->rows_read >= result->tuple_count) {
                        // if our cache is empty, we read data from the socket
-                       lng nrows = 0;
+                       lng nrows = -1;
                        result->cur_row = 1;
 
                        // first we write a prompt to the server indicating that we want another block of the result set
@@ -5587,12 +5587,27 @@ mapi_fetch_row(MapiHdl hdl)
 
                        // this actually triggers the read of the entire block
                        // after this point we operate on the buffer
-                       if (!mnstr_readLng(hdl->mid->from, &nrows)) {
-                               // FIXME: set hdl->mid to something
-                               hdl->mid->errorstr = strdup("Failed to read row response");
-                               hdl->mid->error = 0;
-                               fprintf(stderr, "Failure 3.\n");
-                               return hdl->mid->error;
+                       while(nrows < 0) {
+                               if (!mnstr_readLng(hdl->mid->from, &nrows)) {
+                                       // FIXME: set hdl->mid to something
+                                       hdl->mid->errorstr = strdup("Failed to read row response");
+                                       hdl->mid->error = 0;
+                                       fprintf(stderr, "Failure 3.\n");
+                                       return hdl->mid->error;
+                               }
+                               if (nrows < 0) {
+                                       lng new_size;
+                                       char dummy;
+                                       // increase buffer size
+                                       if (!mnstr_readLng(hdl->mid->from, &new_size)) {
+                                               return hdl->mid->error;
+                                       }
+                                       // consume flush
+                                       mnstr_readChr(hdl->mid->from, &dummy);
+                                       // resize buffer
+                                       bs2_resizebuf(hdl->mid->from, new_size);
+                                       hdl->mid->blocksize = new_size;
+                               }
                        }
 
                        assert(nrows <= result->row_count);
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4456,6 +4456,36 @@ bs2_stealbuf(stream *ss)
        return buffer;
 }
 
+/* Swap the bs2 stream's buffer (and compression buffer, when one is needed)
+ * for fresh allocations sized for bufsiz; the old buffers are freed, not copied.
+ * On allocation failure the stream is left with NULL buffers and zero sizes. */
+void
+bs2_resizebuf(stream *ss, size_t bufsiz) {
+       size_t compress_bound;
+       bs2 *s = (bs2 *) ss->stream_data.p;
+       assert(ss->read == bs2_read);
+
+       free(s->buf);           /* free(NULL) is a no-op, no guard needed */
+       free(s->compbuf);
+       s->buf = NULL;
+       s->compbuf = NULL;
+       s->bufsiz = 0;
+       s->compbufsiz = 0;      /* never leave a stale size next to a NULL buffer */
+       if ((s->buf = malloc(bufsiz)) == NULL) {
+               return;
+       }
+       s->bufsiz = bufsiz;     /* set before asking for the compression bound */
+       compress_bound = compression_size_bound(s);
+       if (compress_bound > 0 && (s->compbuf = malloc(compress_bound)) == NULL) {
+               free(s->buf);
+               s->buf = NULL;
+               s->bufsiz = 0;  /* keep size consistent with the released buffer */
+               return;
+       }
+       s->compbufsiz = compress_bound;
+       bs2_resetbuf(ss);
+}
+
 void
 bs2_resetbuf(stream *ss)
 {
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -256,6 +256,7 @@ typedef enum {
 
 stream_export stream *block_stream2(stream *s, size_t bufsiz, compression_method comp, column_compression colcomp);
 stream_export void* bs2_stealbuf(stream *ss);
+stream_export void bs2_resizebuf(stream *ss, size_t bufsiz);
 stream_export void bs2_resetbuf(stream *ss);
 stream_export buffer bs2_buffer(stream *s);
 column_compression bs2_colcomp(stream *ss);
diff --git a/common/utils/conversion.c b/common/utils/conversion.c
--- a/common/utils/conversion.c
+++ b/common/utils/conversion.c
@@ -290,7 +290,8 @@ conversion_time_to_string(char *dst, int
        if (res = sprintf(dst, "%02d:%02d:%02d.%03d000", hour, min, sec, ms) < 0) {
                return res;
        }
-
+       digits--;
+       if (digits == 0) digits = -1;
        // adjust displayed precision based on the digits
        dst[9 + digits] = '\0';
        return 9 + digits;
diff --git a/sql/backends/monet5/sql_result.c b/sql/backends/monet5/sql_result.c
--- a/sql/backends/monet5/sql_result.c
+++ b/sql/backends/monet5/sql_result.c
@@ -2045,9 +2045,10 @@ static int mvc_export_resultset_prot10(m
                                row = row > (size_t) count ? (size_t) count : row;
                        }
                } else {
+                       size_t rowsize = 0;
                        // we have varsized elements, so we have to loop to determine how many rows fit into a buffer
                        while (row < (size_t) count) {
-                               size_t rowsize = fixed_lengths;
+                               rowsize = fixed_lengths;
                                for (i = 0; i < (size_t) t->nr_cols; i++) {
                                        res_col *c = t->cols + i;
                                        int mtype = iterators[i].b->ttype;
@@ -2063,7 +2064,19 @@ static int mvc_export_resultset_prot10(m
                                bytes_left -= rowsize;
                                row++;
                        }
-                       assert(row > srow);
+                       if (row == srow) {
+                               lng new_size = rowsize + 1024;
+                               if (!mnstr_writeLng(s, (lng) -1) || 
+                                       !mnstr_writeLng(s, new_size) || 
+                                       mnstr_flush(s) < 0) {
+                                       fres = -1;
+                                       goto cleanup;
+                               }
+                               row = srow + 1;
+                               bs2_resizebuf(s, new_size);
+                               buf = bs2_buffer(s).buf;
+                               bsize = new_size;
+                       }
                }
 
                if (row <= srow) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to