Author: Dodan Mihai <[email protected]>
Branch: py3.5-sendmsg-recvmsg
Changeset: r91972:aaa84a0a699e
Date: 2017-07-26 13:59 +0300
http://bitbucket.org/pypy/pypy/changeset/aaa84a0a699e/
Log: Tests in regrtest/test_socket pass successfully. Memory leaks fixed
diff --git a/pypy/module/_socket/interp_func.py
b/pypy/module/_socket/interp_func.py
--- a/pypy/module/_socket/interp_func.py
+++ b/pypy/module/_socket/interp_func.py
@@ -329,6 +329,13 @@
@unwrap_spec(size=int)
def CMSG_SPACE(space, size):
+ """
+ Socket method to determine the optimal byte size of the ancillary.
+ Recommended to be used when computing the ancillary size for recvmsg.
+ :param space:
+ :param size: an integer with the minimum size required.
+ :return: an integer with the minimum memory needed for the required size.
The value is memory alligned
+ """
if size < 0:
raise oefmt(space.w_OverflowError,
"CMSG_SPACE() argument out of range")
@@ -340,6 +347,13 @@
@unwrap_spec(len=int)
def CMSG_LEN(space, len):
+ """
+ Socket method to determine the optimal byte size of the ancillary.
+ Recommended to be used when computing the ancillary size for recvmsg.
+ :param space:
+ :param len: an integer with the minimum size required.
+ :return: an integer with the minimum memory needed for the required size.
The value is not mem alligned.
+ """
if len < 0:
raise oefmt(space.w_OverflowError,
"CMSG_LEN() argument out of range")
diff --git a/pypy/module/_socket/interp_socket.py
b/pypy/module/_socket/interp_socket.py
--- a/pypy/module/_socket/interp_socket.py
+++ b/pypy/module/_socket/interp_socket.py
@@ -450,6 +450,25 @@
@unwrap_spec(message_size=int, ancbufsize=int, flags=int)
def recvmsg_w(self,space,message_size, ancbufsize = 0, flags = 0):
+ """
+ recvfrom(message_size[, ancbufsize[, flags]]) -> (message, ancillary,
flags, address)
+ recvmsg(message_size, [ancbufsize,[flags]]) -> (message, ancillary,
flags, address)
+ Receive normal data (up to bufsize bytes) and ancillary data from the
socket.
+ The ancbufsize argument sets the size in bytes of the internal buffer
used to receive the ancillary data;
+ it defaults to 0, meaning that no ancillary data will be received.
+ Appropriate buffer sizes for ancillary data can be calculated using
CMSG_SPACE() or CMSG_LEN(),
+ and items which do not fit into the buffer might be truncated or
discarded.
+ The flags argument defaults to 0 and has the same meaning as for
recv().
+ The ancdata item is a list of zero or more tuples (cmsg_level,
cmsg_type, cmsg_data):
+ cmsg_level and cmsg_type are integers specifying the protocol level
and protocol-specific type respectively,
+ and cmsg_data is a bytes object holding the associated data.
+
+ :param space: Non useable parameter. It represents the object space.
+ :param message_size: Maximum size of the message to be received
+ :param ancbufsize: Maximum size of the ancillary data to be received
+ :param flags: Receive flag. For more details, please check the Unix
manual
+ :return: a tuple consisting of the message, the ancillary data, return
flag and the address.
+ """
if (message_size < 0):
raise oefmt(space.w_ValueError, "negative buffer size in
recvmsg()")
if ancbufsize < 0:
@@ -476,10 +495,6 @@
except SocketError as e:
converted_error(space, e, eintr_retry=True)
-
-
-
-
@unwrap_spec(data='bufferstr', flags=int)
def send_w(self, space, data, flags=0):
"""send(data[, flags]) -> count
@@ -535,10 +550,22 @@
converted_error(space, e, eintr_retry=True)
return space.newint(count)
- #@unwrap_spec(data='bufferstr', flags = int)
def sendmsg_w(self, space, w_data, w_ancillary=None, w_flags=None
,w_address=None):
- """sendmsg(messages, [ancillaries, [flags, [address]]])
"""
+ sendmsg(data[,ancillary[,flags[,address]]]) -> bytes_sent
+ Send normal and ancillary data to the socket, gathering the
non-ancillary data
+ from a series of buffers and concatenating it into a single message.
+ The ancdata argument specifies the ancillary data (control messages)
as an iterable of zero or more tuples
+ (cmsg_level, cmsg_type, cmsg_data), where cmsg_level and cmsg_type are
integers specifying the protocol level
+ and protocol-specific type respectively, and cmsg_data is a bytes-like
object holding the associated data.
+ :param space: Represents the object space.
+ :param w_data: The message(s). needs to be a bytes like object
+ :param w_ancillary: needs to be a sequence object Can remain
unspecified.
+ :param w_flags: needs to be an integer. Can remain unspecified.
+ :param w_address: needs to be a bytes-like object Can remain
unspecified.
+ :return: Bytes sent from the message
+ """
+ # Get the flag and address from the object space
flags = 0
if space.is_none(w_flags) is False:
flags = space.int_w(w_flags)
@@ -547,6 +574,7 @@
if space.is_none(w_address) is False:
address = self.addr_from_object(space, w_address)
+ # find data's type in the ObjectSpace and get a list of string out of
it.
data = []
if (w_data.typedef.name == 'list'):
for i in w_data.getitems():
@@ -574,8 +602,9 @@
if not e.match(space,space.w_StopIteration):
raise
break
+
+ # find the ancillary's type in the ObjectSpace and get a list of
tuples out of it.
ancillary = []
-
if w_ancillary is not None:
if (space.isinstance_w(w_ancillary,space.w_list)):
for i in w_ancillary.getitems():
@@ -594,19 +623,22 @@
tup = (level, type, cont)
ancillary.append(tup)
else:
- raise oefmt(space.w_TypeError,"[sendmsg() ancillary
data items]() argument must be sequence of length 3")
+ raise oefmt(space.w_TypeError,
+ "[sendmsg() ancillary data items]()
argument must be sequence of length 3")
else:
while True:
try:
if (space.is_generator(w_ancillary) is False):
- raise oefmt(space.w_TypeError,"[sendmsg()
ancillary data items]() argument must be sequence")
+ raise oefmt(space.w_TypeError,
+ "[sendmsg() ancillary data items]()
argument must be sequence")
i = space.next(w_ancillary)
if (space.isinstance_w(i, space.w_tuple) is False):
raise oefmt(space.w_TypeError,
"[sendmsg() ancillary data items]()
argument must be sequence of length 3")
if (space.len_w(i) != 3):
- raise oefmt(space.w_TypeError,"[sendmsg()
ancillary data items]() argument must be sequence of length 3")
+ raise oefmt(space.w_TypeError,
+ "[sendmsg() ancillary data items]()
argument must be sequence of length 3")
except OperationError as e:
if not e.match(space,space.w_StopIteration):
raise
diff --git a/rpython/rlib/_rsocket_rffi.py b/rpython/rlib/_rsocket_rffi.py
--- a/rpython/rlib/_rsocket_rffi.py
+++ b/rpython/rlib/_rsocket_rffi.py
@@ -357,37 +357,37 @@
'errno.h',
'limits.h',
'stdio.h',
- 'sys/types.h']
+ 'sys/types.h',
+ 'netinet/in.h',
+ 'arpa/inet.h']
separate_module_sources = ['''
- //defines for recvmsg
- #define SUCCESS 0
- #define BAD_MSG_SIZE_GIVEN -1
- #define BAD_ANC_SIZE_GIVEN -2
- #define WOULD_BLOCK -3
- #define AGAIN -4
- #define BADDESC -5
- #define CON_REF -6
- #define FAULT -7
- #define INTR -8
- #define NOMEM -9
- #define NOTCONN -10
- #define NOTSOCK -11
- #define MAL_ANC -12
+ // special defines for returning from recvmsg
+ #define BAD_MSG_SIZE_GIVEN -10000
+ #define BAD_ANC_SIZE_GIVEN -10001
+ #define MAL_ANC -10002
- //defines for sendmsg
+ // special defines for returning from sendmsg
#define MUL_MSGS_NOT_SUP -1000
#define ANC_DATA_TOO_LARGE -1001
#define ANC_DATA_TOO_LARGEX -1002
- #define MSG_IOVLEN 1 // CPyhton has hardcoded this as well.
+ /*
+ Even though you could, theoretically, receive more than one
message, IF you set the socket option,
+ CPython has hardcoded the message number to 1, and implemented the
option to receive more then 1 in a
+ different socket method: recvmsg_into
+ */
+ #define MSG_IOVLEN 1 // CPython has hardcoded this as well.
#if INT_MAX > 0x7fffffff
#define SOCKLEN_T_LIMIT 0x7fffffff
#else
#define SOCKLEN_T_LIMIT INT_MAX
#endif
+ //
################################################################################################
+ // Recvmsg implementation and associated functions
+ // Taken from CPython. Determines the minimum memory space required
for the ancillary data.
#ifdef CMSG_SPACE
static int
cmsg_min_space(struct msghdr *msg, struct cmsghdr *cmsgh, size_t space)
@@ -428,8 +428,8 @@
}
#endif
+ // Taken from CPython.
#ifdef CMSG_LEN
-
/* If pointer CMSG_DATA(cmsgh) is in buffer msg->msg_control, set
*space to number of bytes following it in the buffer and return
true; otherwise, return false. Assumes cmsgh, msg->msg_control and
@@ -449,6 +449,7 @@
return 1;
}
+ // Taken from CPython.
/* If cmsgh is invalid or not contained in the buffer pointed to by
msg->msg_control, return -1. If cmsgh is valid and its associated
data is entirely contained in the buffer, set *data_len to the
@@ -476,23 +477,35 @@
}
#endif /* CMSG_LEN */
+ /*
+ Structure meant to hold the information received after a recvmsg
is performed.
+ Essentially it holds: the address, the message, the ancillary data
and the return flags.
+ I use this structure for 2 main reasons:
+ - keep things ordered
+ - some of the ancillary parameters need to be int not long
(rffi SignedP is actually long*),
+ therefore I cannot use the parameters directly
+ */
struct recvmsg_info
{
- int error_code;
- struct sockaddr* address;
+ struct sockaddr* address; // address fields
socklen_t addrlen;
- int* length_of_messages;
+ int* length_of_messages; // message fields
char** messages;
int no_of_messages;
- int size_of_ancillary;
+ int size_of_ancillary; // ancillary fields
int* levels;
int* types;
char** file_descr;
int* descr_per_ancillary;
- int flags;
+ int retflag; // return flag field
};
-
+ /*
+ Wrapper function over recvmsg. Since it returns a lot of data,
+ in a structure that is hard to parse in rffi, it was implemented
in C.
+ All the parameters, save the socket fd, message_size,
ancillary_size
+ will be malloc'd and/or modified.
+ */
RPY_EXTERN
int recvmsg_implementation(
int socket_fd,
@@ -505,11 +518,11 @@
char** messages,
int* no_of_messages,
int* size_of_ancillary,
- int** levels,
- int** types,
+ long** levels,
+ long** types,
char** file_descr,
- int** descr_per_ancillary,
- int* flag)
+ long** descr_per_ancillary,
+ int* retflag)
{
@@ -521,35 +534,36 @@
int cmsg_status;
struct iovec iov;
struct recvmsg_info* retinfo;
- int error_flag;
+ int error_flag; // variable to be set in case of special errors.
int cmsgdatalen = 0;
- //allocation flags for failure
+ // variables that are set to 1, if the message charp has been
allocated
+ // and if the ancillary variables have been allocated. To be used
in case of failure.
int iov_alloc = 0;
int anc_alloc = 0;
retinfo = (struct recvmsg_info*) malloc(sizeof(struct
recvmsg_info));
- /*
- if (message_size < 0){
- error_flag = BAD_MSG_SIZE_GIVEN;
- goto fail;
- }
- */
+
if (ancillary_size > SOCKLEN_T_LIMIT){
error_flag = BAD_ANC_SIZE_GIVEN;
goto fail;
}
-
+ // Setup the messages iov struct memory
iov.iov_base = (char*) malloc(message_size);
memset(iov.iov_base, 0, message_size);
iov.iov_len = message_size;
+
+ // Setup the ancillary buffer memory
controlbuf = malloc(ancillary_size);
- recvd_addrlen = sizeof(struct sockaddr);
+
+ // Setup the recv address memory
+ recvd_addrlen = sizeof(struct sockaddr_storage);
recvd_address = (struct sockaddr*) malloc(recvd_addrlen);
memset(recvd_address, 0,recvd_addrlen);
+ // Setup the msghdr struct
msg.msg_name = recvd_address;
msg.msg_namelen = recvd_addrlen;
msg.msg_iov = &iov;
@@ -557,64 +571,32 @@
msg.msg_control = controlbuf;
msg.msg_controllen = ancillary_size;
+ // Link my structure to the msghdr fields
retinfo->address = msg.msg_name;
retinfo->length_of_messages = (int*) malloc (MSG_IOVLEN *
sizeof(int));
- retinfo->no_of_messages = 1;
+ retinfo->no_of_messages = MSG_IOVLEN;
retinfo->messages = (char**) malloc (MSG_IOVLEN * sizeof(char*));
retinfo->messages[0] = msg.msg_iov->iov_base;
iov_alloc = 1;
-
ssize_t bytes_recvd = 0;
bytes_recvd = recvmsg(socket_fd, &msg, flags);
if (bytes_recvd < 0){
- switch (errno){
- case EAGAIN:
- error_flag = -3;
- break;
- case EBADF:
- error_flag = -5;
- break;
- case ECONNREFUSED:
- error_flag = -6;
- break;
- case EFAULT:
- error_flag = -7;
- break;
- case EINTR:
- error_flag = -8;
- break;
- case ENOMEM:
- error_flag = -9;
- break;
- case ENOTCONN:
- error_flag = -10;
- break;
- case ENOTSOCK:
- error_flag = -11;
- break;
- }
-
goto fail;
}
retinfo->addrlen = (socklen_t) msg.msg_namelen;
retinfo->length_of_messages[0] = msg.msg_iov->iov_len;
-
+ // Count the ancillary items & allocate the memory
int anc_counter = 0;
- /*
- struct recv_list* first_item = (struct recv_list*)
malloc(sizeof(struct recv_list));
- struct recv_list* iter = first_item;
- */
for (cmsgh = ((msg.msg_controllen > 0) ? CMSG_FIRSTHDR(&msg) :
NULL);
cmsgh != NULL; cmsgh = CMSG_NXTHDR(&msg, cmsgh)) {
anc_counter++;
}
-
retinfo->size_of_ancillary = anc_counter;
retinfo->file_descr = (char**) malloc (anc_counter *
sizeof(char*));
retinfo->levels = (int*) malloc(anc_counter * sizeof(int));
@@ -622,6 +604,7 @@
retinfo->descr_per_ancillary = (int*) malloc(anc_counter *
sizeof(int));
anc_alloc = 1;
+ // Extract the ancillary items
int i=0;
for (cmsgh = ((msg.msg_controllen > 0) ? CMSG_FIRSTHDR(&msg) :
NULL);
cmsgh != NULL; cmsgh = CMSG_NXTHDR(&msg, cmsgh)) {
@@ -639,26 +622,20 @@
i++;
}
- retinfo->flags = msg.msg_flags;
- retinfo->error_code = 0;
+ retinfo->retflag = msg.msg_flags;
- //address = (struct sockaddr*) malloc (sizeof(struct sockaddr));
- memcpy(address,retinfo->address,sizeof(struct sockaddr));
+ // Set the parameters of address
+ memcpy(address,retinfo->address,retinfo->addrlen);
+ *addrlen = retinfo->addrlen;
-
- *addrlen = retinfo->addrlen;
+ // Set the parameters of message
*no_of_messages = retinfo->no_of_messages;
*size_of_ancillary = retinfo->size_of_ancillary;
-
*length_of_messages = (int*) malloc (sizeof(int) *
retinfo->no_of_messages);
- //*length_of_messages =
memcpy(*length_of_messages, retinfo->length_of_messages,
sizeof(int) * retinfo->no_of_messages);
-
int counter = 0;
for (i=0; i< retinfo->no_of_messages; i++)
counter += retinfo->length_of_messages[i];
-
- //*messages = (char*) malloc(sizeof(char) * counter);
memset(*messages, 0, sizeof(char) * counter);
counter = 0;
for(i=0; i< retinfo->no_of_messages; i++){
@@ -666,20 +643,18 @@
counter += retinfo->length_of_messages[i];
}
- *levels = (int*) malloc (sizeof(int) * retinfo->size_of_ancillary);
- //*levels =
- memcpy(*levels, retinfo->levels, sizeof(int) *
retinfo->size_of_ancillary);
- *types = (int*) malloc (sizeof(int) * retinfo->size_of_ancillary);
- //*types =
- memcpy(*types, retinfo->types, sizeof(int) *
retinfo->size_of_ancillary);
- *descr_per_ancillary = (int*) malloc (sizeof(int) *
retinfo->size_of_ancillary);
- //*descr_per_ancillary =
- memcpy(*descr_per_ancillary, retinfo->descr_per_ancillary,
sizeof(int) * retinfo->size_of_ancillary);
-
+ // Set the parameters of ancillary
+ *levels = (long*) malloc (sizeof(long) *
retinfo->size_of_ancillary);
+ *types = (long*) malloc (sizeof(long) *
retinfo->size_of_ancillary);
+ *descr_per_ancillary = (long*) malloc (sizeof(long) *
retinfo->size_of_ancillary);
counter = 0;
- for (i=0; i < retinfo->size_of_ancillary; i++)
+ for (i=0; i < retinfo->size_of_ancillary; i++){
counter += retinfo->descr_per_ancillary[i];
-
+ // Convert the int* to long*
+ levels[0][i] = (long) retinfo->levels[i];
+ types[0][i] = (long) retinfo->types[i];
+ descr_per_ancillary[0][i] = (long)
retinfo->descr_per_ancillary[i];
+ }
*file_descr = (char*) malloc (sizeof(char) * counter);
memset(*file_descr, 0, sizeof(char) * counter);
counter = 0;
@@ -688,13 +663,10 @@
counter += retinfo->descr_per_ancillary[i];
}
- *flag = retinfo->flags;
- //int k;
- //char* dsadas;
- //dsadas = (char*) (*file_descr[0]);
- //for (k=0; k<retinfo->no_of_messages * sizeof(int); k++)
- // printf("0x%X ", dsadas[k]);
+ // Set the retflag
+ *retflag = retinfo->retflag;
+ // Free the memory
free(retinfo->address);
free(retinfo->length_of_messages);
free(retinfo->levels);
@@ -721,17 +693,8 @@
free(retinfo->messages[0]);
free(retinfo->messages);
free(retinfo->address);
+ free(retinfo);
free(controlbuf);
- file_descr = NULL;
- levels = NULL;
- types = NULL;
- descr_per_ancillary = NULL;
- length_of_messages = NULL;
- messages =NULL;
- address = NULL;
- addrlen = NULL;
- no_of_messages = NULL;
- size_of_ancillary = NULL;
}else{
if (iov_alloc){
@@ -740,22 +703,14 @@
free(retinfo->messages);
free(retinfo->address);
free(controlbuf);
- length_of_messages = NULL;
- messages =NULL;
- address = NULL;
- file_descr = NULL;
- levels = NULL;
- types = NULL;
- descr_per_ancillary = NULL;
- addrlen = NULL;
- no_of_messages = NULL;
- size_of_ancillary = NULL;
-
+ free(retinfo);
}
}
return error_flag;
err_closefds:
+ // Special case for UNIX sockets. In case file descriptors are
received, they need to be closed.
+ // Taken from CPython
#ifdef SCM_RIGHTS
/* Close all descriptors coming from SCM_RIGHTS, so they don't
leak. */
for (cmsgh = ((msg.msg_controllen > 0) ? CMSG_FIRSTHDR(&msg) :
NULL);
@@ -783,8 +738,8 @@
}
-
//################################################################################################
- //send goes from here
+ //
################################################################################################
+ // Sendmsg implementation and associated functions
#ifdef CMSG_LEN
static int
@@ -822,8 +777,28 @@
}
#endif
+ /*
+ sendmsg_implementation is a wrapper over sendmsg of the API.
+ It was inspired from the way CPython did their implementation of
this.
+ The main reason that it was written in C, is the struct msghdr,
+ which contains the ancillary data in a linked list of cmsghdr
structures.
+ It was simpler to use it in C, and then push the simpler types of
data via rffi.
+ */
RPY_EXTERN
- int sendmsg_implementation(int socket, struct sockaddr* address,
socklen_t addrlen, long* length_of_messages, char** messages, int
no_of_messages, long* levels, long* types, char** file_descriptors, long*
no_of_fds, int control_length, int flag )
+ int sendmsg_implementation
+ (int socket,
+ struct sockaddr* address,
+ socklen_t addrlen,
+ long* length_of_messages,
+ char** messages,
+ int no_of_messages,
+ long* levels,
+ long* types,
+ char** file_descriptors,
+ long* no_of_fds,
+ int control_length,
+ int flag
+ )
{
struct msghdr msg = {0};
@@ -832,15 +807,16 @@
int retval;
size_t i;
+ // Prepare the msghdr structure for the send:
+
// Add the address
-
if (address != NULL) {
msg.msg_name = address;
msg.msg_namelen = addrlen;
}
+
// Add the message
struct iovec *iovs = NULL;
-
if (no_of_messages > 0){
iovs = (struct iovec*) malloc(no_of_messages * sizeof(struct
iovec));
@@ -853,8 +829,8 @@
iovs[i].iov_len = length_of_messages[i];
}
}
+
// Add the ancillary
-
#ifndef CMSG_SPACE
if (control_length > 1){
free(iovs);
@@ -862,7 +838,9 @@
}
#endif
if (control_length > 0){
+
//compute the total size of the ancillary
+ //getting the exact amount of space can be tricky and os
dependent.
size_t total_size_of_ancillary = 0;
size_t space;
size_t controllen = 0, controllen_last = 0;
@@ -884,16 +862,14 @@
return ANC_DATA_TOO_LARGEX;
}
controllen_last = controllen;
-
}
- controlbuf = malloc(controllen); //* sizeof(int)
-
+ controlbuf = malloc(controllen);
msg.msg_control= controlbuf;
msg.msg_controllen = controllen;
+ // memset controlbuf to 0 to avoid trash in the ancillary
memset(controlbuf, 0, controllen);
-
cmsg = NULL;
for (i = 0; i< control_length; i++){
cmsg = (i == 0) ? CMSG_FIRSTHDR(&msg) : CMSG_NXTHDR(&msg,
cmsg);
@@ -912,6 +888,7 @@
// Send the data
retval = sendmsg(socket, &msg, flag);
+ // free everything that was allocated here, and we would not need
in rsocket
if (iovs != NULL)
free(iovs);
if (controlbuf !=NULL)
@@ -919,6 +896,14 @@
return retval;
}
+
+ //
################################################################################################
+ // Wrappers for CMSG_SPACE and CMSG_LEN
+
+ /*
+ These 2 functions are wrappers over sys/socket.h's CMSG_SPACE and
CMSG_LEN.
+ They are identical to CPython's.
+ */
#ifdef CMSG_SPACE
RPY_EXTERN
size_t CMSG_SPACE_wrapper(size_t desired_space){
@@ -931,7 +916,6 @@
#endif
#ifdef CMSG_LEN
-
RPY_EXTERN
size_t CMSG_LEN_wrapper(size_t desired_len){
size_t result;
@@ -942,14 +926,22 @@
}
#endif
+ //
################################################################################################
+ // Extra functions that I needed
+
+ /*
+ This function is used to memcpy from a char* at an offset.
+ Could not get rffi.c_memcpy to do it at an offset, so I made my own.
+ */
RPY_EXTERN
- char* memcpy_from_CCHARP_at_offset_and_size(char* string, int offset,
int size){
- char* buffer;
- buffer = (char*)malloc(sizeof(char)*size);
- buffer = memcpy(buffer, string + offset, size);
- return buffer;
+ int memcpy_from_CCHARP_at_offset_and_size(char* stringfrom, char**
stringto, int offset, int size){
+ *stringto = memcpy(*stringto, stringfrom + offset, size);
+ return 0;
}
+ /*
+ These functions free memory that was allocated in C (sendmsg or
recvmsg) was used in rsocket and now needs cleanup
+ */
RPY_EXTERN
int free_pointer_to_signedp(int** ptrtofree){
free(*ptrtofree);
@@ -967,7 +959,7 @@
post_include_bits =[ "RPY_EXTERN "
"int sendmsg_implementation(int socket, struct
sockaddr* address, socklen_t addrlen, long* length_of_messages, char**
messages, int no_of_messages, long* levels, long* types, char**
file_descriptors, long* no_of_fds, int control_length, int flag );\n"
"RPY_EXTERN "
- "int recvmsg_implementation(int socket_fd, int
message_size, int ancillary_size, int flags, struct sockaddr* address,
socklen_t* addrlen, int** length_of_messages, char** messages, int*
no_of_messages, int* size_of_ancillary, int** levels, int** types, char**
file_descr, int** descr_per_ancillary, int* flag);\n"
+ "int recvmsg_implementation(int socket_fd, int
message_size, int ancillary_size, int flags, struct sockaddr* address,
socklen_t* addrlen, int** length_of_messages, char** messages, int*
no_of_messages, int* size_of_ancillary, long** levels, long** types, char**
file_descr, long** descr_per_ancillary, int* flag);\n"
"static "
"int cmsg_min_space(struct msghdr *msg, struct
cmsghdr *cmsgh, size_t space);\n"
"static "
@@ -983,32 +975,13 @@
"RPY_EXTERN "
"size_t CMSG_SPACE_wrapper(size_t desired_space);\n"
"RPY_EXTERN "
- "char* memcpy_from_CCHARP_at_offset_and_size(char*
string, int offset, int size);\n"
+ "int memcpy_from_CCHARP_at_offset_and_size(char*
stringfrom, char** stringto, int offset, int size);\n"
"RPY_EXTERN "
"int free_pointer_to_signedp(int** ptrtofree);\n"
"RPY_EXTERN "
"int free_ptr_to_charp(char** ptrtofree);\n"
]
- #CConfig.SignedPP = lltype.Ptr(lltype.Array(rffi.SIGNEDP,
hints={'nolength': True}))
-
-
- # CConfig.recvmsginfo = platform.Struct('struct recvmsg_info',
- # [('error_code',rffi.SIGNED),
- # ('address',sockaddr_ptr),
- # ('addrlen',socklen_t_ptr),
- # ('length_of_messages',
rffi.SIGNEDP),
- # ('messages',rffi.CCHARPP),
- # ('no_of_messages',rffi.INT),
- # ('size_of_ancillary',rffi.INT),
- # ('levels', rffi.SIGNEDP),
- # ('types', rffi.SIGNEDP),
- # ('file_descr', rffi.CCHARPP),
- # ('descr_per_ancillary',
rffi.SIGNEDP),
- # ('flags', rffi.INT),
- # ])
-
- #
compilation_info = ExternalCompilationInfo(
includes=includes,
@@ -1252,7 +1225,7 @@
compilation_info=compilation_info))
memcpy_from_CCHARP_at_offset =
jit.dont_look_inside(rffi.llexternal("memcpy_from_CCHARP_at_offset_and_size",
-
[rffi.CCHARP,rffi.INT,rffi.INT],rffi.CCHARP,save_err=SAVE_ERR,compilation_info=compilation_info))
+ [rffi.CCHARP,
rffi.CCHARPP,rffi.INT,rffi.INT],rffi.INT,save_err=SAVE_ERR,compilation_info=compilation_info))
freeccharp = jit.dont_look_inside(rffi.llexternal("free_ptr_to_charp",
[rffi.CCHARPP],rffi.INT,save_err=SAVE_ERR,compilation_info=compilation_info))
freesignedp = jit.dont_look_inside(rffi.llexternal("free_pointer_to_signedp",
diff --git a/rpython/rlib/rsocket.py b/rpython/rlib/rsocket.py
--- a/rpython/rlib/rsocket.py
+++ b/rpython/rlib/rsocket.py
@@ -932,6 +932,7 @@
address.addrlen = addrlen
else:
address = None
+ print address
data = buf.str(read_bytes)
return (data, address)
raise self.error_handler()
@@ -965,14 +966,20 @@
@jit.dont_look_inside
def recvmsg(self, message_size, ancbufsize = 0, flags = 0):
+ """
+ Receive up to message_size bytes from a message. Also receives
ancillary data.
+ Returns the message, ancillary, flag and address of the sender.
+ :param message_size: Maximum size of the message to be received
+ :param ancbufsize: Maximum size of the ancillary data to be received
+ :param flags: Receive flag. For more details, please check the Unix
manual
+ :return: a tuple consisting of the message, the ancillary data, return
flag and the address.
+ """
if message_size < 0:
raise RSocketError("Invalid message size")
if ancbufsize < 0:
raise RSocketError("invalid ancillary data buffer length")
- # addr, maxlen = make_null_address(self.family)
- # addrlen_p = lltype.malloc(_c.socklen_t_ptr.TO, flavor='raw')
- # addrlen_p[0] = rffi.cast(_c.socklen_t, maxlen)
+ self.wait_for_data(False)
address, addr_p, addrlen_p = self._addrbuf()
len_of_msgs =
lltype.malloc(rffi.SIGNEDPP.TO,1,flavor='raw',track_allocation=True,nonmovable=False)
messages =
lltype.malloc(rffi.CCHARPP.TO,1,flavor='raw',track_allocation=True,nonmovable=False
)
@@ -989,6 +996,7 @@
retflag =
lltype.malloc(rffi.SIGNEDP.TO,1,flavor='raw',track_allocation=True,nonmovable=False
)
retflag[0] = rffi.cast(rffi.SIGNED,0)
+ # a mask for the SIGNEDP's that need to be cast to int. (long default)
LONG_MASK = 2**32 - 1
reply = _c.recvmsg(self.fd, rffi.cast(lltype.Signed,message_size),
rffi.cast(lltype.Signed,ancbufsize),rffi.cast(lltype.Signed,flags),
@@ -1008,55 +1016,74 @@
offset = 0
list_of_tuples = []
+
+ pre_anc = lltype.malloc(rffi.CCHARPP.TO, 1, flavor='raw',
track_allocation=True, nonmovable=False)
for i in range(anc_size):
- x = rffi.cast(rffi.SIGNED, levels[0][i])
- x &= LONG_MASK
- level = x
- x = rffi.cast(rffi.SIGNED,types[0][i])
- x &= LONG_MASK
- type = x
- x = rffi.cast(rffi.SIGNED,descr_per_anc[0][i])
- x &= LONG_MASK
- bytes_in_anc = x
- pre_anc =
_c.memcpy_from_CCHARP_at_offset(file_descr[0],rffi.cast(rffi.SIGNED,offset),
bytes_in_anc)
- anc = rffi.charpsize2str(pre_anc,bytes_in_anc)
+ level = rffi.cast(rffi.SIGNED, levels[0][i])
+ type = rffi.cast(rffi.SIGNED, types[0][i])
+ bytes_in_anc = rffi.cast(rffi.SIGNED, descr_per_anc[0][i])
+ pre_anc[0] = lltype.malloc(rffi.CCHARP.TO,
bytes_in_anc,flavor='raw',track_allocation=True,nonmovable=False)
+ _c.memcpy_from_CCHARP_at_offset(file_descr[0],
pre_anc,rffi.cast(rffi.SIGNED,offset), bytes_in_anc)
+ anc = rffi.charpsize2str(pre_anc[0],bytes_in_anc)
tup = (level,type, anc)
list_of_tuples.append(tup)
offset += bytes_in_anc
- #lltype.free(pre_anc, flavor='raw')
- #address.unlock()
+ lltype.free(pre_anc[0], flavor='raw')
+
if addrlen:
address.addrlen = addrlen
else:
+ address.unlock()
address = None
-
rettup = (retmsg,list_of_tuples,returnflag,address)
- #free underlying complexity first
if address is not None:
address.unlock()
- # lltype.free(messages[0],flavor='raw')
+ # free underlying complexity first
_c.freeccharp(file_descr)
_c.freesignedp(len_of_msgs)
_c.freesignedp(levels)
_c.freesignedp(types)
_c.freesignedp(descr_per_anc)
+ lltype.free(messages[0], flavor='raw')
+ lltype.free(pre_anc,flavor='raw')
lltype.free(messages,flavor='raw')
lltype.free(file_descr,flavor='raw')
lltype.free(len_of_msgs,flavor='raw')
lltype.free(no_of_messages, flavor='raw')
lltype.free(size_of_anc, flavor='raw')
lltype.free(levels, flavor='raw')
+ lltype.free(types, flavor='raw')
lltype.free(descr_per_anc, flavor='raw')
lltype.free(retflag, flavor='raw')
lltype.free(addrlen_p,flavor='raw')
return rettup
else:
+
+ #in case of failure the underlying complexity has already been
freed
+ lltype.free(messages[0], flavor='raw')
+ lltype.free(messages, flavor='raw')
+ lltype.free(file_descr, flavor='raw')
+ lltype.free(len_of_msgs, flavor='raw')
+ lltype.free(no_of_messages, flavor='raw')
+ lltype.free(size_of_anc, flavor='raw')
+ lltype.free(levels, flavor='raw')
+ lltype.free(types, flavor='raw')
+ lltype.free(descr_per_anc, flavor='raw')
+ lltype.free(retflag, flavor='raw')
+ lltype.free(addrlen_p, flavor='raw')
+
if address is not None:
address.unlock()
+ if (reply == -10000):
+ raise RSocketError("Invalid message size")
+ if (reply == -10001):
+ raise RSocketError("Invalid ancillary data buffer length")
+ if (reply == -10002):
+ raise RSocketError("received malformed or improperly truncated
ancillary data")
raise last_error()
@@ -1109,8 +1136,16 @@
@jit.dont_look_inside
def sendmsg(self, messages, ancillary=None, flags=0, address=None):
- # addr = address.lock()
- # addrlen = address.addrlen
+ """
+ Send data and ancillary on a socket. For use of ancillary data, please
check the Unix manual.
+ Work on connectionless sockets via the address parameter.
+ :param messages: a message that is a list of strings
+ :param ancillary: data to be sent separate from the message body.
Needs to be a list of tuples.
+ E.g. [(level,type, bytes),...]. Default None.
+ :param flags: the flag to be set for sendmsg. Please check the Unix
manual regarding values. Default 0
+ :param address: address of the recepient. Useful for when sending on
connectionless sockets. Default None
+ :return: Bytes sent from the message
+ """
need_to_free_address = True
if address is None:
need_to_free_address = False
@@ -1127,7 +1162,6 @@
for message in messages:
messages_ptr[counter] = rffi.str2charp(message)
messages_length_ptr[counter] = rffi.cast(rffi.SIGNED, len(message))
- #messages_length_ptr[counter] = rffi.cast(rffi.SIGNED,
0x00cabc00abcabc00)
counter += 1
messages_ptr[counter] = lltype.nullptr(rffi.CCHARP.TO)
if ancillary is not None:
@@ -1146,13 +1180,14 @@
levels[counter] = rffi.cast(rffi.SIGNED,level)
types[counter] = rffi.cast(rffi.SIGNED,type)
desc_per_ancillary[counter] = rffi.cast(rffi.SIGNED,
(len(content)))
- #file_descr[counter] =
lltype.malloc(rffi.CCHARP.TO,len(content),flavor='raw',zero=True,
track_allocation=True,nonmovable=False)
file_descr[counter] = rffi.str2charp(content,
track_allocation=True)
counter +=1
else:
size_of_ancillary = 0
snd_no_msgs = rffi.cast(rffi.SIGNED, no_of_messages)
snd_anc_size =rffi.cast(rffi.SIGNED, size_of_ancillary)
+
+
bytes_sent = _c.sendmsg(self.fd, addr, addrlen, messages_length_ptr,
messages_ptr,
snd_no_msgs,levels,types,file_descr,desc_per_ancillary,snd_anc_size,flags)
@@ -1171,6 +1206,7 @@
lltype.free(levels, flavor='raw', track_allocation=True)
lltype.free(file_descr, flavor='raw', track_allocation=True)
+ self.wait_for_data(True)
if (bytes_sent < 0) and (bytes_sent!=-1000) and (bytes_sent!=-1001)
and (bytes_sent!=-1002):
raise last_error()
@@ -1361,12 +1397,24 @@
if _c._POSIX:
def CMSG_LEN( demanded_len):
+ """
+ Socket method to determine the optimal byte size of the ancillary.
+ Recommended to be used when computing the ancillary size for recvmsg.
+ :param demanded_len: an integer with the minimum size required.
+ :return: an integer with the minimum memory needed for the required
size. The value is not memory alligned
+ """
if demanded_len < 0:
return 0
result = _c.CMSG_LEN(demanded_len)
return result
def CMSG_SPACE( demanded_size):
+ """
+ Socket method to determine the optimal byte size of the ancillary.
+ Recommended to be used when computing the ancillary size for recvmsg.
+ :param demanded_size: an integer with the minimum size required.
+ :return: an integer with the minimum memory needed for the required
size. The value is memory alligned
+ """
if demanded_size < 0:
return 0
result = _c.CMSG_SPACE(demanded_size)
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit