Author: Dodan Mihai <mihai.do...@gmail.com> Branch: py3.5-sendmsg-recvmsg Changeset: r91972:aaa84a0a699e Date: 2017-07-26 13:59 +0300 http://bitbucket.org/pypy/pypy/changeset/aaa84a0a699e/
Log: Tests in regrtest/test_socket pass successfully. Memory leaks fixed diff --git a/pypy/module/_socket/interp_func.py b/pypy/module/_socket/interp_func.py --- a/pypy/module/_socket/interp_func.py +++ b/pypy/module/_socket/interp_func.py @@ -329,6 +329,13 @@ @unwrap_spec(size=int) def CMSG_SPACE(space, size): + """ + Socket method to determine the optimal byte size of the ancillary. + Recommended to be used when computing the ancillary size for recvmsg. + :param space: + :param size: an integer with the minimum size required. + :return: an integer with the minimum memory needed for the required size. The value is memory alligned + """ if size < 0: raise oefmt(space.w_OverflowError, "CMSG_SPACE() argument out of range") @@ -340,6 +347,13 @@ @unwrap_spec(len=int) def CMSG_LEN(space, len): + """ + Socket method to determine the optimal byte size of the ancillary. + Recommended to be used when computing the ancillary size for recvmsg. + :param space: + :param len: an integer with the minimum size required. + :return: an integer with the minimum memory needed for the required size. The value is not mem alligned. + """ if len < 0: raise oefmt(space.w_OverflowError, "CMSG_LEN() argument out of range") diff --git a/pypy/module/_socket/interp_socket.py b/pypy/module/_socket/interp_socket.py --- a/pypy/module/_socket/interp_socket.py +++ b/pypy/module/_socket/interp_socket.py @@ -450,6 +450,25 @@ @unwrap_spec(message_size=int, ancbufsize=int, flags=int) def recvmsg_w(self,space,message_size, ancbufsize = 0, flags = 0): + """ + recvfrom(message_size[, ancbufsize[, flags]]) -> (message, ancillary, flags, address) + recvmsg(message_size, [ancbufsize,[flags]]) -> (message, ancillary, flags, address) + Receive normal data (up to bufsize bytes) and ancillary data from the socket. + The ancbufsize argument sets the size in bytes of the internal buffer used to receive the ancillary data; + it defaults to 0, meaning that no ancillary data will be received. + Appropriate buffer sizes for ancillary data can be calculated using CMSG_SPACE() or CMSG_LEN(), + and items which do not fit into the buffer might be truncated or discarded. + The flags argument defaults to 0 and has the same meaning as for recv(). + The ancdata item is a list of zero or more tuples (cmsg_level, cmsg_type, cmsg_data): + cmsg_level and cmsg_type are integers specifying the protocol level and protocol-specific type respectively, + and cmsg_data is a bytes object holding the associated data. + + :param space: Non useable parameter. It represents the object space. + :param message_size: Maximum size of the message to be received + :param ancbufsize: Maximum size of the ancillary data to be received + :param flags: Receive flag. For more details, please check the Unix manual + :return: a tuple consisting of the message, the ancillary data, return flag and the address. + """ if (message_size < 0): raise oefmt(space.w_ValueError, "negative buffer size in recvmsg()") if ancbufsize < 0: @@ -476,10 +495,6 @@ except SocketError as e: converted_error(space, e, eintr_retry=True) - - - - @unwrap_spec(data='bufferstr', flags=int) def send_w(self, space, data, flags=0): """send(data[, flags]) -> count @@ -535,10 +550,22 @@ converted_error(space, e, eintr_retry=True) return space.newint(count) - #@unwrap_spec(data='bufferstr', flags = int) def sendmsg_w(self, space, w_data, w_ancillary=None, w_flags=None ,w_address=None): - """sendmsg(messages, [ancillaries, [flags, [address]]]) """ + sendmsg(data[,ancillary[,flags[,address]]]) -> bytes_sent + Send normal and ancillary data to the socket, gathering the non-ancillary data + from a series of buffers and concatenating it into a single message. + The ancdata argument specifies the ancillary data (control messages) as an iterable of zero or more tuples + (cmsg_level, cmsg_type, cmsg_data), where cmsg_level and cmsg_type are integers specifying the protocol level + and protocol-specific type respectively, and cmsg_data is a bytes-like object holding the associated data. + :param space: Represents the object space. + :param w_data: The message(s). needs to be a bytes like object + :param w_ancillary: needs to be a sequence object Can remain unspecified. + :param w_flags: needs to be an integer. Can remain unspecified. + :param w_address: needs to be a bytes-like object Can remain unspecified. + :return: Bytes sent from the message + """ + # Get the flag and address from the object space flags = 0 if space.is_none(w_flags) is False: flags = space.int_w(w_flags) @@ -547,6 +574,7 @@ if space.is_none(w_address) is False: address = self.addr_from_object(space, w_address) + # find data's type in the ObjectSpace and get a list of string out of it. data = [] if (w_data.typedef.name == 'list'): for i in w_data.getitems(): @@ -574,8 +602,9 @@ if not e.match(space,space.w_StopIteration): raise break + + # find the ancillary's type in the ObjectSpace and get a list of tuples out of it. ancillary = [] - if w_ancillary is not None: if (space.isinstance_w(w_ancillary,space.w_list)): for i in w_ancillary.getitems(): @@ -594,19 +623,22 @@ tup = (level, type, cont) ancillary.append(tup) else: - raise oefmt(space.w_TypeError,"[sendmsg() ancillary data items]() argument must be sequence of length 3") + raise oefmt(space.w_TypeError, + "[sendmsg() ancillary data items]() argument must be sequence of length 3") else: while True: try: if (space.is_generator(w_ancillary) is False): - raise oefmt(space.w_TypeError,"[sendmsg() ancillary data items]() argument must be sequence") + raise oefmt(space.w_TypeError, + "[sendmsg() ancillary data items]() argument must be sequence") i = space.next(w_ancillary) if (space.isinstance_w(i, space.w_tuple) is False): raise oefmt(space.w_TypeError, "[sendmsg() ancillary data items]() argument must be sequence of length 3") if (space.len_w(i) != 3): - raise oefmt(space.w_TypeError,"[sendmsg() ancillary data items]() argument must be sequence of length 3") + raise oefmt(space.w_TypeError, + "[sendmsg() ancillary data items]() argument must be sequence of length 3") except OperationError as e: if not e.match(space,space.w_StopIteration): raise diff --git a/rpython/rlib/_rsocket_rffi.py b/rpython/rlib/_rsocket_rffi.py --- a/rpython/rlib/_rsocket_rffi.py +++ b/rpython/rlib/_rsocket_rffi.py @@ -357,37 +357,37 @@ 'errno.h', 'limits.h', 'stdio.h', - 'sys/types.h'] + 'sys/types.h', + 'netinet/in.h', + 'arpa/inet.h'] separate_module_sources = [''' - //defines for recvmsg - #define SUCCESS 0 - #define BAD_MSG_SIZE_GIVEN -1 - #define BAD_ANC_SIZE_GIVEN -2 - #define WOULD_BLOCK -3 - #define AGAIN -4 - #define BADDESC -5 - #define CON_REF -6 - #define FAULT -7 - #define INTR -8 - #define NOMEM -9 - #define NOTCONN -10 - #define NOTSOCK -11 - #define MAL_ANC -12 + // special defines for returning from recvmsg + #define BAD_MSG_SIZE_GIVEN -10000 + #define BAD_ANC_SIZE_GIVEN -10001 + #define MAL_ANC -10002 - //defines for sendmsg + // special defines for returning from sendmsg #define MUL_MSGS_NOT_SUP -1000 #define ANC_DATA_TOO_LARGE -1001 #define ANC_DATA_TOO_LARGEX -1002 - #define MSG_IOVLEN 1 // CPyhton has hardcoded this as well. + /* + Even though you could, theoretically, receive more than one message, IF you set the socket option, + CPython has hardcoded the message number to 1, and implemented the option to receive more then 1 in a + different socket method: recvmsg_into + */ + #define MSG_IOVLEN 1 // CPython has hardcoded this as well. #if INT_MAX > 0x7fffffff #define SOCKLEN_T_LIMIT 0x7fffffff #else #define SOCKLEN_T_LIMIT INT_MAX #endif + // ################################################################################################ + // Recvmsg implementation and associated functions + // Taken from CPython. Determines the minimum memory space required for the ancillary data. #ifdef CMSG_SPACE static int cmsg_min_space(struct msghdr *msg, struct cmsghdr *cmsgh, size_t space) @@ -428,8 +428,8 @@ } #endif + // Taken from CPython. #ifdef CMSG_LEN - /* If pointer CMSG_DATA(cmsgh) is in buffer msg->msg_control, set *space to number of bytes following it in the buffer and return true; otherwise, return false. Assumes cmsgh, msg->msg_control and @@ -449,6 +449,7 @@ return 1; } + // Taken from CPython. /* If cmsgh is invalid or not contained in the buffer pointed to by msg->msg_control, return -1. If cmsgh is valid and its associated data is entirely contained in the buffer, set *data_len to the @@ -476,23 +477,35 @@ } #endif /* CMSG_LEN */ + /* + Structure meant to hold the information received after a recvmsg is performed. + Essentially it holds: the address, the message, the ancillary data and the return flags. + I use this structure for 2 main reasons: + - keep things ordered + - some of the ancillary parameters need to be int not long (rffi SignedP is actually long*), + therefore I cannot use the parameters directly + */ struct recvmsg_info { - int error_code; - struct sockaddr* address; + struct sockaddr* address; // address fields socklen_t addrlen; - int* length_of_messages; + int* length_of_messages; // message fields char** messages; int no_of_messages; - int size_of_ancillary; + int size_of_ancillary; // ancillary fields int* levels; int* types; char** file_descr; int* descr_per_ancillary; - int flags; + int retflag; // return flag field }; - + /* + Wrapper function over recvmsg. Since it returns a lot of data, + in a structure that is hard to parse in rffi, it was implemented in C. + All the parameters, save the socket fd, message_size, ancillary_size + will be malloc'd and/or modified. + */ RPY_EXTERN int recvmsg_implementation( int socket_fd, @@ -505,11 +518,11 @@ char** messages, int* no_of_messages, int* size_of_ancillary, - int** levels, - int** types, + long** levels, + long** types, char** file_descr, - int** descr_per_ancillary, - int* flag) + long** descr_per_ancillary, + int* retflag) { @@ -521,35 +534,36 @@ int cmsg_status; struct iovec iov; struct recvmsg_info* retinfo; - int error_flag; + int error_flag; // variable to be set in case of special errors. int cmsgdatalen = 0; - //allocation flags for failure + // variables that are set to 1, if the message charp has been allocated + // and if the ancillary variables have been allocated. To be used in case of failure. int iov_alloc = 0; int anc_alloc = 0; retinfo = (struct recvmsg_info*) malloc(sizeof(struct recvmsg_info)); - /* - if (message_size < 0){ - error_flag = BAD_MSG_SIZE_GIVEN; - goto fail; - } - */ + if (ancillary_size > SOCKLEN_T_LIMIT){ error_flag = BAD_ANC_SIZE_GIVEN; goto fail; } - + // Setup the messages iov struct memory iov.iov_base = (char*) malloc(message_size); memset(iov.iov_base, 0, message_size); iov.iov_len = message_size; + + // Setup the ancillary buffer memory controlbuf = malloc(ancillary_size); - recvd_addrlen = sizeof(struct sockaddr); + + // Setup the recv address memory + recvd_addrlen = sizeof(struct sockaddr_storage); recvd_address = (struct sockaddr*) malloc(recvd_addrlen); memset(recvd_address, 0,recvd_addrlen); + // Setup the msghdr struct msg.msg_name = recvd_address; msg.msg_namelen = recvd_addrlen; msg.msg_iov = &iov; @@ -557,64 +571,32 @@ msg.msg_control = controlbuf; msg.msg_controllen = ancillary_size; + // Link my structure to the msghdr fields retinfo->address = msg.msg_name; retinfo->length_of_messages = (int*) malloc (MSG_IOVLEN * sizeof(int)); - retinfo->no_of_messages = 1; + retinfo->no_of_messages = MSG_IOVLEN; retinfo->messages = (char**) malloc (MSG_IOVLEN * sizeof(char*)); retinfo->messages[0] = msg.msg_iov->iov_base; iov_alloc = 1; - ssize_t bytes_recvd = 0; bytes_recvd = recvmsg(socket_fd, &msg, flags); if (bytes_recvd < 0){ - switch (errno){ - case EAGAIN: - error_flag = -3; - break; - case EBADF: - error_flag = -5; - break; - case ECONNREFUSED: - error_flag = -6; - break; - case EFAULT: - error_flag = -7; - break; - case EINTR: - error_flag = -8; - break; - case ENOMEM: - error_flag = -9; - break; - case ENOTCONN: - error_flag = -10; - break; - case ENOTSOCK: - error_flag = -11; - break; - } - goto fail; } retinfo->addrlen = (socklen_t) msg.msg_namelen; retinfo->length_of_messages[0] = msg.msg_iov->iov_len; - + // Count the ancillary items & allocate the memory int anc_counter = 0; - /* - struct recv_list* first_item = (struct recv_list*) malloc(sizeof(struct recv_list)); - struct recv_list* iter = first_item; - */ for (cmsgh = ((msg.msg_controllen > 0) ? CMSG_FIRSTHDR(&msg) : NULL); cmsgh != NULL; cmsgh = CMSG_NXTHDR(&msg, cmsgh)) { anc_counter++; } - retinfo->size_of_ancillary = anc_counter; retinfo->file_descr = (char**) malloc (anc_counter * sizeof(char*)); retinfo->levels = (int*) malloc(anc_counter * sizeof(int)); @@ -622,6 +604,7 @@ retinfo->descr_per_ancillary = (int*) malloc(anc_counter * sizeof(int)); anc_alloc = 1; + // Extract the ancillary items int i=0; for (cmsgh = ((msg.msg_controllen > 0) ? CMSG_FIRSTHDR(&msg) : NULL); cmsgh != NULL; cmsgh = CMSG_NXTHDR(&msg, cmsgh)) { @@ -639,26 +622,20 @@ i++; } - retinfo->flags = msg.msg_flags; - retinfo->error_code = 0; + retinfo->retflag = msg.msg_flags; - //address = (struct sockaddr*) malloc (sizeof(struct sockaddr)); - memcpy(address,retinfo->address,sizeof(struct sockaddr)); + // Set the parameters of address + memcpy(address,retinfo->address,retinfo->addrlen); + *addrlen = retinfo->addrlen; - - *addrlen = retinfo->addrlen; + // Set the parameters of message *no_of_messages = retinfo->no_of_messages; *size_of_ancillary = retinfo->size_of_ancillary; - *length_of_messages = (int*) malloc (sizeof(int) * retinfo->no_of_messages); - //*length_of_messages = memcpy(*length_of_messages, retinfo->length_of_messages, sizeof(int) * retinfo->no_of_messages); - int counter = 0; for (i=0; i< retinfo->no_of_messages; i++) counter += retinfo->length_of_messages[i]; - - //*messages = (char*) malloc(sizeof(char) * counter); memset(*messages, 0, sizeof(char) * counter); counter = 0; for(i=0; i< retinfo->no_of_messages; i++){ @@ -666,20 +643,18 @@ counter += retinfo->length_of_messages[i]; } - *levels = (int*) malloc (sizeof(int) * retinfo->size_of_ancillary); - //*levels = - memcpy(*levels, retinfo->levels, sizeof(int) * retinfo->size_of_ancillary); - *types = (int*) malloc (sizeof(int) * retinfo->size_of_ancillary); - //*types = - memcpy(*types, retinfo->types, sizeof(int) * retinfo->size_of_ancillary); - *descr_per_ancillary = (int*) malloc (sizeof(int) * retinfo->size_of_ancillary); - //*descr_per_ancillary = - memcpy(*descr_per_ancillary, retinfo->descr_per_ancillary, sizeof(int) * retinfo->size_of_ancillary); - + // Set the parameters of ancillary + *levels = (long*) malloc (sizeof(long) * retinfo->size_of_ancillary); + *types = (long*) malloc (sizeof(long) * retinfo->size_of_ancillary); + *descr_per_ancillary = (long*) malloc (sizeof(long) * retinfo->size_of_ancillary); counter = 0; - for (i=0; i < retinfo->size_of_ancillary; i++) + for (i=0; i < retinfo->size_of_ancillary; i++){ counter += retinfo->descr_per_ancillary[i]; - + // Convert the int* to long* + levels[0][i] = (long) retinfo->levels[i]; + types[0][i] = (long) retinfo->types[i]; + descr_per_ancillary[0][i] = (long) retinfo->descr_per_ancillary[i]; + } *file_descr = (char*) malloc (sizeof(char) * counter); memset(*file_descr, 0, sizeof(char) * counter); counter = 0; @@ -688,13 +663,10 @@ counter += retinfo->descr_per_ancillary[i]; } - *flag = retinfo->flags; - //int k; - //char* dsadas; - //dsadas = (char*) (*file_descr[0]); - //for (k=0; k<retinfo->no_of_messages * sizeof(int); k++) - // printf("0x%X ", dsadas[k]); + // Set the retflag + *retflag = retinfo->retflag; + // Free the memory free(retinfo->address); free(retinfo->length_of_messages); free(retinfo->levels); @@ -721,17 +693,8 @@ free(retinfo->messages[0]); free(retinfo->messages); free(retinfo->address); + free(retinfo); free(controlbuf); - file_descr = NULL; - levels = NULL; - types = NULL; - descr_per_ancillary = NULL; - length_of_messages = NULL; - messages =NULL; - address = NULL; - addrlen = NULL; - no_of_messages = NULL; - size_of_ancillary = NULL; }else{ if (iov_alloc){ @@ -740,22 +703,14 @@ free(retinfo->messages); free(retinfo->address); free(controlbuf); - length_of_messages = NULL; - messages =NULL; - address = NULL; - file_descr = NULL; - levels = NULL; - types = NULL; - descr_per_ancillary = NULL; - addrlen = NULL; - no_of_messages = NULL; - size_of_ancillary = NULL; - + free(retinfo); } } return error_flag; err_closefds: + // Special case for UNIX sockets. In case file descriptors are received, they need to be closed. + // Taken from CPython #ifdef SCM_RIGHTS /* Close all descriptors coming from SCM_RIGHTS, so they don't leak. */ for (cmsgh = ((msg.msg_controllen > 0) ? CMSG_FIRSTHDR(&msg) : NULL); @@ -783,8 +738,8 @@ } - //################################################################################################ - //send goes from here + // ################################################################################################ + // Sendmsg implementation and associated functions #ifdef CMSG_LEN static int @@ -822,8 +777,28 @@ } #endif + /* + sendmsg_implementation is a wrapper over sendmsg of the API. + It was inspired from the way CPython did their implementation of this. + The main reason that it was written in C, is the struct msghdr, + which contains the ancillary data in a linked list of cmsghdr structures. + It was simpler to use it in C, and then push the simpler types of data via rffi. + */ RPY_EXTERN - int sendmsg_implementation(int socket, struct sockaddr* address, socklen_t addrlen, long* length_of_messages, char** messages, int no_of_messages, long* levels, long* types, char** file_descriptors, long* no_of_fds, int control_length, int flag ) + int sendmsg_implementation + (int socket, + struct sockaddr* address, + socklen_t addrlen, + long* length_of_messages, + char** messages, + int no_of_messages, + long* levels, + long* types, + char** file_descriptors, + long* no_of_fds, + int control_length, + int flag + ) { struct msghdr msg = {0}; @@ -832,15 +807,16 @@ int retval; size_t i; + // Prepare the msghdr structure for the send: + // Add the address - if (address != NULL) { msg.msg_name = address; msg.msg_namelen = addrlen; } + // Add the message struct iovec *iovs = NULL; - if (no_of_messages > 0){ iovs = (struct iovec*) malloc(no_of_messages * sizeof(struct iovec)); @@ -853,8 +829,8 @@ iovs[i].iov_len = length_of_messages[i]; } } + // Add the ancillary - #ifndef CMSG_SPACE if (control_length > 1){ free(iovs); @@ -862,7 +838,9 @@ } #endif if (control_length > 0){ + //compute the total size of the ancillary + //getting the exact amount of space can be tricky and os dependent. size_t total_size_of_ancillary = 0; size_t space; size_t controllen = 0, controllen_last = 0; @@ -884,16 +862,14 @@ return ANC_DATA_TOO_LARGEX; } controllen_last = controllen; - } - controlbuf = malloc(controllen); //* sizeof(int) - + controlbuf = malloc(controllen); msg.msg_control= controlbuf; msg.msg_controllen = controllen; + // memset controlbuf to 0 to avoid trash in the ancillary memset(controlbuf, 0, controllen); - cmsg = NULL; for (i = 0; i< control_length; i++){ cmsg = (i == 0) ? CMSG_FIRSTHDR(&msg) : CMSG_NXTHDR(&msg, cmsg); @@ -912,6 +888,7 @@ // Send the data retval = sendmsg(socket, &msg, flag); + // free everything that was allocated here, and we would not need in rsocket if (iovs != NULL) free(iovs); if (controlbuf !=NULL) @@ -919,6 +896,14 @@ return retval; } + + // ################################################################################################ + // Wrappers for CMSG_SPACE and CMSG_LEN + + /* + These 2 functions are wrappers over sys/socket.h's CMSG_SPACE and CMSG_LEN. + They are identical to CPython's. + */ #ifdef CMSG_SPACE RPY_EXTERN size_t CMSG_SPACE_wrapper(size_t desired_space){ @@ -931,7 +916,6 @@ #endif #ifdef CMSG_LEN - RPY_EXTERN size_t CMSG_LEN_wrapper(size_t desired_len){ size_t result; @@ -942,14 +926,22 @@ } #endif + // ################################################################################################ + // Extra functions that I needed + + /* + This function is used to memcpy from a char* at an offset. + Could not get rffi.c_memcpy to do it at an offset, so I made my own. + */ RPY_EXTERN - char* memcpy_from_CCHARP_at_offset_and_size(char* string, int offset, int size){ - char* buffer; - buffer = (char*)malloc(sizeof(char)*size); - buffer = memcpy(buffer, string + offset, size); - return buffer; + int memcpy_from_CCHARP_at_offset_and_size(char* stringfrom, char** stringto, int offset, int size){ + *stringto = memcpy(*stringto, stringfrom + offset, size); + return 0; } + /* + These functions free memory that was allocated in C (sendmsg or recvmsg) was used in rsocket and now needs cleanup + */ RPY_EXTERN int free_pointer_to_signedp(int** ptrtofree){ free(*ptrtofree); @@ -967,7 +959,7 @@ post_include_bits =[ "RPY_EXTERN " "int sendmsg_implementation(int socket, struct sockaddr* address, socklen_t addrlen, long* length_of_messages, char** messages, int no_of_messages, long* levels, long* types, char** file_descriptors, long* no_of_fds, int control_length, int flag );\n" "RPY_EXTERN " - "int recvmsg_implementation(int socket_fd, int message_size, int ancillary_size, int flags, struct sockaddr* address, socklen_t* addrlen, int** length_of_messages, char** messages, int* no_of_messages, int* size_of_ancillary, int** levels, int** types, char** file_descr, int** descr_per_ancillary, int* flag);\n" + "int recvmsg_implementation(int socket_fd, int message_size, int ancillary_size, int flags, struct sockaddr* address, socklen_t* addrlen, int** length_of_messages, char** messages, int* no_of_messages, int* size_of_ancillary, long** levels, long** types, char** file_descr, long** descr_per_ancillary, int* flag);\n" "static " "int cmsg_min_space(struct msghdr *msg, struct cmsghdr *cmsgh, size_t space);\n" "static " @@ -983,32 +975,13 @@ "RPY_EXTERN " "size_t CMSG_SPACE_wrapper(size_t desired_space);\n" "RPY_EXTERN " - "char* memcpy_from_CCHARP_at_offset_and_size(char* string, int offset, int size);\n" + "int memcpy_from_CCHARP_at_offset_and_size(char* stringfrom, char** stringto, int offset, int size);\n" "RPY_EXTERN " "int free_pointer_to_signedp(int** ptrtofree);\n" "RPY_EXTERN " "int free_ptr_to_charp(char** ptrtofree);\n" ] - #CConfig.SignedPP = lltype.Ptr(lltype.Array(rffi.SIGNEDP, hints={'nolength': True})) - - - # CConfig.recvmsginfo = platform.Struct('struct recvmsg_info', - # [('error_code',rffi.SIGNED), - # ('address',sockaddr_ptr), - # ('addrlen',socklen_t_ptr), - # ('length_of_messages', rffi.SIGNEDP), - # ('messages',rffi.CCHARPP), - # ('no_of_messages',rffi.INT), - # ('size_of_ancillary',rffi.INT), - # ('levels', rffi.SIGNEDP), - # ('types', rffi.SIGNEDP), - # ('file_descr', rffi.CCHARPP), - # ('descr_per_ancillary', rffi.SIGNEDP), - # ('flags', rffi.INT), - # ]) - - # compilation_info = ExternalCompilationInfo( includes=includes, @@ -1252,7 +1225,7 @@ compilation_info=compilation_info)) memcpy_from_CCHARP_at_offset = jit.dont_look_inside(rffi.llexternal("memcpy_from_CCHARP_at_offset_and_size", - [rffi.CCHARP,rffi.INT,rffi.INT],rffi.CCHARP,save_err=SAVE_ERR,compilation_info=compilation_info)) + [rffi.CCHARP, rffi.CCHARPP,rffi.INT,rffi.INT],rffi.INT,save_err=SAVE_ERR,compilation_info=compilation_info)) freeccharp = jit.dont_look_inside(rffi.llexternal("free_ptr_to_charp", [rffi.CCHARPP],rffi.INT,save_err=SAVE_ERR,compilation_info=compilation_info)) freesignedp = jit.dont_look_inside(rffi.llexternal("free_pointer_to_signedp", diff --git a/rpython/rlib/rsocket.py b/rpython/rlib/rsocket.py --- a/rpython/rlib/rsocket.py +++ b/rpython/rlib/rsocket.py @@ -932,6 +932,7 @@ address.addrlen = addrlen else: address = None + print address data = buf.str(read_bytes) return (data, address) raise self.error_handler() @@ -965,14 +966,20 @@ @jit.dont_look_inside def recvmsg(self, message_size, ancbufsize = 0, flags = 0): + """ + Receive up to message_size bytes from a message. Also receives ancillary data. + Returns the message, ancillary, flag and address of the sender. + :param message_size: Maximum size of the message to be received + :param ancbufsize: Maximum size of the ancillary data to be received + :param flags: Receive flag. For more details, please check the Unix manual + :return: a tuple consisting of the message, the ancillary data, return flag and the address. + """ if message_size < 0: raise RSocketError("Invalid message size") if ancbufsize < 0: raise RSocketError("invalid ancillary data buffer length") - # addr, maxlen = make_null_address(self.family) - # addrlen_p = lltype.malloc(_c.socklen_t_ptr.TO, flavor='raw') - # addrlen_p[0] = rffi.cast(_c.socklen_t, maxlen) + self.wait_for_data(False) address, addr_p, addrlen_p = self._addrbuf() len_of_msgs = lltype.malloc(rffi.SIGNEDPP.TO,1,flavor='raw',track_allocation=True,nonmovable=False) messages = lltype.malloc(rffi.CCHARPP.TO,1,flavor='raw',track_allocation=True,nonmovable=False ) @@ -989,6 +996,7 @@ retflag = lltype.malloc(rffi.SIGNEDP.TO,1,flavor='raw',track_allocation=True,nonmovable=False ) retflag[0] = rffi.cast(rffi.SIGNED,0) + # a mask for the SIGNEDP's that need to be cast to int. (long default) LONG_MASK = 2**32 - 1 reply = _c.recvmsg(self.fd, rffi.cast(lltype.Signed,message_size), rffi.cast(lltype.Signed,ancbufsize),rffi.cast(lltype.Signed,flags), @@ -1008,55 +1016,74 @@ offset = 0 list_of_tuples = [] + + pre_anc = lltype.malloc(rffi.CCHARPP.TO, 1, flavor='raw', track_allocation=True, nonmovable=False) for i in range(anc_size): - x = rffi.cast(rffi.SIGNED, levels[0][i]) - x &= LONG_MASK - level = x - x = rffi.cast(rffi.SIGNED,types[0][i]) - x &= LONG_MASK - type = x - x = rffi.cast(rffi.SIGNED,descr_per_anc[0][i]) - x &= LONG_MASK - bytes_in_anc = x - pre_anc = _c.memcpy_from_CCHARP_at_offset(file_descr[0],rffi.cast(rffi.SIGNED,offset), bytes_in_anc) - anc = rffi.charpsize2str(pre_anc,bytes_in_anc) + level = rffi.cast(rffi.SIGNED, levels[0][i]) + type = rffi.cast(rffi.SIGNED, types[0][i]) + bytes_in_anc = rffi.cast(rffi.SIGNED, descr_per_anc[0][i]) + pre_anc[0] = lltype.malloc(rffi.CCHARP.TO, bytes_in_anc,flavor='raw',track_allocation=True,nonmovable=False) + _c.memcpy_from_CCHARP_at_offset(file_descr[0], pre_anc,rffi.cast(rffi.SIGNED,offset), bytes_in_anc) + anc = rffi.charpsize2str(pre_anc[0],bytes_in_anc) tup = (level,type, anc) list_of_tuples.append(tup) offset += bytes_in_anc - #lltype.free(pre_anc, flavor='raw') - #address.unlock() + lltype.free(pre_anc[0], flavor='raw') + if addrlen: address.addrlen = addrlen else: + address.unlock() address = None - rettup = (retmsg,list_of_tuples,returnflag,address) - #free underlying complexity first if address is not None: address.unlock() - # lltype.free(messages[0],flavor='raw') + # free underlying complexity first _c.freeccharp(file_descr) _c.freesignedp(len_of_msgs) _c.freesignedp(levels) _c.freesignedp(types) _c.freesignedp(descr_per_anc) + lltype.free(messages[0], flavor='raw') + lltype.free(pre_anc,flavor='raw') lltype.free(messages,flavor='raw') lltype.free(file_descr,flavor='raw') lltype.free(len_of_msgs,flavor='raw') lltype.free(no_of_messages, flavor='raw') lltype.free(size_of_anc, flavor='raw') lltype.free(levels, flavor='raw') + lltype.free(types, flavor='raw') lltype.free(descr_per_anc, flavor='raw') lltype.free(retflag, flavor='raw') lltype.free(addrlen_p,flavor='raw') return rettup else: + + #in case of failure the underlying complexity has already been freed + lltype.free(messages[0], flavor='raw') + lltype.free(messages, flavor='raw') + lltype.free(file_descr, flavor='raw') + lltype.free(len_of_msgs, flavor='raw') + lltype.free(no_of_messages, flavor='raw') + lltype.free(size_of_anc, flavor='raw') + lltype.free(levels, flavor='raw') + lltype.free(types, flavor='raw') + lltype.free(descr_per_anc, flavor='raw') + lltype.free(retflag, flavor='raw') + lltype.free(addrlen_p, flavor='raw') + if address is not None: address.unlock() + if (reply == -10000): + raise RSocketError("Invalid message size") + if (reply == -10001): + raise RSocketError("Invalid ancillary data buffer length") + if (reply == -10002): + raise RSocketError("received malformed or improperly truncated ancillary data") raise last_error() @@ -1109,8 +1136,16 @@ @jit.dont_look_inside def sendmsg(self, messages, ancillary=None, flags=0, address=None): - # addr = address.lock() - # addrlen = address.addrlen + """ + Send data and ancillary on a socket. For use of ancillary data, please check the Unix manual. + Work on connectionless sockets via the address parameter. + :param messages: a message that is a list of strings + :param ancillary: data to be sent separate from the message body. Needs to be a list of tuples. + E.g. [(level,type, bytes),...]. Default None. + :param flags: the flag to be set for sendmsg. Please check the Unix manual regarding values. Default 0 + :param address: address of the recepient. Useful for when sending on connectionless sockets. Default None + :return: Bytes sent from the message + """ need_to_free_address = True if address is None: need_to_free_address = False @@ -1127,7 +1162,6 @@ for message in messages: messages_ptr[counter] = rffi.str2charp(message) messages_length_ptr[counter] = rffi.cast(rffi.SIGNED, len(message)) - #messages_length_ptr[counter] = rffi.cast(rffi.SIGNED, 0x00cabc00abcabc00) counter += 1 messages_ptr[counter] = lltype.nullptr(rffi.CCHARP.TO) if ancillary is not None: @@ -1146,13 +1180,14 @@ levels[counter] = rffi.cast(rffi.SIGNED,level) types[counter] = rffi.cast(rffi.SIGNED,type) desc_per_ancillary[counter] = rffi.cast(rffi.SIGNED, (len(content))) - #file_descr[counter] = lltype.malloc(rffi.CCHARP.TO,len(content),flavor='raw',zero=True, track_allocation=True,nonmovable=False) file_descr[counter] = rffi.str2charp(content, track_allocation=True) counter +=1 else: size_of_ancillary = 0 snd_no_msgs = rffi.cast(rffi.SIGNED, no_of_messages) snd_anc_size =rffi.cast(rffi.SIGNED, size_of_ancillary) + + bytes_sent = _c.sendmsg(self.fd, addr, addrlen, messages_length_ptr, messages_ptr, snd_no_msgs,levels,types,file_descr,desc_per_ancillary,snd_anc_size,flags) @@ -1171,6 +1206,7 @@ lltype.free(levels, flavor='raw', track_allocation=True) lltype.free(file_descr, flavor='raw', track_allocation=True) + self.wait_for_data(True) if (bytes_sent < 0) and (bytes_sent!=-1000) and (bytes_sent!=-1001) and (bytes_sent!=-1002): raise last_error() @@ -1361,12 +1397,24 @@ if _c._POSIX: def CMSG_LEN( demanded_len): + """ + Socket method to determine the optimal byte size of the ancillary. + Recommended to be used when computing the ancillary size for recvmsg. + :param demanded_len: an integer with the minimum size required. + :return: an integer with the minimum memory needed for the required size. The value is not memory alligned + """ if demanded_len < 0: return 0 result = _c.CMSG_LEN(demanded_len) return result def CMSG_SPACE( demanded_size): + """ + Socket method to determine the optimal byte size of the ancillary. + Recommended to be used when computing the ancillary size for recvmsg. + :param demanded_size: an integer with the minimum size required. + :return: an integer with the minimum memory needed for the required size. The value is memory alligned + """ if demanded_size < 0: return 0 result = _c.CMSG_SPACE(demanded_size) _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit