Hi friends, As I promised earlier, here is my take on doing the same fix to sftp_write as we've worked on for sftp_read.
I'll appreciate eyeballs and testing! -- / daniel.haxx.se
From e07342443f07e050e38960ce698ea66cc32d452a Mon Sep 17 00:00:00 2001 From: Daniel Stenberg <dan...@haxx.se> Date: Tue, 7 Feb 2012 00:35:51 +0100 Subject: [PATCH] sftp_write: cannot return acked data *and* EAGAIN Whenever we have acked data and is about to call a function that *MAY* return EAGAIN we must return the number now and wait to get called again. Our API only allows data *or* EAGAIN and we must never try to get both. --- src/sftp.c | 259 +++++++++++++++++++++++++++++++----------------------------- src/sftp.h | 7 +- 2 files changed, 139 insertions(+), 127 deletions(-) diff --git a/src/sftp.c b/src/sftp.c index 7af5c43..c0c8c52 100644 --- a/src/sftp.c +++ b/src/sftp.c @@ -1643,157 +1643,168 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer, struct sftp_pipeline_chunk *next; size_t acked = 0; size_t org_count = count; - size_t eagain = 0; + size_t already; - /* Number of bytes sent off that haven't been acked and therefor we will - get passed in here again. + switch(sftp->write_state) { + default: + case libssh2_NB_state_idle: - Also, add up the number of bytes that actually already have been acked - but we haven't been able to return as such yet, so we will get that - data as well passed in here again. - */ - size_t already = (handle->u.file.offset_sent - handle->u.file.offset)+ - handle->u.file.acked; + /* Number of bytes sent off that haven't been acked and therefor we + will get passed in here again. + + Also, add up the number of bytes that actually already have been + acked but we haven't been able to return as such yet, so we will + get that data as well passed in here again. 
+ */ + already = (handle->u.file.offset_sent - handle->u.file.offset)+ + handle->u.file.acked; + + if(count >= already) { + /* skip the part already made into packets */ + buffer += already; + count -= already; + } + else + /* there is more data already fine than what we got in this call */ + count = 0; + + sftp->write_state = libssh2_NB_state_idle; + while(count) { + /* TODO: Possibly this should have some logic to prevent a very + very small fraction to be left but lets ignore that for now */ + uint32_t size = MIN(MAX_SFTP_OUTGOING_SIZE, count); + uint32_t request_id; - if(count >= already) { - /* skip the part already made into packets */ - buffer += already; - count -= already; - } - else - /* there is more data already fine than what we got in this call */ - count = 0; - - while(count) { - /* TODO: Possibly this should have some logic to prevent a very very - small fraction to be left but lets ignore that for now */ - uint32_t size = MIN(MAX_SFTP_OUTGOING_SIZE, count); - uint32_t request_id; - - /* 25 = packet_len(4) + packet_type(1) + request_id(4) + - handle_len(4) + offset(8) + count(4) */ - packet_len = handle->handle_len + size + 25; - - chunk = LIBSSH2_ALLOC(session, packet_len + - sizeof(struct sftp_pipeline_chunk)); - if (!chunk) - return _libssh2_error(session, LIBSSH2_ERROR_ALLOC, - "malloc fail for FXP_WRITE"); + /* 25 = packet_len(4) + packet_type(1) + request_id(4) + + handle_len(4) + offset(8) + count(4) */ + packet_len = handle->handle_len + size + 25; + + chunk = LIBSSH2_ALLOC(session, packet_len + + sizeof(struct sftp_pipeline_chunk)); + if (!chunk) + return _libssh2_error(session, LIBSSH2_ERROR_ALLOC, + "malloc fail for FXP_WRITE"); - chunk->len = size; - chunk->sent = 0; - chunk->lefttosend = packet_len; + chunk->len = size; + chunk->sent = 0; + chunk->lefttosend = packet_len; - s = chunk->packet; - _libssh2_store_u32(&s, packet_len - 4); + s = chunk->packet; + _libssh2_store_u32(&s, packet_len - 4); - *(s++) = SSH_FXP_WRITE; - request_id = 
sftp->request_id++; - chunk->request_id = request_id; - _libssh2_store_u32(&s, request_id); - _libssh2_store_str(&s, handle->handle, handle->handle_len); - _libssh2_store_u64(&s, handle->u.file.offset_sent); - handle->u.file.offset_sent += size; /* advance offset at once */ - _libssh2_store_str(&s, buffer, size); + *(s++) = SSH_FXP_WRITE; + request_id = sftp->request_id++; + chunk->request_id = request_id; + _libssh2_store_u32(&s, request_id); + _libssh2_store_str(&s, handle->handle, handle->handle_len); + _libssh2_store_u64(&s, handle->u.file.offset_sent); + handle->u.file.offset_sent += size; /* advance offset at once */ + _libssh2_store_str(&s, buffer, size); - /* add this new entry LAST in the list */ - _libssh2_list_add(&handle->packet_list, &chunk->node); + /* add this new entry LAST in the list */ + _libssh2_list_add(&handle->packet_list, &chunk->node); - buffer += size; - count -= size; /* deduct the size we used, as we might have - to create more packets */ - } + buffer += size; + count -= size; /* deduct the size we used, as we might have + to create more packets */ + } - /* move through the WRITE packets that haven't been sent and send as many - as possible - remember that we don't block */ - chunk = _libssh2_list_first(&handle->packet_list); + /* move through the WRITE packets that haven't been sent and send as many + as possible - remember that we don't block */ + chunk = _libssh2_list_first(&handle->packet_list); - while(chunk) { - if(chunk->lefttosend) { - rc = _libssh2_channel_write(channel, 0, - &chunk->packet[chunk->sent], - chunk->lefttosend); - if(rc < 0) { - if(rc != LIBSSH2_ERROR_EAGAIN) - /* error */ + while(chunk) { + if(chunk->lefttosend) { + rc = _libssh2_channel_write(channel, 0, + &chunk->packet[chunk->sent], + chunk->lefttosend); + if(rc < 0) + /* remain in idle state */ return rc; - eagain++; - break; + + /* remember where to continue sending the next time */ + chunk->lefttosend -= rc; + chunk->sent += rc; + + if(chunk->lefttosend) + 
/* data left to send, get out of loop */ + break; } - /* remember where to continue sending the next time */ - chunk->lefttosend -= rc; - chunk->sent += rc; + /* move on to the next chunk with data to send */ + chunk = _libssh2_list_next(&chunk->node); + } + + /* fall-through */ + case libssh2_NB_state_sent: + + sftp->write_state = libssh2_NB_state_idle; + /* + * Count all ACKed packets + */ + chunk = _libssh2_list_first(&handle->packet_list); + while(chunk) { if(chunk->lefttosend) - /* data left to send, get out of loop */ + /* if the chunk still has data left to send, we shouldn't wait + for an ACK for it just yet */ break; - } - - /* move on to the next chunk with data to send */ - chunk = _libssh2_list_next(&chunk->node); - } - /* - * Count all ACKed packets - */ - chunk = _libssh2_list_first(&handle->packet_list); + else if(acked) + /* if we have sent data that is acked, we must return that + info before we call a function that might return EAGAIN */ + break; - while(chunk) { - if(chunk->lefttosend) - /* if the chunk still has data left to send, we shouldn't wait for - an ACK for it just yet */ - break; + /* we check the packets in order */ + rc = sftp_packet_require(sftp, SSH_FXP_STATUS, + chunk->request_id, &data, &data_len); + if (rc < 0) { + if (rc == LIBSSH2_ERROR_EAGAIN) + sftp->write_state = libssh2_NB_state_sent; + return rc; + } - /* we check the packets in order */ - rc = sftp_packet_require(sftp, SSH_FXP_STATUS, - chunk->request_id, &data, &data_len); - if (rc == LIBSSH2_ERROR_EAGAIN) { - eagain++; - break; - } - else if (rc) { - return _libssh2_error(session, rc, "Waiting for SFTP status"); - } - retcode = _libssh2_ntohu32(data + 5); - LIBSSH2_FREE(session, data); + retcode = _libssh2_ntohu32(data + 5); + LIBSSH2_FREE(session, data); - sftp->last_errno = retcode; - if (retcode == LIBSSH2_FX_OK) { - acked += chunk->len; /* number of payload data that was acked - here */ + sftp->last_errno = retcode; + if (retcode == LIBSSH2_FX_OK) { + acked += 
chunk->len; /* number of payload data that was acked + here */ - /* we increase the offset value for all acks */ - handle->u.file.offset += chunk->len; + /* we increase the offset value for all acks */ + handle->u.file.offset += chunk->len; - next = _libssh2_list_next(&chunk->node); + next = _libssh2_list_next(&chunk->node); - _libssh2_list_remove(&chunk->node); /* remove from list */ - LIBSSH2_FREE(session, chunk); /* free memory */ + _libssh2_list_remove(&chunk->node); /* remove from list */ + LIBSSH2_FREE(session, chunk); /* free memory */ - chunk = next; - } - else { - /* flush all pending packets from the outgoing list */ - sftp_packetlist_flush(handle); + chunk = next; + } + else { + /* flush all pending packets from the outgoing list */ + sftp_packetlist_flush(handle); - /* since we return error now, the applicaton will not get any - outstanding data acked, so we need to rewind the offset to - where the application knows it has reached with acked data */ - handle->u.file.offset -= handle->u.file.acked; + /* since we return error now, the applicaton will not get any + outstanding data acked, so we need to rewind the offset to + where the application knows it has reached with acked data */ + handle->u.file.offset -= handle->u.file.acked; - /* then reset the offset_sent to be the same as the offset */ - handle->u.file.offset_sent = handle->u.file.offset; + /* then reset the offset_sent to be the same as the offset */ + handle->u.file.offset_sent = handle->u.file.offset; - /* clear the acked counter since we can have no pending data to - ack after an error */ - handle->u.file.acked = 0; + /* clear the acked counter since we can have no pending data to + ack after an error */ + handle->u.file.acked = 0; - /* the server returned an error for that written chunk, propagate - this back to our parent function */ - return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL, - "FXP write failed"); + /* the server returned an error for that written chunk, propagate + 
this back to our parent function */ + return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL, + "FXP write failed"); + } } + break; } /* if there were acked data in a previous call that wasn't returned then, @@ -1813,9 +1824,7 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer, return ret; } - else if(eagain) - return _libssh2_error(session, LIBSSH2_ERROR_EAGAIN, - "Would block sftp_write"); + else return 0; /* nothing was acked, and no EAGAIN was received! */ } diff --git a/src/sftp.h b/src/sftp.h index 2bff712..f74060f 100644 --- a/src/sftp.h +++ b/src/sftp.h @@ -1,7 +1,7 @@ #ifndef _LIBSSH2_SFTP_H #define _LIBSSH2_SFTP_H /* - * Copyright (C) 2010, 2011 by Daniel Stenberg + * Copyright (C) 2010 - 2012 by Daniel Stenberg * Author: Daniel Stenberg <dan...@haxx.se> * * Redistribution and use in source and binary forms, @@ -158,9 +158,12 @@ struct _LIBSSH2_SFTP size_t open_packet_sent; uint32_t open_request_id; - /* State variables used in libssh2_sftp_read() */ + /* State variable used in sftp_read() */ libssh2_nonblocking_states read_state; + /* State variable used in sftp_write() */ + libssh2_nonblocking_states write_state; + /* State variables used in libssh2_sftp_readdir() */ libssh2_nonblocking_states readdir_state; unsigned char *readdir_packet; -- 1.7.9
_______________________________________________ libssh2-devel http://cool.haxx.se/cgi-bin/mailman/listinfo/libssh2-devel